This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 59e0cfb Ack response implement (#8161)
59e0cfb is described below
commit 59e0cfb580e4e6d22e1ea8284f5866c1c5a6fd07
Author: congbo <[email protected]>
AuthorDate: Wed Oct 14 15:40:08 2020 +0800
Ack response implement (#8161)
## Motivation
Currently, acknowledging a message does not return a response. However, an
acknowledgment sent within a transaction must be checked for conflicts with
other transactions in the pending-ack state. Once the broker returns an ack
response, we can decide whether to commit or abort the transaction.
## Modification
- Add a requestId field to the ack protocol; the field is optional.
- When the server handles an ack command that carries a requestId, it
returns a response.
- Normally we don't need a response, unless we ack with a transaction.
## Compatibility
This PR is for transactions and doesn't change any client API, so there are
no compatibility concerns.
---
.../broker/service/BrokerServiceException.java | 3 +
.../org/apache/pulsar/broker/service/Consumer.java | 28 ++---
.../apache/pulsar/broker/service/ServerCnx.java | 14 ++-
.../service/persistent/PersistentSubscription.java | 14 +--
.../persistent/PersistentSubscriptionTest.java | 19 ++--
.../client/impl/ConsumerAckResponseTest.java | 95 ++++++++++++++++
.../TransactionEndToEndTest.java} | 10 +-
.../pulsar/client/api/PulsarClientException.java | 62 ++++++++++
pulsar-client-cpp/include/pulsar/Result.h | 1 +
pulsar-client-cpp/lib/ClientConnection.cc | 3 +
pulsar-client-cpp/lib/Result.cc | 3 +
.../pulsar/client/impl/BatchMessageAcker.java | 4 +
.../org/apache/pulsar/client/impl/ClientCnx.java | 18 +++
.../apache/pulsar/client/impl/ConsumerBase.java | 3 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 126 ++++++++++++++++++++-
.../PersistentAcknowledgmentsGroupingTracker.java | 4 +-
.../impl/conf/ConsumerConfigurationData.java | 4 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 117 +++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 40 +++----
.../pulsar/common/protocol/PulsarDecoder.java | 11 ++
pulsar-common/src/main/proto/PulsarApi.proto | 4 +
21 files changed, 518 insertions(+), 65 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 7ec97ff..d4bcd8a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -22,6 +22,7 @@ import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
/**
@@ -214,6 +215,8 @@ public class BrokerServiceException extends Exception {
return ServerError.InvalidTxnStatus;
} else if (t instanceof NotAllowedException) {
return ServerError.NotAllowedError;
+ } else if (t instanceof TransactionConflictException) {
+ return ServerError.TransactionConflict;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 0edb24d..0e93d35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
@@ -60,6 +61,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
@@ -388,24 +390,22 @@ public class Consumer {
});
}
- void messageAcked(CommandAck ack) {
+ CompletableFuture<Void> messageAcked(CommandAck ack) {
this.lastAckedTimestamp = System.currentTimeMillis();
Map<String,Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
properties = ack.getPropertiesList().stream()
- .collect(Collectors.toMap((e) -> e.getKey(),
- (e) -> e.getValue()));
+ .collect(Collectors.toMap(PulsarApi.KeyLongValue::getKey,
+ PulsarApi.KeyLongValue::getValue));
}
if (ack.getAckType() == AckType.Cumulative) {
if (ack.getMessageIdCount() != 1) {
log.warn("[{}] [{}] Received multi-message ack", subscription,
consumerId);
- return;
}
if (Subscription.isIndividualAckMode(subType)) {
log.warn("[{}] [{}] Received cumulative ack on shared
subscription, ignoring", subscription, consumerId);
- return;
}
PositionImpl position = PositionImpl.earliest;
if (ack.getMessageIdCount() == 1) {
@@ -418,7 +418,7 @@ public class Consumer {
}
List<Position> positionsAcked =
Collections.singletonList(position);
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
- transactionAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
+ return transactionAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
} else {
subscription.acknowledgeMessage(positionsAcked,
AckType.Cumulative, properties);
}
@@ -445,24 +445,24 @@ public class Consumer {
}
}
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
- transactionAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
+ return transactionAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
} else {
subscription.acknowledgeMessage(positionsAcked,
AckType.Individual, properties);
}
}
+
+ return CompletableFuture.completedFuture(null);
}
- private void transactionAcknowledge(long txnidMostBits, long
txnidLeastBits,
+ private CompletableFuture<Void> transactionAcknowledge(long txnidMostBits,
long txnidLeastBits,
List<Position> positionList, AckType
ackType) {
if (subscription instanceof PersistentSubscription) {
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
- try {
- ((PersistentSubscription)
subscription).acknowledgeMessage(txnID, positionList, ackType);
- } catch (TransactionConflictException e) {
- log.error("Transaction acknowledge failed for txn " + txnID,
e);
- }
+ return ((PersistentSubscription)
subscription).acknowledgeMessage(txnID, positionList, ackType);
} else {
- log.error("Transaction acknowledge only support the
`PersistentSubscription`.");
+ String error = "Transaction acknowledge only support the
`PersistentSubscription`.";
+ log.error(error);
+ return FutureUtil.failedFuture(new
TransactionConflictException(error));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aa72c7a..505ecf2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1264,7 +1264,19 @@ public class ServerCnx extends PulsarHandler {
CompletableFuture<Consumer> consumerFuture =
consumers.get(ack.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
- consumerFuture.getNow(null).messageAcked(ack);
+ consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
+ if (ack.hasRequestId()) {
+ ctx.writeAndFlush(Commands.newAckResponse(
+ ack.getRequestId(), null, null,
ack.getConsumerId()));
+ }
+ }).exceptionally(e -> {
+ if (ack.hasRequestId()) {
+
ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(),
+
BrokerServiceException.getClientErrorCode(e),
+ e.getMessage(), ack.getConsumerId()));
+ }
+ return null;
+ });
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 90be062..20ba166 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -67,7 +67,6 @@ import
org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsS
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -416,7 +415,7 @@ public class PersistentSubscription implements Subscription
{
* cumulative ack or try to single ack message already acked by any
ongoing transaction.
* @throws IllegalArgumentException if try to cumulative ack but passed in
multiple positions.
*/
- public synchronized void acknowledgeMessage(TxnID txnId, List<Position>
positions, AckType ackType) throws TransactionConflictException {
+ public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID
txnId, List<Position> positions, AckType ackType) {
checkArgument(txnId != null, "TransactionID can not be null.");
if (AckType.Cumulative == ackType) {
// Check if another transaction is already using cumulative ack on
this subscription.
@@ -425,14 +424,14 @@ public class PersistentSubscription implements
Subscription {
" try to cumulative ack message while
transaction:" + this.pendingCumulativeAckTxnId +
" already cumulative acked messages.";
log.error(errorMsg);
- throw new TransactionConflictException(errorMsg);
+ return FutureUtil.failedFuture(new
TransactionConflictException(errorMsg));
}
if (positions.size() != 1) {
String errorMsg = "[" + topicName + "][" + subName + "]
Transaction:" + txnId +
" invalid cumulative ack received with
multiple message ids.";
log.error(errorMsg);
- throw new IllegalArgumentException(errorMsg);
+ return FutureUtil.failedFuture(new
TransactionConflictException(errorMsg));
}
Position position = positions.get(0);
@@ -443,7 +442,7 @@ public class PersistentSubscription implements Subscription
{
" try to cumulative ack position: " + position + "
within range of cursor's " +
"markDeletePosition: " +
cursor.getMarkDeletedPosition();
log.error(errorMsg);
- throw new TransactionConflictException(errorMsg);
+ return FutureUtil.failedFuture(new
TransactionConflictException(errorMsg));
}
if (log.isDebugEnabled()) {
@@ -481,7 +480,7 @@ public class PersistentSubscription implements Subscription
{
String errorMsg = "[" + topicName + "][" + subName + "]
Transaction:" + txnId +
" try to ack message:" + position + " in
pending ack status.";
log.error(errorMsg);
- throw new TransactionConflictException(errorMsg);
+ return FutureUtil.failedFuture(new
TransactionConflictException(errorMsg));
}
// If try to ack message already acked by committed
transaction or normal acknowledge, throw exception.
@@ -489,13 +488,14 @@ public class PersistentSubscription implements
Subscription {
String errorMsg = "[" + topicName + "][" + subName + "]
Transaction:" + txnId +
" try to ack message:" + position + " already
acked before.";
log.error(errorMsg);
- throw new TransactionConflictException(errorMsg);
+ return FutureUtil.failedFuture(new
TransactionConflictException(errorMsg));
}
pendingAckMessageForCurrentTxn.add(position);
this.pendingAckMessages.add(position);
}
}
+ return CompletableFuture.completedFuture(null);
}
private final MarkDeleteCallback markDeleteCallback = new
MarkDeleteCallback() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 9d75762..4379375 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -62,7 +63,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.compaction.Compactor;
-import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
@@ -165,7 +165,7 @@ public class PersistentSubscriptionTest {
}
@Test
- public void testCanAcknowledgeAndCommitForTransaction() throws
TransactionConflictException {
+ public void testCanAcknowledgeAndCommitForTransaction() {
List<Position> expectedSinglePositions = new ArrayList<>();
expectedSinglePositions.add(new PositionImpl(1, 1));
expectedSinglePositions.add(new PositionImpl(1, 3));
@@ -208,7 +208,7 @@ public class PersistentSubscriptionTest {
}
@Test
- public void testCanAcknowledgeAndAbortForTransaction() throws
TransactionConflictException, BrokerServiceException {
+ public void testCanAcknowledgeAndAbortForTransaction() throws
BrokerServiceException, InterruptedException {
List<Position> positions = new ArrayList<>();
positions.add(new PositionImpl(2, 1));
positions.add(new PositionImpl(2, 3));
@@ -242,10 +242,10 @@ public class PersistentSubscriptionTest {
// Can not single ack message already acked.
try {
- persistentSubscription.acknowledgeMessage(txnID2, positions,
AckType.Individual);
+ persistentSubscription.acknowledgeMessage(txnID2, positions,
AckType.Individual).get();
fail("Single acknowledge for transaction2 should fail. ");
- } catch (TransactionConflictException e) {
-
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
" +
+ } catch (ExecutionException e) {
+
assertEquals(e.getCause().getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
" +
"Transaction:(1,2) try to ack message:2:1 in pending ack
status.");
}
@@ -254,11 +254,10 @@ public class PersistentSubscriptionTest {
// Can not cumulative ack message for another txn.
try {
- persistentSubscription.acknowledgeMessage(txnID2, positions,
AckType.Cumulative);
+ persistentSubscription.acknowledgeMessage(txnID2, positions,
AckType.Cumulative).get();
fail("Cumulative acknowledge for transaction2 should fail. ");
- } catch (TransactionConflictException e) {
- System.out.println(e.getMessage());
-
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
" +
+ } catch (ExecutionException e) {
+
assertEquals(e.getCause().getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
" +
"Transaction:(1,2) try to cumulative ack message while
transaction:(1,1) already cumulative acked messages.");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
new file mode 100644
index 0000000..55fe046
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class ConsumerAckResponseTest extends ProducerConsumerBase {
+
+ private final static TransactionImpl transaction =
mock(TransactionImpl.class);
+
+ @BeforeClass
+ public void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ doReturn(1L).when(transaction).getTxnIdLeastBits();
+ doReturn(1L).when(transaction).getTxnIdMostBits();
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ doReturn(completableFuture).when(transaction).registerAckOp(any());
+ doNothing().when(transaction).registerAckedTopic(any(), any());
+
+ Thread.sleep(1000 * 3);
+ }
+
+ @AfterClass
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testAckResponse() throws PulsarClientException,
InterruptedException, ExecutionException {
+ String topic = "testAckResponse";
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+ @Cleanup
+ ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscribe();
+ producer.send(1);
+ producer.send(2);
+ try {
+ consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1),
transaction).get();
+ fail();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof
PulsarClientException.TransactionConflictException);
+ }
+ Message<Integer> message = consumer.receive();
+ consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
similarity index 98%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 0da8cca..f9a5c44 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.transaction;
+package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -57,7 +57,7 @@ import org.testng.annotations.Test;
* End to end transaction test.
*/
@Slf4j
-public class EndToEndTest extends TransactionTestBase {
+public class TransactionEndToEndTest extends TransactionTestBase {
private final static int TOPIC_PARTITION = 3;
@@ -248,9 +248,9 @@ public class EndToEndTest extends TransactionTestBase {
Message<byte[]> message = consumer.receive();
Assert.assertNotNull(message);
log.info("receive msgId: {}", message.getMessageId());
- consumer.acknowledgeAsync(message.getMessageId(), txn);
+ consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
- Thread.sleep(2000);
+ Thread.sleep(1000);
consumer.redeliverUnacknowledgedMessages();
@@ -266,7 +266,7 @@ public class EndToEndTest extends TransactionTestBase {
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
- consumer.acknowledgeAsync(message.getMessageId(), commitTxn);
+ consumer.acknowledgeAsync(message.getMessageId(),
commitTxn).get();
log.info("receive msgId: {}", message.getMessageId());
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 597e0d5..457d87c 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -763,6 +763,58 @@ public class PulsarClientException extends IOException {
}
}
+ /**
+ * Message acknowledge exception thrown by Pulsar client.
+ */
+ public static class MessageAcknowledgeException extends
PulsarClientException {
+
+ /**
+ * Constructs an {@code MessageAcknowledgeException} with the
specified cause.
+ *
+ * @param t
+ * The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ */
+ public MessageAcknowledgeException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * Constructs an {@code MessageAcknowledgeException} with the
specified detail message.
+ * @param msg The detail message.
+ */
+ public MessageAcknowledgeException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Transaction conflict exception thrown by Pulsar client.
+ */
+ public static class TransactionConflictException extends
PulsarClientException {
+
+ /**
+ * Constructs an {@code TransactionConflictException} with the
specified cause.
+ *
+ * @param t
+ * The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ */
+ public TransactionConflictException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * Constructs an {@code TransactionConflictException} with the
specified detail message.
+ * @param msg The detail message.
+ */
+ public TransactionConflictException(String msg) {
+ super(msg);
+ }
+ }
+
// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
@@ -821,6 +873,10 @@ public class PulsarClientException extends IOException {
return new CryptoException(msg);
} else if (t instanceof ConsumerAssignException) {
return new ConsumerAssignException(msg);
+ } else if (t instanceof MessageAcknowledgeException) {
+ return new MessageAcknowledgeException(msg);
+ } else if (t instanceof TransactionConflictException) {
+ return new TransactionConflictException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
@@ -906,6 +962,10 @@ public class PulsarClientException extends IOException {
return new CryptoException(msg);
} else if (cause instanceof ConsumerAssignException) {
return new ConsumerAssignException(msg);
+ } else if (cause instanceof MessageAcknowledgeException) {
+ return new MessageAcknowledgeException(msg);
+ } else if (cause instanceof TransactionConflictException) {
+ return new TransactionConflictException(msg);
} else if (cause instanceof TopicDoesNotExistException) {
return new TopicDoesNotExistException(msg);
} else {
@@ -936,6 +996,8 @@ public class PulsarClientException extends IOException {
|| t instanceof ChecksumException
|| t instanceof CryptoException
|| t instanceof ConsumerAssignException
+ || t instanceof MessageAcknowledgeException
+ || t instanceof TransactionConflictException
|| t instanceof ProducerBusyException
|| t instanceof ConsumerBusyException) {
return false;
diff --git a/pulsar-client-cpp/include/pulsar/Result.h
b/pulsar-client-cpp/include/pulsar/Result.h
index 6dd7e45..01a2474 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -83,6 +83,7 @@ enum Result
ResultTransactionCoordinatorNotFoundError, /// Transaction
coordinator not found
ResultInvalidTxnStatusError, /// Invalid txn status
error
ResultNotAllowedError, /// Not allowed
+ ResultTransactionConflict, /// Transaction ack
conflict
};
// Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index ae44b52..22cc420 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -120,6 +120,9 @@ static Result getResult(ServerError serverError) {
case NotAllowedError:
return ResultNotAllowedError;
+
+ case TransactionConflict:
+ return ResultTransactionConflict;
}
// NOTE : Do not add default case in the switch above. In future if we get
new cases for
// ServerError and miss them in the switch above we would like to get
notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 3c1c2a8..5f074a4 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -147,6 +147,9 @@ const char* strResult(Result result) {
case ResultNotAllowedError:
return "ResultNotAllowedError";
+
+ case ResultTransactionConflict:
+ return "ResultTransactionConflict";
};
// NOTE : Do not add default case in the switch above. In future if we get
new cases for
// ServerError and miss them in the switch above we would like to get
notified. Adding
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index e34d1a1..e0b8197 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -63,6 +63,10 @@ class BatchMessageAcker {
return bitSet.isEmpty();
}
+ public synchronized int getBitSetSize() {
+ return bitSet.size();
+ }
+
public synchronized boolean ackCumulative(int batchIndex) {
// +1 since to argument is exclusive
bitSet.clear(0, batchIndex + 1);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 936b27f..056b75a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
@@ -90,6 +91,7 @@ import
org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -401,6 +403,20 @@ public class ClientCnx extends PulsarHandler {
}
@Override
+ protected void handleAckResponse(CommandAckResponse ackResponse) {
+ checkArgument(state == State.Ready);
+ checkArgument(ackResponse.getRequestId() >= 0);
+ long consumerId = ackResponse.getConsumerId();
+ if (!ackResponse.hasError()) {
+ consumers.get(consumerId).ackReceipt(ackResponse.getRequestId());
+ } else {
+ consumers.get(consumerId).ackError(ackResponse.getRequestId(),
+ getPulsarClientException(ackResponse.getError(),
ackResponse.getMessage()));
+ }
+ }
+
+
+ @Override
protected void handleMessage(CommandMessage cmdMessage, ByteBuf
headersAndPayload) {
checkArgument(state == State.Ready);
@@ -1029,6 +1045,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.ConsumerAssignException(errorMsg);
case NotAllowedError:
return new PulsarClientException.NotAllowedException(errorMsg);
+ case TransactionConflict:
+ return new
PulsarClientException.TransactionConflictException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index db1b661..8c67f2c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -448,7 +448,8 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
// committed
txn.registerAckedTopic(getTopic(), subscription);
// register the ackFuture as part of the transaction
- return txn.registerAckOp(ackFuture);
+ txn.registerAckOp(ackFuture);
+ return ackFuture;
} else {
return ackFuture;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index bfbc6ed..5b6bcfa 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -77,7 +77,9 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import
org.apache.pulsar.client.api.PulsarClientException.MessageAcknowledgeException;
import
org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -87,6 +89,7 @@ import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
@@ -103,8 +106,10 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,6 +191,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final boolean createTopicIfDoesNotExist;
+ private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
+
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
@@ -328,6 +335,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ this.ackRequests = new ConcurrentLongHashMap<>(16, 1);
grabCnx();
}
@@ -535,6 +543,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return FutureUtil.failedFuture(exception);
}
+ if (txn != null) {
+ return doAcknowledgeForResponse(messageId, ackType, null,
properties,
+ new TxnID(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits()));
+ }
+
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
if (ackType == AckType.Cumulative && txn != null) {
@@ -1037,6 +1050,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
possibleSendToDeadLetterTopicMessages.clear();
}
+ if (!ackRequests.isEmpty()) {
+ ackRequests.forEach((key, value) -> {
+ value.callback
+ .completeExceptionally(new
MessageAcknowledgeException("Consumer has closed!"));
+ value.recycle();
+ });
+
+ ackRequests.clear();
+ }
+
acknowledgmentsGroupingTracker.close();
if (batchReceiveTimeout != null) {
batchReceiveTimeout.cancel();
@@ -2344,6 +2367,107 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
pendingChunckedMessageCount--;
}
+    /**
+     * Sends an acknowledgment that carries a request id, so that the broker's ack response can be
+     * matched back to this call through {@code ackRequests}.
+     *
+     * @param messageId       message to acknowledge; cast to {@code BatchMessageIdImpl} or {@code MessageIdImpl}
+     * @param ackType         individual or cumulative acknowledgment
+     * @param validationError validation error forwarded to {@code Commands.newAck}; may be {@code null}
+     * @param properties      properties forwarded to {@code Commands.newAck}
+     * @param txnID           transaction the acknowledgment belongs to
+     * @return a future completed by {@code ackReceipt(requestId)} or failed by {@code ackError(requestId, ...)}
+     */
+    private CompletableFuture<Void> doAcknowledgeForResponse(MessageId messageId, AckType ackType,
+                                                             ValidationError validationError,
+                                                             Map<String, Long> properties, TxnID txnID) {
+        // Resolve the connection once, before any state is mutated. Dereferencing cnx() twice after
+        // the request is registered means a missing connection throws and leaves the pending entry
+        // (and its retained buffer) behind.
+        // NOTE(review): assumes cnx() returns null while the consumer is disconnected - confirm.
+        final ClientCnx cnx = cnx();
+        if (cnx == null) {
+            return FutureUtil.failedFuture(
+                    new MessageAcknowledgeException("Consumer is not connected to the broker!"));
+        }
+
+        CompletableFuture<Void> callBack = new CompletableFuture<>();
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            bitSetRecyclable = BitSetRecyclable.create();
+            ledgerId = batchMessageId.getLedgerId();
+            entryId = batchMessageId.getEntryId();
+            // Start from a fully-set bit set sized to the batch, then clear the acknowledged indexes.
+            bitSetRecyclable.set(0, batchMessageId.getAcker().getBitSetSize());
+            if (ackType == AckType.Cumulative) {
+                batchMessageId.ackCumulative();
+                // Cumulative: clear every index up to and including this message's batch index.
+                bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+            } else {
+                batchMessageId.ackIndividual();
+                // Individual: clear only this message's batch index.
+                bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+            }
+        } else {
+            MessageIdImpl singleMessage = (MessageIdImpl) messageId;
+            ledgerId = singleMessage.getLedgerId();
+            entryId = singleMessage.getEntryId();
+        }
+
+        long requestId = client.newRequestId();
+        ByteBuf cmd = Commands.newAck(consumerId, ledgerId, entryId,
+                bitSetRecyclable, ackType,
+                validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
+        OpForAckCallBack op = OpForAckCallBack.create(cmd, callBack, messageId,
+                new TxnID(txnID.getMostSigBits(), txnID.getLeastSigBits()));
+        // Register before writing so the response handler can always find the pending request.
+        ackRequests.put(requestId, op);
+        unAckedMessageTracker.remove(messageId);
+        // Extra reference owned by the pending op; it is released in OpForAckCallBack.recycle().
+        cmd.retain();
+        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+        return callBack;
+    }
+
+    /**
+     * Completes the pending acknowledgment registered under {@code requestId} successfully.
+     *
+     * <p>If no pending request is found, or its future is already done, the event is only logged.
+     *
+     * @param requestId request id carried by the ack response
+     */
+    protected void ackReceipt(long requestId) {
+        OpForAckCallBack callBackOp = ackRequests.remove(requestId);
+        if (callBackOp == null || callBackOp.callback.isDone()) {
+            log.info("Ack request has been handled requestId : {}", requestId);
+        } else {
+            callBackOp.callback.complete(null);
+            if (log.isDebugEnabled()) {
+                log.debug("MessageId : {} has ack by TxnId : {}",
+                        callBackOp.messageId.getLedgerId() + ":" + callBackOp.messageId.getEntryId(),
+                        callBackOp.txnID.toString());
+            }
+            // Release the retained command buffer and return the op to its pool, as ackError does;
+            // without this every successful ack leaks one buffer reference. Must stay after the
+            // logging above, which still reads the op's fields.
+            callBackOp.recycle();
+        }
+    }
+
+    /**
+     * Fails the pending acknowledgment registered under {@code requestId}.
+     *
+     * <p>If no pending request is found, or its future is already done, the event is only logged.
+     *
+     * @param requestId             request id carried by the ack response
+     * @param pulsarClientException failure to report through the pending future
+     */
+    protected void ackError(long requestId, PulsarClientException pulsarClientException) {
+        OpForAckCallBack callBackOp = ackRequests.remove(requestId);
+        if (callBackOp == null || callBackOp.callback.isDone()) {
+            log.info("Ack request has been handled requestId : {}", requestId);
+        } else {
+            callBackOp.callback.completeExceptionally(pulsarClientException);
+            if (log.isDebugEnabled()) {
+                // This is the failure path: say so, and attach the cause, instead of reusing the
+                // success wording from ackReceipt.
+                log.debug("MessageId : {} ack by TxnId : {} failed",
+                        callBackOp.messageId, callBackOp.txnID, pulsarClientException);
+            }
+            // Releases the retained command buffer and returns the op to its pool.
+            callBackOp.recycle();
+        }
+    }
+
+    /**
+     * Pooled holder for one in-flight acknowledgment that awaits a broker response: the serialized
+     * command, the future to complete, and the message / transaction ids used for logging.
+     *
+     * <p>Instances are obtained with {@link #create} and must be handed back with {@link #recycle()}
+     * exactly once; no field may be read after that call.
+     */
+    private static class OpForAckCallBack {
+        protected ByteBuf cmd;
+        protected CompletableFuture<Void> callback;
+        protected MessageIdImpl messageId;
+        protected TxnID txnID;
+
+        private final Recycler.Handle<OpForAckCallBack> recyclerHandle;
+
+        private static final Recycler<OpForAckCallBack> RECYCLER = new Recycler<OpForAckCallBack>() {
+            @Override
+            protected OpForAckCallBack newObject(Handle<OpForAckCallBack> handle) {
+                return new OpForAckCallBack(handle);
+            }
+        };
+
+        private OpForAckCallBack(Recycler.Handle<OpForAckCallBack> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        /**
+         * Takes an instance from the pool and populates it.
+         *
+         * @param cmd       serialized ack command; released by {@link #recycle()}
+         * @param callback  future completed when the response arrives
+         * @param messageId acknowledged message; must be a {@code MessageIdImpl} (it is cast)
+         * @param txnID     transaction the acknowledgment belongs to
+         */
+        static OpForAckCallBack create(ByteBuf cmd, CompletableFuture<Void> callback,
+                                       MessageId messageId, TxnID txnID) {
+            OpForAckCallBack op = RECYCLER.get();
+            op.callback = callback;
+            op.cmd = cmd;
+            op.messageId = (MessageIdImpl) messageId;
+            op.txnID = txnID;
+            return op;
+        }
+
+        /** Releases the command buffer, clears all references and returns this object to the pool. */
+        void recycle() {
+            if (cmd != null) {
+                ReferenceCountUtil.safeRelease(cmd);
+            }
+            // Clear the fields so a pooled instance does not pin the future, ids or the released
+            // buffer, and so stale state can never be observed by the next user of this object.
+            cmd = null;
+            callback = null;
+            messageId = null;
+            txnID = null;
+            recyclerHandle.recycle(this);
+        }
+    }
+
private static final Logger log =
LoggerFactory.getLogger(ConsumerImpl.class);
-}
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f6dfb84..62f9f58 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -313,7 +313,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
final ByteBuf cmd = Commands.newAck(consumer.consumerId,
msgId.ledgerId, msgId.entryId, bitSet, ackType,
- null, properties, txnidLeastBits, txnidMostBits);
+ null, properties, txnidLeastBits, txnidMostBits, -1);
bitSet.recycle();
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
return true;
@@ -534,7 +534,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
} else {
ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(),
msgId.getEntryId(), lastCumulativeAckSet,
- ackType, validationError, map, txnidLeastBits,
txnidMostBits);
+ ackType, validationError, map, txnidLeastBits,
txnidMostBits, -1);
if (flush) {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
} else {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 7f5d65b..e1f6556 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -84,10 +84,10 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private long tickDurationMillis = 1000;
private int priorityLevel = 0;
-
+
// max pending chunked message to avoid sitting incomplete message into
the queue and memory
private int maxPendingChuckedMessage = 10;
-
+
private boolean autoAckOldestChunkedMessageOnQueueFull = false;
private long expireTimeOfIncompleteChunkedMessageMillis = 60 * 1000;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 65c4a93..9516c25 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -83,6 +83,7 @@ public final class PulsarApi {
TransactionCoordinatorNotFound(20, 20),
InvalidTxnStatus(21, 21),
NotAllowedError(22, 22),
+ TransactionConflict(23, 23),
;
public static final int UnknownError_VALUE = 0;
@@ -108,6 +109,7 @@ public final class PulsarApi {
public static final int TransactionCoordinatorNotFound_VALUE = 20;
public static final int InvalidTxnStatus_VALUE = 21;
public static final int NotAllowedError_VALUE = 22;
+ public static final int TransactionConflict_VALUE = 23;
public final int getNumber() { return value; }
@@ -137,6 +139,7 @@ public final class PulsarApi {
case 20: return TransactionCoordinatorNotFound;
case 21: return InvalidTxnStatus;
case 22: return NotAllowedError;
+ case 23: return TransactionConflict;
default: return null;
}
}
@@ -19071,6 +19074,10 @@ public final class PulsarApi {
// optional uint64 txnid_most_bits = 7 [default = 0];
boolean hasTxnidMostBits();
long getTxnidMostBits();
+
+ // optional uint64 request_id = 8;
+ boolean hasRequestId();
+ long getRequestId();
}
public static final class CommandAck extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -19290,6 +19297,16 @@ public final class PulsarApi {
return txnidMostBits_;
}
+ // optional uint64 request_id = 8;
+ public static final int REQUEST_ID_FIELD_NUMBER = 8;
+ private long requestId_;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+
private void initFields() {
consumerId_ = 0L;
ackType_ =
org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType.Individual;
@@ -19298,6 +19315,7 @@ public final class PulsarApi {
properties_ = java.util.Collections.emptyList();
txnidLeastBits_ = 0L;
txnidMostBits_ = 0L;
+ requestId_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -19357,6 +19375,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeUInt64(7, txnidMostBits_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeUInt64(8, requestId_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -19393,6 +19414,10 @@ public final class PulsarApi {
size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeUInt64Size(7, txnidMostBits_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(8, requestId_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -19520,6 +19545,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000020);
txnidMostBits_ = 0L;
bitField0_ = (bitField0_ & ~0x00000040);
+ requestId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -19583,6 +19610,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000010;
}
result.txnidMostBits_ = txnidMostBits_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.requestId_ = requestId_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -19624,6 +19655,9 @@ public final class PulsarApi {
if (other.hasTxnidMostBits()) {
setTxnidMostBits(other.getTxnidMostBits());
}
+ if (other.hasRequestId()) {
+ setRequestId(other.getRequestId());
+ }
return this;
}
@@ -19718,6 +19752,11 @@ public final class PulsarApi {
txnidMostBits_ = input.readUInt64();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ requestId_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -20013,6 +20052,27 @@ public final class PulsarApi {
return this;
}
+ // optional uint64 request_id = 8;
+ private long requestId_ ;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+ public Builder setRequestId(long value) {
+ bitField0_ |= 0x00000080;
+ requestId_ = value;
+
+ return this;
+ }
+ public Builder clearRequestId() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ requestId_ = 0L;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandAck)
}
@@ -20046,6 +20106,10 @@ public final class PulsarApi {
// optional string message = 5;
boolean hasMessage();
String getMessage();
+
+ // optional uint64 request_id = 6;
+ boolean hasRequestId();
+ long getRequestId();
}
public static final class CommandAckResponse extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -20154,12 +20218,23 @@ public final class PulsarApi {
}
}
+ // optional uint64 request_id = 6;
+ public static final int REQUEST_ID_FIELD_NUMBER = 6;
+ private long requestId_;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+
private void initFields() {
consumerId_ = 0L;
txnidLeastBits_ = 0L;
txnidMostBits_ = 0L;
error_ =
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
message_ = "";
+ requestId_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -20197,6 +20272,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(5, getMessageBytes());
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeUInt64(6, requestId_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -20225,6 +20303,10 @@ public final class PulsarApi {
size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBytesSize(5, getMessageBytes());
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(6, requestId_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -20348,6 +20430,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000008);
message_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
+ requestId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -20401,6 +20485,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000010;
}
result.message_ = message_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.requestId_ = requestId_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -20422,6 +20510,9 @@ public final class PulsarApi {
if (other.hasMessage()) {
setMessage(other.getMessage());
}
+ if (other.hasRequestId()) {
+ setRequestId(other.getRequestId());
+ }
return this;
}
@@ -20484,6 +20575,11 @@ public final class PulsarApi {
message_ = input.readBytes();
break;
}
+ case 48: {
+ bitField0_ |= 0x00000020;
+ requestId_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -20613,6 +20709,27 @@ public final class PulsarApi {
}
+ // optional uint64 request_id = 6;
+ private long requestId_ ;
+ public boolean hasRequestId() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getRequestId() {
+ return requestId_;
+ }
+ public Builder setRequestId(long value) {
+ bitField0_ |= 0x00000020;
+ requestId_ = value;
+
+ return this;
+ }
+ public Builder clearRequestId() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ requestId_ = 0L;
+
+ return this;
+ }
+
//
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandAckResponse)
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index f9b2ca1..5b01e40 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1140,12 +1140,12 @@ public class Commands {
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId,
BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String,
Long> properties) {
- return newAck(consumerId, ledgerId, entryId, ackSet, ackType,
validationError, properties, -1, -1);
+ return newAck(consumerId, ledgerId, entryId, ackSet, ackType,
validationError, properties, -1, -1, -1);
}
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId,
BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String,
Long> properties, long txnIdLeastBits,
- long txnIdMostBits) {
+ long txnIdMostBits, long requestId) {
CommandAck.Builder ackBuilder = CommandAck.newBuilder();
ackBuilder.setConsumerId(consumerId);
ackBuilder.setAckType(ackType);
@@ -1154,6 +1154,7 @@ public class Commands {
messageIdDataBuilder.setEntryId(entryId);
if (ackSet != null) {
messageIdDataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet.toLongArray()));
+ ackSet.recycle();
}
MessageIdData messageIdData = messageIdDataBuilder.build();
ackBuilder.addMessageId(messageIdData);
@@ -1166,6 +1167,10 @@ public class Commands {
if (txnIdLeastBits >= 0) {
ackBuilder.setTxnidLeastBits(txnIdLeastBits);
}
+
+ if (requestId >= 0) {
+ ackBuilder.setRequestId(requestId);
+ }
for (Map.Entry<String, Long> e : properties.entrySet()) {
ackBuilder.addProperties(
KeyLongValue.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build());
@@ -1180,34 +1185,25 @@ public class Commands {
return res;
}
- public static ByteBuf newAckResponse(long consumerId, long txnIdLeastBits,
long txnIdMostBits) {
+ /**
+ * Builds a serialized ACK_RESPONSE command.
+ *
+ * @param requestId request id echoed back to the client
+ * @param error error to report, or {@code null} to leave the error field unset
+ * @param errorMsg error message, or {@code null} to leave the message field unset
+ * @param consumerId id of the consumer the response is addressed to
+ * @return the buffer produced by {@code serializeWithSize}
+ */
+ public static ByteBuf newAckResponse(long requestId, ServerError error,
String errorMsg, long consumerId) {
CommandAckResponse.Builder commandAckResponseBuilder =
CommandAckResponse.newBuilder();
commandAckResponseBuilder.setConsumerId(consumerId);
- commandAckResponseBuilder.setTxnidLeastBits(txnIdLeastBits);
- commandAckResponseBuilder.setTxnidMostBits(txnIdMostBits);
- CommandAckResponse commandAckResponse =
commandAckResponseBuilder.build();
-
- ByteBuf res = serializeWithSize(
-
BaseCommand.newBuilder().setType(Type.ACK_RESPONSE).setAckResponse(commandAckResponse));
- commandAckResponseBuilder.recycle();
- commandAckResponse.recycle();
+ commandAckResponseBuilder.setRequestId(requestId);
- return res;
- }
+ if (error != null) {
+ commandAckResponseBuilder.setError(error);
+ }
- public static ByteBuf newAckErrorResponse(ServerError error, String
errorMsg, long consumerId) {
- CommandAckResponse.Builder ackErrorBuilder =
CommandAckResponse.newBuilder();
- ackErrorBuilder.setConsumerId(consumerId);
- ackErrorBuilder.setError(error);
if (errorMsg != null) {
- ackErrorBuilder.setMessage(errorMsg);
+ commandAckResponseBuilder.setMessage(errorMsg);
}
- CommandAckResponse response = ackErrorBuilder.build();
- ByteBuf res =
serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK_RESPONSE).setAckResponse(response));
+ CommandAckResponse commandAckResponse =
commandAckResponseBuilder.build();
+ // Serialize the message built above, so the instance recycled below is the one that was sent.
+ // Handing over the builder instead would have it build a second message that is never recycled.
+ ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+ .setType(Type.ACK_RESPONSE).setAckResponse(commandAckResponse));
- ackErrorBuilder.recycle();
- response.recycle();
+ commandAckResponseBuilder.recycle();
+ commandAckResponse.recycle();
return res;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 06462b7..123d2dc 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxn;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxnResponse;
@@ -148,6 +149,12 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
cmd.getAck().recycle();
break;
+ case ACK_RESPONSE:
+ checkArgument(cmd.hasAckResponse());
+ handleAckResponse(cmd.getAckResponse());
+ cmd.getAckResponse().recycle();
+ break;
+
case CLOSE_CONSUMER:
checkArgument(cmd.hasCloseConsumer());
safeInterceptCommand(cmd);
@@ -555,6 +562,10 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+ /**
+ * Hook invoked by the decoder for an {@code ACK_RESPONSE} command.
+ *
+ * <p>This default implementation always throws; subclasses that expect ack responses override it.
+ *
+ * @param ackResponse the decoded ack response
+ * @throws UnsupportedOperationException always, unless overridden
+ */
+ protected void handleAckResponse(CommandAckResponse ackResponse) {
+ throw new UnsupportedOperationException();
+ }
+
protected void handleFlow(CommandFlow flow) {
throw new UnsupportedOperationException();
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 9c4b913..e740b11 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -201,6 +201,8 @@ enum ServerError {
TransactionCoordinatorNotFound = 20; // Transaction coordinator not found
error
InvalidTxnStatus = 21; // Invalid txn status error
NotAllowedError = 22; // Not allowed error
+
+ TransactionConflict = 23; // Ack with transaction conflict
}
enum AuthMethod {
@@ -508,6 +510,7 @@ message CommandAck {
optional uint64 txnid_least_bits = 6 [default = 0];
optional uint64 txnid_most_bits = 7 [default = 0];
+ optional uint64 request_id = 8;
}
message CommandAckResponse {
@@ -516,6 +519,7 @@ message CommandAckResponse {
optional uint64 txnid_most_bits = 3 [default = 0];
optional ServerError error = 4;
optional string message = 5;
+ optional uint64 request_id = 6;
}
// changes on active consumer