This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 93dac25 Ack response implementation (#8996)
93dac25 is described below
commit 93dac25ee0f1577d56b1f7abb969bc98b760da02
Author: congbo <[email protected]>
AuthorDate: Thu Jan 21 09:22:50 2021 +0800
Ack response implementation (#8996)
## Motivation
Implement acknowledgment responses so the client can know when the broker has processed an ack. Once this PR is committed, #8997 will be handled as a follow-up.
## Implementation
1. Implement a new PersistentAcknowledgmentsWithResponseGroupingTracker.
2. Add two ack request structs, one for asynchronous and one for synchronous flushes.
3. Add a timer to handle request timeouts; the timeout task does not need a lock
because timeouts are processed sequentially.
### Verifying this change
Added tests for the new behavior.
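
Below is a minimal usage sketch of the ack-receipt flow introduced in this commit. It assumes a broker reachable at pulsar://localhost:6650; the topic and subscription names are placeholders, not taken from this change. With isAckReceiptEnabled(true), the future returned by acknowledgeAsync() should complete only once the broker's AckResponse arrives (or fail on error/timeout), and the blocking acknowledge() waits on the same result.

```java
// Hypothetical sketch: service URL, topic, and subscription names are placeholders.
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class AckReceiptExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Subscribe first so the subscription exists before the message is produced.
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/ack-receipt-demo")
                .subscriptionName("ack-receipt-sub")
                .isAckReceiptEnabled(true)                     // wait for the broker's AckResponse
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)  // flush each ack immediately
                .subscribe();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/ack-receipt-demo")
                .create();
        producer.send("hello");

        Message<String> msg = consumer.receive();

        // The returned future completes only after the broker confirms the ack,
        // or completes exceptionally if the ack fails or times out.
        consumer.acknowledgeAsync(msg.getMessageId())
                .thenRun(() -> System.out.println("Broker confirmed the acknowledgment"))
                .get(30, TimeUnit.SECONDS);

        producer.close();
        consumer.close();
        client.close();
    }
}
```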
---
.../pulsar/broker/service/ServerCnxTest.java | 2 +-
.../pulsar/client/api/ConsumerAckListTest.java | 22 +-
.../pulsar/client/api/ConsumerCleanupTest.java | 12 +-
.../pulsar/client/api/ConsumerRedeliveryTest.java | 21 +-
.../client/api/SimpleProducerConsumerTest.java | 71 ++-
.../impl/BatchMessageIndexAckDisableTest.java | 16 +-
.../client/impl/BatchMessageIndexAckTest.java | 16 +-
.../pulsar/client/impl/MessageChunkingTest.java | 26 +-
.../apache/pulsar/client/api/ConsumerBuilder.java | 8 +
.../impl/AcknowledgmentsGroupingTracker.java | 12 +-
.../pulsar/client/impl/BatchMessageAcker.java | 2 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 70 ++-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 6 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 230 +-------
...NonPersistentAcknowledgmentGroupingTracker.java | 16 +-
.../PersistentAcknowledgmentsGroupingTracker.java | 655 +++++++++++++--------
.../impl/conf/ConsumerConfigurationData.java | 2 +
.../impl/AcknowledgementsGroupingTrackerTest.java | 107 +++-
.../impl/ClientCnxRequestTimeoutQueueTest.java | 13 +
.../apache/pulsar/common/protocol/Commands.java | 14 +-
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
21 files changed, 724 insertions(+), 598 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index d2cb692..7ff0c14 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1256,7 +1256,7 @@ public class ServerCnxTest {
PositionImpl pos = new PositionImpl(0, 0);
clientCommand = Commands.newAck(1 /* consumer id */,
pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual,
- null, Collections.emptyMap());
+ null, Collections.emptyMap(), -1);
channel.writeInbound(clientCommand);
// verify nothing is sent out on the wire after ack
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
index d67f5b4..5238796 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
@@ -22,6 +22,7 @@ import lombok.Cleanup;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -46,15 +47,20 @@ public class ConsumerAckListTest extends
ProducerConsumerBase {
super.internalCleanup();
}
- @Test(timeOut = 30000)
- public void testBatchListAck() throws Exception {
- ackListMessage(true,true);
- ackListMessage(true,false);
- ackListMessage(false,false);
- ackListMessage(false,true);
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
}
- public void ackListMessage(boolean isBatch, boolean isPartitioned) throws
Exception {
+ @Test(timeOut = 30000, dataProvider = "ackReceiptEnabled")
+ public void testBatchListAck(boolean ackReceiptEnabled) throws Exception {
+ ackListMessage(true,true, ackReceiptEnabled);
+ ackListMessage(true,false, ackReceiptEnabled);
+ ackListMessage(false,false, ackReceiptEnabled);
+ ackListMessage(false,true, ackReceiptEnabled);
+ }
+
+ public void ackListMessage(boolean isBatch, boolean isPartitioned, boolean
ackReceiptEnabled) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-ack-" +
UUID.randomUUID();
final String subName = "testBatchAck-sub" + UUID.randomUUID();
final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
@@ -72,6 +78,8 @@ public class ConsumerAckListTest extends ProducerConsumerBase
{
.topic(topic)
.negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
.subscriptionName(subName)
+ .enableBatchIndexAcknowledgment(ackReceiptEnabled)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
sendMessagesAsyncAndWait(producer, messageNum);
List<MessageId> messages = new ArrayList<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index 927af02..abe9354 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.UUID;
@@ -43,12 +44,19 @@ public class ConsumerCleanupTest extends
ProducerConsumerBase {
super.internalCleanup();
}
- @Test
- public void testAllTimerTaskShouldCanceledAfterConsumerClosed() throws
PulsarClientException, InterruptedException {
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean
ackReceiptEnabled)
+ throws PulsarClientException, InterruptedException {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/" +
UUID.randomUUID().toString())
.subscriptionName("test")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
consumer.close();
Thread.sleep(2000);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index e828598..37337a8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -29,8 +29,10 @@ import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;
@@ -54,6 +56,11 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
/**
* It verifies that redelivered messages are sorted based on the
ledger-ids.
* <pre>
@@ -64,8 +71,8 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
* </pre>
* @throws Exception
*/
- @Test
- public void testOrderedRedelivery() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testOrderedRedelivery(boolean ackReceiptEnabled) throws
Exception {
String topic = "persistent://my-property/my-ns/redelivery-" +
System.currentTimeMillis();
conf.setManagedLedgerMaxEntriesPerLedger(2);
@@ -77,7 +84,8 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
.producerName("my-producer-name")
.create();
ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
- .subscriptionType(SubscriptionType.Shared);
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(ackReceiptEnabled);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
consumerBuilder.subscribe();
final int totalMsgs = 100;
@@ -130,12 +138,14 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
}
}
- @Test
- public void testUnAckMessageRedeliveryWithReceiveAsync() throws
PulsarClientException, ExecutionException, InterruptedException {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testUnAckMessageRedeliveryWithReceiveAsync(boolean
ackReceiptEnabled) throws PulsarClientException, ExecutionException,
InterruptedException {
String topic = "persistent://my-property/my-ns/async-unack-redelivery";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
+ .isAckReceiptEnabled(ackReceiptEnabled)
+ .enableBatchIndexAcknowledgment(ackReceiptEnabled)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
@@ -165,7 +175,6 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
}
assertEquals(10, messageReceived);
-
for (int i = 0; i < messages; i++) {
Message<String> message = consumer.receive();
assertNotNull(message);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9fc14c0..da8bcf8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -129,6 +129,11 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
};
}
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
@@ -269,6 +274,11 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
}
}
+ @DataProvider(name = "batchAndAckReceipt")
+ public Object[][] codecProviderWithAckReceipt() {
+ return new Object[][] { { 0, true}, { 1000, false }, { 0, true }, {
1000, false }};
+ }
+
@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
@@ -311,10 +321,11 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
- @Test(dataProvider = "batch")
- public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws
Exception {
+ @Test(dataProvider = "batchAndAckReceipt")
+ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, boolean
ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic2")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionName("my-subscriber-name").subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
@@ -1033,8 +1044,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
- @Test
- public void testDeactivatingBacklogConsumer() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled)
throws Exception {
log.info("-- Starting {} test --", methodName);
final long batchMessageDelayMs = 100;
@@ -1047,10 +1058,12 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
// 1. Subscriber Faster subscriber: let it consume all messages
immediately
Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/" +
topicName).subscriptionName(sub1)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
// 1.b. Subscriber Slow subscriber:
Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/" +
topicName).subscriptionName(sub2)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topic);
@@ -1077,7 +1090,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
// 3. Consume messages: at Faster subscriber
for (int i = 0; i < totalMsgs; i++) {
msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
- subscriber1.acknowledge(msg);
+ subscriber1.acknowledgeAsync(msg);
}
// wait : so message can be eligible to to be evict from cache
@@ -1096,7 +1109,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
// 6. consume messages : at slower subscriber
for (int i = 0; i < totalMsgs; i++) {
msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
- subscriber2.acknowledge(msg);
+ subscriber2.acknowledgeAsync(msg);
}
topicRef.checkBackloggedCursors();
@@ -1258,13 +1271,14 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
*
* @throws Exception
*/
- @Test(timeOut = 30000)
- public void testSharedConsumerAckDifferentConsumer() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled", timeOut = 30000)
+ public void testSharedConsumerAckDifferentConsumer(boolean
ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
@@ -1355,8 +1369,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
*
* @throws Exception
*/
- @Test
- public void testConsumerBlockingWithUnAckedMessages() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testConsumerBlockingWithUnAckedMessages(boolean
ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
int unAckedMessages =
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1368,6 +1382,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1395,13 +1410,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
assertEquals(messages.size(), unAckedMessagesBufferSize);
// start acknowledging messages
- messages.forEach(m -> {
- try {
- consumer.acknowledge(m);
- } catch (PulsarClientException e) {
- fail("ack failed", e);
- }
- });
+ messages.forEach(consumer::acknowledgeAsync);
// try to consume remaining messages
int remainingMessages = totalProducedMsgs - messages.size();
@@ -1434,8 +1443,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
*
* @throws Exception
*/
- @Test
- public void testConsumerBlockingWithUnAckedMessagesMultipleIteration()
throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void
testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean
ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
int unAckedMessages =
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1450,6 +1459,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1508,8 +1518,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
*
* @throws Exception
*/
- @Test
- public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws
Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages(boolean
ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
int unAckedMessages =
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1522,11 +1532,13 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
PulsarClient newPulsarClient =
newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1663,8 +1675,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
}
}
- @Test
- public void testUnackBlockRedeliverMessages() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testUnackBlockRedeliverMessages(boolean ackReceiptEnabled)
throws Exception {
log.info("-- Starting {} test --", methodName);
int unAckedMessages =
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1677,6 +1689,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1730,8 +1743,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
}
}
- @Test(dataProvider = "batch")
- public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws
Exception {
+ @Test(dataProvider = "batchAndAckReceipt")
+ public void testUnackedBlockAtBatch(int batchMessageDelayMs, boolean
ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
int unAckedMessages =
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1744,6 +1757,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
ProducerBuilder<byte[]> producerBuidler =
pulsarClient.newProducer()
@@ -1800,7 +1814,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
if (msg != null) {
messages.add(msg);
totalReceiveMessages++;
- consumer1.acknowledge(msg);
+ consumer1.acknowledgeAsync(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
@@ -2326,8 +2340,8 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
- @Test
- public void testRedeliveryFailOverConsumer() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled)
throws Exception {
log.info("-- Starting {} test --", methodName);
final int receiverQueueSize = 10;
@@ -2336,6 +2350,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
index 474a586..46f11fb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -57,8 +58,13 @@ public class BatchMessageIndexAckDisableTest extends
ProducerConsumerBase {
super.internalCleanup();
}
- @Test
- public void testBatchMessageIndexAckForSharedSubscription() throws
PulsarClientException, ExecutionException, InterruptedException {
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testBatchMessageIndexAckForSharedSubscription(boolean
ackReceiptEnabled) throws PulsarClientException, ExecutionException,
InterruptedException {
final String topic = "testBatchMessageIndexAckForSharedSubscription";
@Cleanup
@@ -67,6 +73,7 @@ public class BatchMessageIndexAckDisableTest extends
ProducerConsumerBase {
.subscriptionName("sub")
.receiverQueueSize(100)
.subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();
@@ -97,8 +104,8 @@ public class BatchMessageIndexAckDisableTest extends
ProducerConsumerBase {
Assert.assertEquals(received.size(), 100);
}
- @Test
- public void testBatchMessageIndexAckForExclusiveSubscription() throws
PulsarClientException, ExecutionException, InterruptedException {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testBatchMessageIndexAckForExclusiveSubscription(boolean
ackReceiptEnabled) throws PulsarClientException, ExecutionException,
InterruptedException {
final String topic =
"testBatchMessageIndexAckForExclusiveSubscription";
@Cleanup
@@ -106,6 +113,7 @@ public class BatchMessageIndexAckDisableTest extends
ProducerConsumerBase {
.topic(topic)
.subscriptionName("sub")
.receiverQueueSize(100)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 6ff7d23..db9decb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -58,8 +59,13 @@ public class BatchMessageIndexAckTest extends
ProducerConsumerBase {
super.internalCleanup();
}
- @Test
- public void testBatchMessageIndexAckForSharedSubscription() throws
Exception {
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testBatchMessageIndexAckForSharedSubscription(boolean
ackReceiptEnabled) throws Exception {
final String topic = "testBatchMessageIndexAckForSharedSubscription";
final String subscriptionName = "sub";
@@ -68,6 +74,7 @@ public class BatchMessageIndexAckTest extends
ProducerConsumerBase {
.topic(topic)
.subscriptionName(subscriptionName)
.receiverQueueSize(100)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
@@ -138,8 +145,8 @@ public class BatchMessageIndexAckTest extends
ProducerConsumerBase {
Assert.assertEquals(received.size(), 100);
}
- @Test
- public void testBatchMessageIndexAckForExclusiveSubscription() throws
PulsarClientException, ExecutionException, InterruptedException {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testBatchMessageIndexAckForExclusiveSubscription(boolean
ackReceiptEnabled) throws PulsarClientException, ExecutionException,
InterruptedException {
final String topic =
"testBatchMessageIndexAckForExclusiveSubscription";
@Cleanup
@@ -147,6 +154,7 @@ public class BatchMessageIndexAckTest extends
ProducerConsumerBase {
.topic(topic)
.subscriptionName("sub")
.receiverQueueSize(100)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.enableBatchIndexAcknowledgment(true)
.subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index f0294df..1d3af92 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
@@ -80,9 +81,26 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @DataProvider(name = "ackReceiptEnabled")
+ public Object[][] ackReceiptEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
@Test
- public void testLargeMessage() throws Exception {
+ public void testInvalidConfig() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+ ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topicName);
+ // batching and chunking can't be enabled together
+ try {
+ Producer<byte[]> producer =
producerBuilder.enableChunking(true).enableBatching(true).create();
+ fail("producer creation should have fail");
+ } catch (IllegalArgumentException ie) {
+ // Ok
+ }
+ }
+
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
@@ -90,6 +108,7 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
final String topicName = "persistent://my-property/my-ns/my-topic1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+ .isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topicName);
@@ -145,8 +164,8 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
}
- @Test
- public void testLargeMessageAckTimeOut() throws Exception {
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws
Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
@@ -155,6 +174,7 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0,
TimeUnit.SECONDS)
+ .isAckReceiptEnabled(ackReceiptEnabled)
.ackTimeout(5, TimeUnit.SECONDS).subscribe();
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topicName);
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index c6a6555..246394a 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -187,6 +187,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/**
+ * Ack will return receipt but does not mean that the message will not be
resent after get receipt.
+ *
+ * @param isAckReceiptEnabled {@link Boolean} is enable ack for receipt
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> isAckReceiptEnabled(boolean isAckReceiptEnabled);
+
+ /**
* Define the granularity of the ack-timeout redelivery.
*
* <p>By default, the tick time is set to 1 second. Using an higher tick
time will
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index 47df3df..319e327 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.client.impl;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
/**
@@ -31,12 +33,10 @@ public interface AcknowledgmentsGroupingTracker extends
AutoCloseable {
boolean isDuplicate(MessageId messageId);
- void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String,
Long> properties, TransactionImpl txn);
-
- void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType
ackType, Map<String, Long> properties);
+ CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType
ackType, Map<String, Long> properties);
- void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex,
int batchSize, AckType ackType,
- Map<String, Long> properties,
TransactionImpl txn);
+ CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
AckType ackType,
+ Map<String, Long>
properties);
void flush();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index a16dcb4..8b178f2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -42,7 +42,7 @@ class BatchMessageAcker {
// bitset shared across messages in the same batch.
private final int batchSize;
private final BitSet bitSet;
- private boolean prevBatchCumulativelyAcked = false;
+ private volatile boolean prevBatchCumulativelyAcked = false;
BatchMessageAcker(BitSet bitSet, int batchSize) {
this.bitSet = bitSet;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 8a0a7f5..4e6ea0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -170,7 +170,8 @@ public class ClientCnx extends PulsarHandler {
GetLastMessageId,
GetTopics,
GetSchema,
- GetOrCreateSchema;
+ GetOrCreateSchema,
+ AckResponse;
String getDescription() {
if (this == Command) {
@@ -399,12 +400,17 @@ public class ClientCnx extends PulsarHandler {
protected void handleAckResponse(CommandAckResponse ackResponse) {
checkArgument(state == State.Ready);
checkArgument(ackResponse.getRequestId() >= 0);
- long consumerId = ackResponse.getConsumerId();
- if (!ackResponse.hasError()) {
- consumers.get(consumerId).ackReceipt(ackResponse.getRequestId());
+ CompletableFuture<?> completableFuture =
pendingRequests.remove(ackResponse.getRequestId());
+ if (completableFuture != null && !completableFuture.isDone()) {
+ if (!ackResponse.hasError()) {
+ completableFuture.complete(null);
+ } else {
+ completableFuture.completeExceptionally(
+ getPulsarClientException(ackResponse.getError(),
ackResponse.getMessage()));
+ }
} else {
- consumers.get(consumerId).ackError(ackResponse.getRequestId(),
- getPulsarClientException(ackResponse.getError(),
ackResponse.getMessage()));
+ log.warn("AckResponse has complete when receive response!
requestId : {}, consumerId : {}",
+ ackResponse.getRequestId(), ackResponse.hasConsumerId());
}
}
@@ -741,7 +747,16 @@ public class ClientCnx extends PulsarHandler {
}
public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf
request, long requestId) {
- return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetTopics);
+ return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetTopics, true);
+ }
+
+ public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long
requestId) {
+ return sendRequestAndHandleTimeout(request, requestId,
RequestType.AckResponse,true);
+ }
+
+ public void newAckForReceiptWithFuture(ByteBuf request, long requestId,
+ TimedCompletableFuture<Void>
future) {
+ sendRequestAndHandleTimeout(request, requestId,
RequestType.AckResponse, false, future);
}
@Override
@@ -816,25 +831,39 @@ public class ClientCnx extends PulsarHandler {
}
CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long
requestId) {
- return sendRequestAndHandleTimeout(cmd, requestId,
RequestType.Command);
+ return sendRequestAndHandleTimeout(cmd, requestId,
RequestType.Command, true);
}
- private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf
requestMessage, long requestId, RequestType requestType) {
- TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+ private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long
requestId,
+ RequestType
requestType, boolean flush,
+
TimedCompletableFuture<T> future) {
pendingRequests.put(requestId, future);
- ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- log.warn("{} Failed to send {} to broker: {}", ctx.channel(),
requestType.getDescription(), writeFuture.cause().getMessage());
- pendingRequests.remove(requestId);
- future.completeExceptionally(writeFuture.cause());
- }
- });
+ if (flush) {
+ ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ CompletableFuture<?> newFuture =
pendingRequests.remove(requestId);
+ if (!newFuture.isDone()) {
+ log.warn("{} Failed to send {} to broker: {}",
ctx.channel(),
+ requestType.getDescription(),
writeFuture.cause().getMessage());
+ future.completeExceptionally(writeFuture.cause());
+ }
+ }
+ });
+ } else {
+ ctx.write(requestMessage, ctx().voidPromise());
+ }
requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(),
requestId, requestType));
+ }
+
+ private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf
requestMessage, long requestId,
+ RequestType requestType,
boolean flush) {
+ TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+ sendRequestAndHandleTimeout(requestMessage, requestId, requestType,
flush, future);
return future;
}
public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf
request, long requestId) {
- return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetLastMessageId);
+ return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetLastMessageId, true);
}
public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf
request, long requestId) {
@@ -856,11 +885,12 @@ public class ClientCnx extends PulsarHandler {
}
public CompletableFuture<CommandGetSchemaResponse>
sendGetRawSchema(ByteBuf request, long requestId) {
- return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetSchema);
+ return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetSchema, true);
}
public CompletableFuture<byte[]> sendGetOrCreateSchema(ByteBuf request,
long requestId) {
- CompletableFuture<CommandGetOrCreateSchemaResponse> future =
sendRequestAndHandleTimeout(request, requestId, RequestType.GetOrCreateSchema);
+ CompletableFuture<CommandGetOrCreateSchemaResponse> future =
sendRequestAndHandleTimeout(request, requestId,
+ RequestType.GetOrCreateSchema, true);
return future.thenCompose(response -> {
if (response.hasErrorCode()) {
// Request has failed
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 8429598..1e987e1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -194,6 +194,12 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> isAckReceiptEnabled(boolean isAckReceiptEnabled)
{
+ conf.setAckReceiptEnabled(isAckReceiptEnabled);
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit
timeUnit) {
checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
"Ack timeout tick time should be greater than " +
MIN_TICK_TIME_MILLIS + " ms");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 21f3f13..d8d0ec4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -60,7 +60,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
@@ -68,7 +67,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
-import
org.apache.pulsar.client.api.PulsarClientException.MessageAcknowledgeException;
import
org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -101,7 +99,6 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
@@ -186,8 +183,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final boolean createTopicIfDoesNotExist;
- private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
-
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
@@ -346,7 +341,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
topicNameWithoutPartition = topicName.getPartitionedTopicName();
- this.ackRequests = new ConcurrentLongHashMap<>(16, 1);
grabCnx();
}
@@ -505,43 +499,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return result;
}
- boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType
ackType,
- Map<String,Long> properties,
TransactionImpl txn) {
- boolean isAllMsgsAcked;
- if (ackType == AckType.Individual) {
- isAllMsgsAcked = txn == null && batchMessageId.ackIndividual();
- } else {
- isAllMsgsAcked = batchMessageId.ackCumulative();
- }
- int outstandingAcks = 0;
- if (log.isDebugEnabled()) {
- outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
- }
-
- int batchSize = batchMessageId.getBatchSize();
- // all messages in this batch have been acked
- if (isAllMsgsAcked) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] can ack message to broker {}, acktype {},
cardinality {}, length {}", subscription,
- consumerName, batchMessageId, ackType,
outstandingAcks, batchSize);
- }
- return true;
- } else {
- if (AckType.Cumulative == ackType
- && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
- sendAcknowledge(batchMessageId.prevBatchMessageId(),
AckType.Cumulative, properties, null);
- batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
- } else {
- onAcknowledge(batchMessageId, null);
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] cannot ack message to broker {}, acktype
{}, pending acks - {}", subscription,
- consumerName, batchMessageId, ackType,
outstandingAcks);
- }
- }
- return false;
- }
-
@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String,Long>
properties,
@@ -562,67 +519,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return doTransactionAcknowledgeForResponse(messageId, ackType,
null, properties,
new TxnID(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits()));
}
-
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- if (markAckForBatchMessage(batchMessageId, ackType, properties,
txn)) {
- // all messages in batch have been acked so broker can be
acked via sendAcknowledge()
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] acknowledging message - {}, acktype
{}", subscription, consumerName, messageId,
- ackType);
- }
- } else {
- if (conf.isBatchIndexAckEnabled()) {
-
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId,
- batchMessageId.getBatchIndex(),
batchMessageId.getBatchSize(),
- ackType, properties, txn);
- }
- // other messages in batch are still pending ack.
- return CompletableFuture.completedFuture(null);
- }
- }
- return sendAcknowledge(messageId, ackType, properties, txn);
+ return
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId,
ackType, properties);
}
@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId>
messageIdList, AckType ackType, Map<String, Long> properties, TransactionImpl
txn) {
- if (AckType.Cumulative.equals(ackType)) {
- List<CompletableFuture<Void>> completableFutures = new
ArrayList<>();
- messageIdList.forEach(messageId ->
completableFutures.add(doAcknowledge(messageId, ackType, properties, txn)));
- return CompletableFuture.allOf(completableFutures.toArray(new
CompletableFuture[0]));
- }
- if (getState() != State.Ready && getState() != State.Connecting) {
- stats.incrementNumAcksFailed();
- PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
- messageIdList.forEach(messageId -> onAcknowledge(messageId,
exception));
- return FutureUtil.failedFuture(exception);
- }
- List<MessageIdImpl> nonBatchMessageIds = new ArrayList<>();
- for (MessageId messageId : messageIdList) {
- MessageIdImpl messageIdImpl;
- if (messageId instanceof BatchMessageIdImpl
- && !markAckForBatchMessage((BatchMessageIdImpl) messageId,
ackType, properties, txn)) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- messageIdImpl = new
MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId()
- , batchMessageId.getPartitionIndex());
-
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId,
batchMessageId.getBatchIndex(),
- batchMessageId.getBatchSize(), ackType, properties,
txn);
- stats.incrementNumAcksSent(batchMessageId.getBatchSize());
- } else {
- messageIdImpl = (MessageIdImpl) messageId;
- stats.incrementNumAcksSent(1);
- nonBatchMessageIds.add(messageIdImpl);
- }
- unAckedMessageTracker.remove(messageIdImpl);
- if (possibleSendToDeadLetterTopicMessages != null) {
- possibleSendToDeadLetterTopicMessages.remove(messageIdImpl);
- }
- onAcknowledge(messageId, null);
- }
- if (nonBatchMessageIds.size() > 0) {
-
acknowledgmentsGroupingTracker.addListAcknowledgment(nonBatchMessageIds,
ackType, properties);
- }
- return CompletableFuture.completedFuture(null);
+ return
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList,
ackType, properties);
}
@@ -749,44 +651,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return CompletableFuture.completedFuture(null);
}
- // TODO: handle transactional acknowledgements.
- private CompletableFuture<Void> sendAcknowledge(MessageId messageId,
AckType ackType,
- Map<String,Long>
properties,
- TransactionImpl txnImpl) {
- MessageIdImpl msgId = (MessageIdImpl) messageId;
-
- if (ackType == AckType.Individual) {
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
-
- stats.incrementNumAcksSent(batchMessageId.getBatchSize());
- unAckedMessageTracker.remove(new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
- if (possibleSendToDeadLetterTopicMessages != null) {
- possibleSendToDeadLetterTopicMessages.remove(new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
- }
- } else {
- // increment counter by 1 for non-batch msg
- unAckedMessageTracker.remove(msgId);
- if (possibleSendToDeadLetterTopicMessages != null) {
- possibleSendToDeadLetterTopicMessages.remove(msgId);
- }
- stats.incrementNumAcksSent(1);
- }
- onAcknowledge(messageId, null);
- } else if (ackType == AckType.Cumulative) {
- onAcknowledgeCumulative(messageId, null);
-
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
- }
-
- acknowledgmentsGroupingTracker.addAcknowledgment(msgId, ackType,
properties, txnImpl);
-
- // Consumer acknowledgment operation immediately succeeds. In any
case, if we're not able to send ack to broker,
- // the messages will be re-delivered
- return CompletableFuture.completedFuture(null);
- }
-
@Override
public void negativeAcknowledge(MessageId messageId) {
negativeAcksTracker.add(messageId);
@@ -1053,17 +917,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
-
- if (!ackRequests.isEmpty()) {
- ackRequests.forEach((key, value) -> {
- value.callback
- .completeExceptionally(new
MessageAcknowledgeException("Consumer has closed!"));
- value.recycle();
- });
-
- ackRequests.clear();
- }
-
acknowledgmentsGroupingTracker.close();
if (batchReceiveTimeout != null) {
batchReceiveTimeout.cancel();
@@ -1679,7 +1532,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private void discardMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError) {
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(),
messageId.getEntryId(), null, AckType.Individual,
- validationError, Collections.emptyMap());
+ validationError, Collections.emptyMap(),
-1);
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
increaseAvailablePermits(currentCnx);
stats.incrementNumReceiveFailed();
@@ -2216,7 +2069,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public ConsumerStats getStats() {
+ public ConsumerStatsRecorder getStats() {
return stats;
}
@@ -2269,6 +2122,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (clientCnx != null) {
this.connectionHandler.setClientCnx(clientCnx);
clientCnx.registerConsumer(consumerId, this);
+ if (conf.isAckReceiptEnabled() &&
+
Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion())) {
+ log.warn("Server don't support ack for receipt! " +
+ "ProtoVersion >=17 support! nowVersion : {}",
clientCnx.getRemoteEndpointProtocolVersion());
+ }
}
ClientCnx previousClientCnx =
clientCnxUsedForConsumerRegistration.getAndSet(clientCnx);
if (previousClientCnx != null && previousClientCnx != clientCnx) {
@@ -2388,7 +2246,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
ValidationError validationError,
Map<String, Long> properties, TxnID txnID) {
- CompletableFuture<Void> callBack = new CompletableFuture<>();
BitSetRecyclable bitSetRecyclable = null;
long ledgerId;
long entryId;
@@ -2418,79 +2275,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
validationError, properties, txnID.getLeastSigBits(),
txnID.getMostSigBits(), requestId);
}
- OpForAckCallBack op = OpForAckCallBack.create(cmd, callBack, messageId,
- new TxnID(txnID.getMostSigBits(), txnID.getLeastSigBits()));
- ackRequests.put(requestId, op);
if (ackType == AckType.Cumulative) {
unAckedMessageTracker.removeMessagesTill(messageId);
} else {
unAckedMessageTracker.remove(messageId);
}
- cmd.retain();
- cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
- return callBack;
+ return cnx().newAckForReceipt(cmd, requestId);
}
- protected void ackReceipt(long requestId) {
- OpForAckCallBack callBackOp = ackRequests.remove(requestId);
- if (callBackOp == null || callBackOp.callback.isDone()) {
- log.info("Ack request has been handled requestId : {}", requestId);
- } else {
- callBackOp.callback.complete(null);
- if (log.isDebugEnabled()) {
- log.debug("MessageId : {} has ack by TxnId : {}",
callBackOp.messageId.getLedgerId() + ":"
- + callBackOp.messageId.getEntryId(),
callBackOp.txnID.toString());
- }
- }
- }
-
- protected void ackError(long requestId, PulsarClientException
pulsarClientException) {
- OpForAckCallBack callBackOp = ackRequests.remove(requestId);
- if (callBackOp == null || callBackOp.callback.isDone()) {
- log.info("Ack request has been handled requestId : {}", requestId);
- } else {
- callBackOp.callback.completeExceptionally(pulsarClientException);
- if (log.isDebugEnabled()) {
- log.debug("MessageId : {} has ack by TxnId : {}",
callBackOp.messageId, callBackOp.txnID);
- }
- callBackOp.recycle();
- }
- }
-
- private static class OpForAckCallBack {
- protected ByteBuf cmd;
- protected CompletableFuture<Void> callback;
- protected MessageIdImpl messageId;
- protected TxnID txnID;
-
- static OpForAckCallBack create(ByteBuf cmd, CompletableFuture<Void>
callback,
- MessageId messageId, TxnID txnID) {
- OpForAckCallBack op = RECYCLER.get();
- op.callback = callback;
- op.cmd = cmd;
- op.messageId = (MessageIdImpl) messageId;
- op.txnID = txnID;
- return op;
- }
-
- private OpForAckCallBack(Recycler.Handle<OpForAckCallBack>
recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
-
- void recycle() {
- if (cmd != null) {
- ReferenceCountUtil.safeRelease(cmd);
- }
- recyclerHandle.recycle(this);
- }
-
- private final Recycler.Handle<OpForAckCallBack> recyclerHandle;
- private static final Recycler<OpForAckCallBack> RECYCLER = new
Recycler<OpForAckCallBack>() {
- @Override
- protected OpForAckCallBack newObject(Handle<OpForAckCallBack>
handle) {
- return new OpForAckCallBack(handle);
- }
- };
+ public Map<MessageIdImpl, List<MessageImpl<T>>>
getPossibleSendToDeadLetterTopicMessages() {
+ return possibleSendToDeadLetterTopicMessages;
}
private static final Logger log =
LoggerFactory.getLogger(ConsumerImpl.class);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index 6f56f26..36cd99b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -42,20 +44,16 @@ public class NonPersistentAcknowledgmentGroupingTracker
implements Acknowledgmen
return false;
}
- public void addAcknowledgment(MessageIdImpl msgId, AckType ackType,
Map<String,
- Long> properties, TransactionImpl txnImpl) {
- // no-op
- }
-
- @Override
- public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType
ackType, Map<String, Long> properties) {
+ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType, Map<String,
+ Long> properties) {
// no-op
+ return CompletableFuture.completedFuture(null);
}
@Override
- public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int
batchIndex, int batchSize,
- AckType ackType, Map<String, Long>
properties, TransactionImpl transaction) {
+ public CompletableFuture<Void> addListAcknowledgment(List<MessageId>
messageIds, AckType ackType, Map<String, Long> properties) {
// no-op
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index bfa025b..ae3eb97 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -22,24 +22,30 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import io.netty.util.Recycler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
+import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
@@ -58,18 +64,19 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
private final long acknowledgementGroupTimeMicros;
- /**
- * Latest cumulative ack sent to broker
- */
- private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl)
MessageId.earliest;
- private volatile BitSetRecyclable lastCumulativeAckSet = null;
+ private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
+ private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
+
+ private volatile LastCumulativeAck lastCumulativeAck =
+ LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest,
null);
+
private volatile boolean cumulativeAckFlushRequired = false;
- private static final
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker,
MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
- .newUpdater(PersistentAcknowledgmentsGroupingTracker.class,
MessageIdImpl.class, "lastCumulativeAck");
- private static final
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker,
BitSetRecyclable> LAST_CUMULATIVE_ACK_SET_UPDATER = AtomicReferenceFieldUpdater
- .newUpdater(PersistentAcknowledgmentsGroupingTracker.class,
BitSetRecyclable.class, "lastCumulativeAckSet");
+    // Guards the swap of the current receipt futures: flush takes the write lock so that every
+    // pending ack request is bound to the correct receipt future before the futures are replaced
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private static final
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker,
LastCumulativeAck> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
+ .newUpdater(PersistentAcknowledgmentsGroupingTracker.class,
LastCumulativeAck.class, "lastCumulativeAck");
/**
* This is a set of all the individual acks that the application has
issued and that were not already sent to
@@ -78,10 +85,9 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>
pendingIndividualBatchIndexAcks;
- private final ConcurrentHashMap<TransactionImpl,
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>
pendingIndividualTransactionBatchIndexAcks;
- private final ConcurrentSkipListSet<Triple<Long, Long, MessageIdImpl>>
pendingIndividualTransactionAcks;
-
private final ScheduledFuture<?> scheduledTask;
+ private final boolean batchIndexAckEnabled;
+ private final boolean ackReceiptEnabled;
public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer,
ConsumerConfigurationData<?> conf,
EventLoopGroup
eventLoopGroup) {
@@ -89,8 +95,10 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
this.acknowledgementGroupTimeMicros =
conf.getAcknowledgementsGroupTimeMicros();
- this.pendingIndividualTransactionBatchIndexAcks = new
ConcurrentHashMap<>();
- this.pendingIndividualTransactionAcks = new ConcurrentSkipListSet<>();
+ this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
+ this.ackReceiptEnabled = conf.isAckReceiptEnabled();
+ this.currentIndividualAckFuture = new TimedCompletableFuture<>();
+ this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
if (acknowledgementGroupTimeMicros > 0) {
scheduledTask =
eventLoopGroup.next().scheduleWithFixedDelay(this::flush,
acknowledgementGroupTimeMicros,
@@ -106,7 +114,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
*/
@Override
public boolean isDuplicate(MessageId messageId) {
- if (messageId.compareTo(lastCumulativeAck) <= 0) {
+ if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) {
// Already included in a cumulative ack
return true;
} else {
@@ -115,186 +123,302 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
@Override
- public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType
ackType, Map<String, Long> properties) {
- if (ackType == AckType.Cumulative) {
- messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
- return;
+ public CompletableFuture<Void> addListAcknowledgment(List<MessageId>
messageIds,
+ AckType ackType,
Map<String, Long> properties) {
+ if (AckType.Cumulative.equals(ackType)) {
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+ Set<CompletableFuture<Void>> completableFutureSet = new
HashSet<>();
+ messageIds.forEach(messageId ->
+
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType,
properties)));
+ return FutureUtil.waitForAll(new
ArrayList<>(completableFutureSet));
+ } else {
+ messageIds.forEach(messageId ->
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+ try {
+                // When flushing, this ack must stay bound to the current receipt future, so the future
+                // must not be swapped while we add the ack. The read lock is sufficient here: the future
+                // is only replaced under the write lock, and concurrent ack operations remain allowed.
+ this.lock.readLock().lock();
+ addListAcknowledgment(messageIds);
+ return this.currentIndividualAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ }
+ } else {
+ addListAcknowledgment(messageIds);
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
- messageIds.forEach(messageId -> {
+ }
+
+ private void addListAcknowledgment(List<MessageId> messageIds) {
+ for (MessageId messageId : messageIds) {
+ consumer.onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- pendingIndividualAcks.add(new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
+ if (!batchMessageId.ackIndividual()) {
+ doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+ } else {
+ messageId =
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
+ }
} else {
- pendingIndividualAcks.add(messageId);
+ modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
}
- pendingIndividualBatchIndexAcks.remove(messageId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
- }
- });
- if (acknowledgementGroupTimeMicros == 0) {
- flush();
}
}
@Override
- public void addAcknowledgment(MessageIdImpl msgId, AckType ackType,
Map<String, Long> properties,
- TransactionImpl txn) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
- (txn != null && ackType == AckType.Cumulative)) {
- if (msgId instanceof BatchMessageIdImpl && txn != null) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
msgId;
- doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
- batchMessageId.getBatchIndex(),
- ackType, properties, txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits());
- return;
+ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType,
+ Map<String, Long>
properties) {
+ if (msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+                // this ack carries a bitset index; check whether every message in the batch is now acked
+ if (batchMessageId.ackIndividual()) {
+ MessageIdImpl messageId =
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+ return doIndividualAck(messageId, properties);
+ } else if (batchIndexAckEnabled){
+ return doIndividualBatchAck(batchMessageId, properties);
+ } else {
+                    // batch index ack is disabled, so no ack command can be sent to the broker until
+                    // every message in the batch has been acked
+ return CompletableFuture.completedFuture(null);
}
- // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
- // uncommon condition since it's only used for the compaction
subscription.
- doImmediateAck(msgId, ackType, properties, txn);
- } else if (ackType == AckType.Cumulative) {
- doCumulativeAck(msgId, null);
- } else {
- // Individual ack
- if (msgId instanceof BatchMessageIdImpl) {
- pendingIndividualAcks.add(new
MessageIdImpl(msgId.getLedgerId(),
- msgId.getEntryId(), msgId.getPartitionIndex()));
} else {
- if (txn != null) {
- pendingIndividualTransactionAcks
- .add(Triple.of(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits(), msgId));
+ consumer.onAcknowledgeCumulative(msgId, null);
+ if (batchMessageId.ackCumulative()) {
+ return doCumulativeAck(msgId, properties, null);
} else {
- pendingIndividualAcks.add(msgId);
+ if (batchIndexAckEnabled) {
+ return doCumulativeBatchIndexAck(batchMessageId,
properties);
+ } else {
+                    // batch index ack is disabled, so cumulatively ack up to the previous batch's
+                    // message id, which is the latest position that can safely be acked
+ if (AckType.Cumulative == ackType
+ &&
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
}
- pendingIndividualBatchIndexAcks.remove(msgId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ } else {
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+ modifyMessageIdStatesInConsumer(msgId);
+ return doIndividualAck(msgId, properties);
+ } else {
+ consumer.onAcknowledgeCumulative(msgId, null);
+ return doCumulativeAck(msgId, properties, null);
}
}
}
- public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int
batchIndex, int batchSize, AckType ackType,
- Map<String, Long> properties,
TransactionImpl txn) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
- doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType,
properties,
- txn == null ? -1 : txn.getTxnIdMostBits(),
- txn == null ? -1 : txn.getTxnIdLeastBits());
- } else if (ackType == AckType.Cumulative) {
- BitSetRecyclable bitSet = BitSetRecyclable.create();
- bitSet.set(0, batchSize);
- bitSet.clear(0, batchIndex + 1);
- doCumulativeAck(msgId, bitSet);
- } else if (ackType == AckType.Individual) {
- ConcurrentBitSetRecyclable bitSet;
- if (txn != null) {
- synchronized (txn) {
- ConcurrentHashMap<MessageIdImpl,
ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
- pendingIndividualTransactionBatchIndexAcks
- .computeIfAbsent(txn, (v) -> new
ConcurrentHashMap<>());
- bitSet =
transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
- ConcurrentBitSetRecyclable value;
- value = ConcurrentBitSetRecyclable.create();
- value.set(0, msgId.getAcker().getBatchSize());
- return value;
- });
- bitSet.clear(batchIndex);
+ private MessageIdImpl
modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
+ MessageIdImpl messageId = new
MessageIdImpl(batchMessageId.getLedgerId(),
+ batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
+
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+ clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+ return messageId;
+ }
+
+ private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
+ consumer.getStats().incrementNumAcksSent(1);
+ clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+ }
+
+ private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl
messageId) {
+ consumer.getUnAckedMessageTracker().remove(messageId);
+ if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+ }
+ }
+
+ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId,
Map<String, Long> properties) {
+ if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
+ // uncommon condition since it's only used for the compaction
subscription.
+ return doImmediateAck(messageId, AckType.Individual, properties,
null);
+ } else {
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                // When flushing, this ack must stay bound to the current receipt future, so the future
+                // must not be swapped while we add the ack. The read lock is sufficient here: the future
+                // is only replaced under the write lock, and concurrent ack operations remain allowed.
+ this.lock.readLock().lock();
+ try {
+ doIndividualAckAsync(messageId);
+ return this.currentIndividualAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
+ if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
}
} else {
- bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
- new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(),
msgId.getPartitionIndex()), (v) -> {
- ConcurrentBitSetRecyclable value;
- if (msgId.getAcker() != null && !(msgId.getAcker()
instanceof BatchMessageAckerDisabled)) {
- value =
ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
- } else {
- value = ConcurrentBitSetRecyclable.create();
- value.set(0, batchSize);
- }
- return value;
- });
- bitSet.clear(batchIndex);
+ doIndividualAckAsync(messageId);
+ if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ return CompletableFuture.completedFuture(null);
}
- if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ }
+ }
+
+
+ private void doIndividualAckAsync(MessageIdImpl messageId) {
+ pendingIndividualAcks.add(messageId);
+ pendingIndividualBatchIndexAcks.remove(messageId);
+ }
+
+ private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl
batchMessageId,
+ Map<String, Long>
properties) {
+ if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ return doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
+ batchMessageId.getBatchSize(), AckType.Individual,
properties);
+ } else {
+ return doIndividualBatchAck(batchMessageId);
+ }
+ }
+
+ private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl
batchMessageId) {
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+            // When flushing, this ack must stay bound to the current receipt future, so the future
+            // must not be swapped while we add the ack. The read lock is sufficient here: the future
+            // is only replaced under the write lock, and concurrent ack operations remain allowed.
+ this.lock.readLock().lock();
+ try {
+ doIndividualBatchAckAsync(batchMessageId);
+ return this.currentIndividualAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
}
+ } else {
+ doIndividualBatchAckAsync(batchMessageId);
+ return CompletableFuture.completedFuture(null);
}
}
- private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet)
{
- // Handle concurrent updates from different threads
- while (true) {
- MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
- BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
- if (msgId.compareTo(lastCumlativeAck) > 0) {
- if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this,
lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this,
lastBitSet, bitSet)) {
- if (lastBitSet != null) {
- try {
- lastBitSet.recycle();
- } catch (Exception ignore) {
- // no-op
- }
+ private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId,
Map<String, Long> properties,
+ BitSetRecyclable bitSet) {
+
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
+ if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
+ // uncommon condition since it's only used for the compaction
subscription.
+ return doImmediateAck(messageId, AckType.Cumulative, properties,
bitSet);
+ } else {
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                // When flushing, this ack must stay bound to the current receipt future, so the future
+                // must not be swapped while we add the ack. The read lock is sufficient here: the future
+                // is only replaced under the write lock, and concurrent ack operations remain allowed.
+ this.lock.readLock().lock();
+ try {
+ doCumulativeAckAsync(messageId, bitSet);
+ return this.currentCumulativeAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
+ if (pendingIndividualBatchIndexAcks.size() >=
MAX_ACK_GROUP_SIZE) {
+ flush();
}
- // Successfully updated the last cumulative ack. Next
flush iteration will send this to broker.
- cumulativeAckFlushRequired = true;
- return;
}
} else {
- // message id acknowledging an before the current last
cumulative ack
- return;
+ doCumulativeAckAsync(messageId, bitSet);
+ if (pendingIndividualBatchIndexAcks.size() >=
MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ return CompletableFuture.completedFuture(null);
}
}
}
- private void doTransactionCumulativeAck(MessageIdImpl msgId,
BitSetRecyclable bitSet) {
+ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+ ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.computeIfAbsent(
+ new MessageIdImpl(batchMessageId.getLedgerId(),
batchMessageId.getEntryId(),
+ batchMessageId.getPartitionIndex()), (v) -> {
+ ConcurrentBitSetRecyclable value;
+ if (batchMessageId.getAcker() != null &&
+ !(batchMessageId.getAcker() instanceof
BatchMessageAckerDisabled)) {
+ value =
ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
+ } else {
+ value = ConcurrentBitSetRecyclable.create();
+ value.set(0, batchMessageId.getBatchIndex());
+ }
+ return value;
+ });
+ bitSet.clear(batchMessageId.getBatchIndex());
+ }
+
+ private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable
bitSet) {
// Handle concurrent updates from different threads
+ LastCumulativeAck currentCumulativeAck =
LastCumulativeAck.create(msgId, bitSet);
while (true) {
- MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
- BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
- if (msgId.compareTo(lastCumlativeAck) > 0) {
- if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this,
lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this,
lastBitSet, bitSet)) {
- if (lastBitSet != null) {
+ LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
+ if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
+ if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this,
this.lastCumulativeAck, currentCumulativeAck)) {
+ if (lastCumulativeAck.bitSetRecyclable != null) {
try {
- lastBitSet.recycle();
+ lastCumulativeAck.bitSetRecyclable.recycle();
} catch (Exception ignore) {
// no-op
}
+ lastCumulativeAck.bitSetRecyclable = null;
}
+ lastCumulativeAck.recycle();
// Successfully updated the last cumulative ack. Next
flush iteration will send this to broker.
cumulativeAckFlushRequired = true;
return;
}
} else {
+ currentCumulativeAck.recycle();
// message id acknowledging an before the current last
cumulative ack
return;
}
}
}
- private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType,
Map<String, Long> properties,
- TransactionImpl transaction) {
+ private CompletableFuture<Void>
doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
+ Map<String,
Long> properties) {
+ if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ return doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
+ batchMessageId.getBatchSize(), AckType.Cumulative,
properties);
+ } else {
+ BitSetRecyclable bitSet = BitSetRecyclable.create();
+ bitSet.set(0, batchMessageId.getBatchSize());
+ bitSet.clear(0, batchMessageId.getBatchIndex() + 1);
+ return doCumulativeAck(batchMessageId, null, bitSet);
+ }
+ }
+
+ private CompletableFuture<Void> doImmediateAck(MessageIdImpl msgId,
AckType ackType, Map<String, Long> properties,
+ BitSetRecyclable bitSet) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
- return false;
+ return FutureUtil.failedFuture(new PulsarClientException
+                    .ConnectException("Failed to ack: the consumer is not connected. Consumer state: " + consumer.getState()));
}
- if (transaction != null) {
- newAckCommand(consumer.consumerId, msgId, null, ackType, null,
- properties, cnx, true /* flush */,
transaction.getTxnIdMostBits(),
- transaction.getTxnIdLeastBits());
- } else {
- newAckCommand(consumer.consumerId, msgId, null, ackType, null,
- properties, cnx, true /* flush */, -1, -1);
- }
- return true;
+ return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet,
ackType, properties, cnx);
}
- private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int
batchIndex, int batchSize, AckType ackType,
- Map<String, Long> properties,
long txnidMostBits, long txnidLeastBits) {
+ private CompletableFuture<Void>
doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int
batchSize, AckType ackType,
+ Map<String, Long> properties) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
- return false;
+ return FutureUtil.failedFuture(new PulsarClientException
+                    .ConnectException("Failed to ack: the consumer is not connected. Consumer state: " + consumer.getState()));
}
BitSetRecyclable bitSet;
if (msgId.getAcker() != null && !(msgId.getAcker() instanceof
BatchMessageAckerDisabled)) {
@@ -309,11 +433,10 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
bitSet.clear(batchIndex);
}
- final ByteBuf cmd = Commands.newAck(consumer.consumerId,
msgId.ledgerId, msgId.entryId, bitSet, ackType,
- null, properties, txnidLeastBits, txnidMostBits, -1);
+ CompletableFuture<Void> completableFuture =
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
+ msgId.ledgerId, msgId.entryId, bitSet, ackType, null,
properties, true, null, null);
bitSet.recycle();
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
- return true;
+ return completableFuture;
}
/**
@@ -330,16 +453,32 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
return;
}
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+ this.lock.writeLock().lock();
+ try {
+ flushAsync(cnx);
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ } else {
+ flushAsync(cnx);
+ }
+ }
+
+ private void flushAsync(ClientCnx cnx) {
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- newAckCommand(consumer.consumerId, lastCumulativeAck,
lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx,
false /* flush */, -1, -1);
- shouldFlush=true;
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId,
lastCumulativeAck.messageId.ledgerId,
+ lastCumulativeAck.messageId.getEntryId(),
lastCumulativeAck.bitSetRecyclable,
+ AckType.Cumulative, null, Collections.emptyMap(), false,
+ this.currentCumulativeAckFuture, null);
+
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+ shouldFlush = true;
cumulativeAckFlushRequired = false;
}
// Flush all individual acks
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() +
pendingIndividualBatchIndexAcks.size());
- HashMap<TransactionImpl, List<Triple<Long, Long,
ConcurrentBitSetRecyclable>>> transactionEntriesToAck = new HashMap<>();
if (!pendingIndividualAcks.isEmpty()) {
if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
// We can send 1 single protobuf command with all individual
acks
@@ -371,8 +510,8 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
if (msgId == null) {
break;
}
-
- newAckCommand(consumer.consumerId, msgId, null,
AckType.Individual, null, Collections.emptyMap(), cnx, false, -1, -1);
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId,
msgId.getLedgerId(), msgId.getEntryId(), null,
+ AckType.Individual, null, Collections.emptyMap(),
false, null, null);
shouldFlush = true;
}
}
@@ -388,88 +527,10 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- if (!pendingIndividualTransactionAcks.isEmpty()) {
- if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
- // We can send 1 single protobuf command with all individual
acks
- while (true) {
- Triple<Long, Long, MessageIdImpl> entry =
pendingIndividualTransactionAcks.pollFirst();
- if (entry == null) {
- break;
- }
-
- // if messageId is checked then all the chunked related to
that msg also processed so, ack all of
- // them
- MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.get(entry.getRight());
- long mostSigBits = entry.getLeft();
- long leastSigBits = entry.getMiddle();
- MessageIdImpl messageId = entry.getRight();
- if (chunkMsgIds != null && chunkMsgIds.length > 1) {
- for (MessageIdImpl cMsgId : chunkMsgIds) {
- if (cMsgId != null) {
- newAckCommand(consumer.consumerId, cMsgId,
null, AckType.Individual, null, Collections.emptyMap(), cnx, false,
mostSigBits, leastSigBits);
- }
- }
- // messages will be acked so, remove checked message
sequence
-
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
- } else {
- newAckCommand(consumer.consumerId, messageId, null,
AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits,
leastSigBits);
- }
- }
- } else {
- // When talking to older brokers, send the acknowledgements
individually
- while (true) {
- Triple<Long, Long, MessageIdImpl> entry =
pendingIndividualTransactionAcks.pollFirst();
- if (entry == null) {
- break;
- }
-
- newAckCommand(consumer.consumerId, entry.getRight(), null,
AckType.Individual,
- null, Collections.emptyMap(), cnx, false,
entry.getLeft(), entry.getMiddle());
- shouldFlush = true;
- }
- }
- }
-
- if (!pendingIndividualTransactionBatchIndexAcks.isEmpty()) {
- Iterator<Map.Entry<TransactionImpl,
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>>
transactionIterator =
pendingIndividualTransactionBatchIndexAcks.entrySet().iterator();
- while (transactionIterator.hasNext()) {
- Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl,
ConcurrentBitSetRecyclable>> transactionEntry = transactionIterator.next();
- TransactionImpl txn = transactionEntry.getKey();
- synchronized (txn) {
- if
(pendingIndividualTransactionBatchIndexAcks.containsKey(txn)) {
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
messageIdBitSetList = new ArrayList<>();
- transactionEntriesToAck.put(txn, messageIdBitSetList);
- Iterator<Map.Entry<MessageIdImpl,
ConcurrentBitSetRecyclable>> messageIdIterator =
transactionEntry.getValue().entrySet().iterator();
- while (messageIdIterator.hasNext()) {
- Map.Entry<MessageIdImpl,
ConcurrentBitSetRecyclable> messageIdEntry = messageIdIterator.next();
- ConcurrentBitSetRecyclable
concurrentBitSetRecyclable =
-
ConcurrentBitSetRecyclable.create(messageIdEntry.getValue());
- MessageIdImpl messageId = messageIdEntry.getKey();
-
messageIdBitSetList.add(Triple.of(messageId.ledgerId, messageId.entryId,
concurrentBitSetRecyclable));
- messageIdEntry.getValue().set(0,
messageIdEntry.getValue().size());
- messageIdIterator.remove();
- }
- transactionIterator.remove();
- }
- }
- }
- }
-
- if (transactionEntriesToAck.size() > 0) {
- Iterator<Map.Entry<TransactionImpl, List<Triple<Long, Long,
ConcurrentBitSetRecyclable>>>> iterator =
- transactionEntriesToAck.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<TransactionImpl, List<Triple<Long, Long,
ConcurrentBitSetRecyclable>>> entry = iterator.next();
-
cnx.ctx().write(Commands.newMultiTransactionMessageAck(consumer.consumerId,
- new TxnID(entry.getKey().getTxnIdMostBits(),
- entry.getKey().getTxnIdLeastBits()),
entry.getValue()), cnx.ctx().voidPromise());
- shouldFlush = true;
- }
- }
-
if (entriesToAck.size() > 0) {
- cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId,
entriesToAck),
- cnx.ctx().voidPromise());
+
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
+ null, AckType.Individual, null, null, true,
currentIndividualAckFuture, entriesToAck);
shouldFlush = true;
}
@@ -480,12 +541,13 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
cnx.ctx().flush();
}
+
}
@Override
public void flushAndClean() {
flush();
- lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+ lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl)
MessageIdImpl.earliest, null);
pendingIndividualAcks.clear();
}
@@ -497,46 +559,127 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private void newAckCommand(long consumerId, MessageIdImpl msgId,
BitSetRecyclable lastCumulativeAckSet,
- AckType ackType, ValidationError validationError, Map<String,
Long> map, ClientCnx cnx,
- boolean flush, long txnidMostBits, long
txnidLeastBits) {
-
- MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
- if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
- if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
- && ackType != AckType.Cumulative) {
+ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId,
MessageIdImpl msgId,
+ BitSetRecyclable
bitSet, AckType ackType,
+ Map<String, Long>
map, ClientCnx cnx) {
+ MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+ final CompletableFuture<Void> completableFuture;
+        // for cumulative acks the chunked message is covered by acking its last message id,
+        // so only individual acks need to ack each chunk here
+ if (chunkMsgIds != null && ackType != AckType.Cumulative) {
+ if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(),
cMsgId.getEntryId(), null));
}
}
- ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId,
entriesToAck);
- if (flush) {
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
- } else {
- cnx.ctx().write(cmd, cnx.ctx().voidPromise());
- }
+ completableFuture = newMessageAckCommandAndWrite(cnx,
consumer.consumerId, 0L, 0L,
+ null, ackType, null, null, true, null, entriesToAck);
} else {
+                // a peer that does not support multi-message ack does not support ack receipt either,
+                // so ack receipt handling is not needed on this path
for (MessageIdImpl cMsgId : chunkMsgIds) {
- ByteBuf cmd = Commands.newAck(consumerId,
cMsgId.getLedgerId(), cMsgId.getEntryId(),
- lastCumulativeAckSet, ackType, validationError,
map);
- if (flush) {
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
- } else {
- cnx.ctx().write(cmd, cnx.ctx().voidPromise());
- }
+ newMessageAckCommandAndWrite(cnx, consumerId,
cMsgId.getLedgerId(), cMsgId.getEntryId(),
+ bitSet, ackType, null, map, true, null, null);
+ }
+ completableFuture = CompletableFuture.completedFuture(null);
+ }
+ } else {
+ completableFuture = newMessageAckCommandAndWrite(cnx, consumerId,
msgId.ledgerId, msgId.getEntryId(),
+ bitSet, ackType, null, map, true, null, null);
+ }
+ return completableFuture;
+ }
+
+ private CompletableFuture<Void> newMessageAckCommandAndWrite(ClientCnx
cnx, long consumerId, long ledgerId,
+ long entryId,
BitSetRecyclable ackSet, AckType ackType,
+
CommandAck.ValidationError validationError,
+ Map<String,
Long> properties, boolean flush,
+
TimedCompletableFuture<Void> timedCompletableFuture,
+
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
+ if (isAckReceiptEnabled(consumer.getClientCnx())) {
+ final long requestId = consumer.getClient().newRequestId();
+ final ByteBuf cmd;
+ if (entriesToAck == null) {
+ cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet,
+ ackType, null, properties, requestId);
+ } else {
+ cmd = Commands.newMultiMessageAck(consumerId, entriesToAck,
requestId);
+ }
+ if (timedCompletableFuture == null) {
+ return cnx.newAckForReceipt(cmd, requestId);
+ } else {
+ if (ackType == AckType.Individual) {
+ this.currentIndividualAckFuture = new
TimedCompletableFuture<>();
+ } else {
+ this.currentCumulativeAckFuture = new
TimedCompletableFuture<>();
}
+ cnx.newAckForReceiptWithFuture(cmd, requestId,
timedCompletableFuture);
+ return timedCompletableFuture;
}
- this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
} else {
- ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(),
msgId.getEntryId(), lastCumulativeAckSet,
- ackType, validationError, map, txnidLeastBits,
txnidMostBits, -1);
+            // the connection does not support ack receipt; complete any pending receipt futures so
+            // that callers waiting on them do not block
+ if (ackReceiptEnabled) {
+ synchronized (PersistentAcknowledgmentsGroupingTracker.this) {
+ if (!this.currentCumulativeAckFuture.isDone()) {
+ this.currentCumulativeAckFuture.complete(null);
+ }
+
+ if (!this.currentIndividualAckFuture.isDone()) {
+ this.currentIndividualAckFuture.complete(null);
+ }
+ }
+ }
+ final ByteBuf cmd;
+ if (entriesToAck == null) {
+ cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet,
+ ackType, null, properties, -1);
+ } else {
+ cmd = Commands.newMultiMessageAck(consumerId, entriesToAck,
-1);
+ }
if (flush) {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
} else {
cnx.ctx().write(cmd, cnx.ctx().voidPromise());
}
+ return CompletableFuture.completedFuture(null);
}
}
+
+ private boolean isAckReceiptEnabled(ClientCnx cnx) {
+ return ackReceiptEnabled && cnx != null
+ &&
Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
+ }
+
+ private static class LastCumulativeAck {
+ private MessageIdImpl messageId;
+ private BitSetRecyclable bitSetRecyclable;
+
+ static LastCumulativeAck create(MessageIdImpl messageId,
BitSetRecyclable bitSetRecyclable) {
+ LastCumulativeAck op = RECYCLER.get();
+ op.messageId = messageId;
+ op.bitSetRecyclable = bitSetRecyclable;
+ return op;
+ }
+
+ private LastCumulativeAck(Recycler.Handle<LastCumulativeAck>
recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ void recycle() {
+ if (bitSetRecyclable != null) {
+ this.bitSetRecyclable.recycle();
+ }
+ this.messageId = null;
+ recyclerHandle.recycle(this);
+ }
+
+ private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
+ private static final Recycler<LastCumulativeAck> RECYCLER = new
Recycler<LastCumulativeAck>() {
+ @Override
+ protected LastCumulativeAck newObject(Handle<LastCumulativeAck>
handle) {
+ return new LastCumulativeAck(handle);
+ }
+ };
+ }
}
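
The locking scheme in the tracker above follows one pattern throughout: ack producers take the read lock and attach themselves to the current receipt future, while flush takes the write lock, swaps in a fresh future, and completes the flushed one once the broker responds. The following is a minimal, self-contained sketch of that pattern, not the tracker itself; the class and method names (ReceiptGroupingSketch, ackIndividually, flush) and the simulated broker response are assumptions made for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the "bind acks to the current receipt future" pattern used when ack receipts
// are enabled. Names are illustrative only.
class ReceiptGroupingSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentLinkedQueue<Long> pendingAcks = new ConcurrentLinkedQueue<>();
    private volatile CompletableFuture<Void> currentReceiptFuture = new CompletableFuture<>();

    // Called from application threads: record the ack and return the future that
    // completes when the broker confirms the flush containing this ack.
    CompletableFuture<Void> ackIndividually(long entryId) {
        lock.readLock().lock();
        try {
            pendingAcks.add(entryId);
            return currentReceiptFuture;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Called from the flush timer: swap in a new future under the write lock so that
    // later acks attach to the next batch, then complete the flushed future when the
    // (simulated) broker receipt arrives.
    void flush() {
        final CompletableFuture<Void> flushedFuture;
        lock.writeLock().lock();
        try {
            if (pendingAcks.isEmpty()) {
                return;
            }
            flushedFuture = currentReceiptFuture;
            currentReceiptFuture = new CompletableFuture<>();
            pendingAcks.clear(); // in the real tracker this becomes an ACK command on the wire
        } finally {
            lock.writeLock().unlock();
        }
        // Simulated broker response; the real code completes the future from ClientCnx
        // when the ack response for the request id comes back.
        flushedFuture.complete(null);
    }
}
```
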
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 665970c..b01cba4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -129,6 +129,8 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private boolean batchIndexAckEnabled = false;
+ private boolean ackReceiptEnabled = false;
+
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit
timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds =
timeUnit.toSeconds(interval);
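
The new ackReceiptEnabled flag is what makes the tracker return broker-confirmed futures. A minimal usage sketch follows; it assumes the corresponding ConsumerBuilder setter is named isAckReceiptEnabled (the builder change lives in ConsumerBuilder/ConsumerBuilderImpl and is not shown in this hunk), so treat that method name, the topic, and the service URL as illustrative:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class AckReceiptExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Assumed builder method name; enables waiting for the broker's ack response.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-sub")
                .isAckReceiptEnabled(true)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        // With receipts enabled, this future completes only after the broker confirms the ack
        // (or fails, e.g. on request timeout).
        consumer.acknowledgeAsync(msg.getMessageId()).get();

        consumer.close();
        client.close();
    }
}
```
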
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 9d95e98..a780a88 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -18,24 +18,31 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class AcknowledgementsGroupingTrackerTest {
@@ -45,25 +52,38 @@ public class AcknowledgementsGroupingTrackerTest {
private EventLoopGroup eventLoopGroup;
@BeforeClass
- public void setup() {
+ public void setup() throws NoSuchFieldException, IllegalAccessException {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
consumer.unAckedChunkedMessageIdSequenceMap = new
ConcurrentOpenHashMap<>();
- cnx = mock(ClientCnx.class);
+ cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new
NioEventLoopGroup()));
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ doReturn(client).when(consumer).getClient();
+ doReturn(cnx).when(consumer).getClientCnx();
+ doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
+ doReturn(new UnAckedMessageTracker().UNACKED_MESSAGE_TRACKER_DISABLED)
+ .when(consumer).getUnAckedMessageTracker();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(cnx.ctx()).thenReturn(ctx);
}
+ @DataProvider(name = "isNeedReceipt")
+ public Object[][] isNeedReceipt() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
@AfterClass
public void teardown() {
eventLoopGroup.shutdownGracefully();
}
- @Test
- public void testAckTracker() throws Exception {
+ @Test(dataProvider = "isNeedReceipt")
+ public void testAckTracker(boolean isNeedReceipt) throws Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
- PersistentAcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ conf.setAckReceiptEnabled(isNeedReceipt);
+ AcknowledgmentsGroupingTracker tracker;
+ tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -74,12 +94,12 @@ public class AcknowledgementsGroupingTrackerTest {
assertFalse(tracker.isDuplicate(msg1));
- tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap());
assertTrue(tracker.isDuplicate(msg1));
assertFalse(tracker.isDuplicate(msg2));
- tracker.addAcknowledgment(msg5, AckType.Cumulative,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg5, AckType.Cumulative,
Collections.emptyMap());
assertTrue(tracker.isDuplicate(msg1));
assertTrue(tracker.isDuplicate(msg2));
assertTrue(tracker.isDuplicate(msg3));
@@ -99,7 +119,7 @@ public class AcknowledgementsGroupingTrackerTest {
assertTrue(tracker.isDuplicate(msg5));
assertFalse(tracker.isDuplicate(msg6));
- tracker.addAcknowledgment(msg6, AckType.Individual,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg6, AckType.Individual,
Collections.emptyMap());
assertTrue(tracker.isDuplicate(msg6));
when(consumer.getClientCnx()).thenReturn(cnx);
@@ -117,11 +137,13 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
- @Test
- public void testBatchAckTracker() throws Exception {
+ @Test(dataProvider = "isNeedReceipt")
+ public void testBatchAckTracker(boolean isNeedReceipt) throws Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
- PersistentAcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ conf.setAckReceiptEnabled(isNeedReceipt);
+ AcknowledgmentsGroupingTracker tracker;
+ tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -175,11 +197,13 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
- @Test
- public void testImmediateAckingTracker() throws Exception {
+ @Test(dataProvider = "isNeedReceipt")
+ public void testImmediateAckingTracker(boolean isNeedReceipt) throws
Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setAcknowledgementsGroupTimeMicros(0);
- PersistentAcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ conf.setAckReceiptEnabled(isNeedReceipt);
+ AcknowledgmentsGroupingTracker tracker;
+ tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -188,7 +212,7 @@ public class AcknowledgementsGroupingTrackerTest {
when(consumer.getClientCnx()).thenReturn(null);
- tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap());
assertFalse(tracker.isDuplicate(msg1));
when(consumer.getClientCnx()).thenReturn(cnx);
@@ -196,17 +220,19 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.flush();
assertFalse(tracker.isDuplicate(msg1));
- tracker.addAcknowledgment(msg2, AckType.Individual,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg2, AckType.Individual,
Collections.emptyMap());
// Since we were connected, the ack went out immediately
assertFalse(tracker.isDuplicate(msg2));
tracker.close();
}
- @Test
- public void testImmediateBatchAckingTracker() throws Exception {
+ @Test(dataProvider = "isNeedReceipt")
+ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws
Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setAcknowledgementsGroupTimeMicros(0);
- PersistentAcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ conf.setAckReceiptEnabled(isNeedReceipt);
+ AcknowledgmentsGroupingTracker tracker;
+ tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -216,8 +242,6 @@ public class AcknowledgementsGroupingTrackerTest {
when(consumer.getClientCnx()).thenReturn(null);
tracker.addListAcknowledgment(Collections.singletonList(msg1),
AckType.Individual, Collections.emptyMap());
- tracker.flush();
- //cnx is null can not flush
assertTrue(tracker.isDuplicate(msg1));
when(consumer.getClientCnx()).thenReturn(cnx);
@@ -226,16 +250,20 @@ public class AcknowledgementsGroupingTrackerTest {
assertFalse(tracker.isDuplicate(msg1));
tracker.addListAcknowledgment(Collections.singletonList(msg2),
AckType.Individual, Collections.emptyMap());
+
+ tracker.flush();
// Since we were connected, the ack went out immediately
assertFalse(tracker.isDuplicate(msg2));
tracker.close();
}
- @Test
- public void testAckTrackerMultiAck() throws Exception {
+ @Test(dataProvider = "isNeedReceipt")
+ public void testAckTrackerMultiAck(boolean isNeedReceipt) throws Exception
{
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
- PersistentAcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ conf.setAckReceiptEnabled(isNeedReceipt);
+ AcknowledgmentsGroupingTracker tracker;
+ tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE);
@@ -248,12 +276,12 @@ public class AcknowledgementsGroupingTrackerTest {
assertFalse(tracker.isDuplicate(msg1));
- tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap());
assertTrue(tracker.isDuplicate(msg1));
assertFalse(tracker.isDuplicate(msg2));
- tracker.addAcknowledgment(msg5, AckType.Cumulative,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg5, AckType.Cumulative,
Collections.emptyMap());
assertTrue(tracker.isDuplicate(msg1));
assertTrue(tracker.isDuplicate(msg2));
assertTrue(tracker.isDuplicate(msg3));
@@ -273,7 +301,7 @@ public class AcknowledgementsGroupingTrackerTest {
assertTrue(tracker.isDuplicate(msg5));
assertFalse(tracker.isDuplicate(msg6));
- tracker.addAcknowledgment(msg6, AckType.Individual,
Collections.emptyMap(), null);
+ tracker.addAcknowledgment(msg6, AckType.Individual,
Collections.emptyMap());
assertTrue(tracker.isDuplicate(msg6));
when(consumer.getClientCnx()).thenReturn(cnx);
@@ -291,11 +319,13 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
- @Test
- public void testBatchAckTrackerMultiAck() throws Exception {
+ @Test(dataProvider = "isNeedReceipt")
+ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws
Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
- PersistentAcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ conf.setAckReceiptEnabled(isNeedReceipt);
+ AcknowledgmentsGroupingTracker tracker;
+ tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE);
@@ -350,4 +380,21 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
+
+ public class ClientCnxTest extends ClientCnx {
+
+ public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) {
+ super(conf, eventLoopGroup);
+ }
+
+ @Override
+ public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long
requestId) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void newAckForReceiptWithFuture(ByteBuf request, long requestId,
+ TimedCompletableFuture<Void>
future) {
+ }
+ }
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
index d0bd9b5..eb5207b 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
@@ -23,6 +23,7 @@ import io.netty.channel.*;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.*;
@@ -86,6 +87,18 @@ public class ClientCnxRequestTimeoutQueueTest {
}
@Test
+ void testNewAckForResponseNoFlushTimeout() {
+ assertFutureTimesOut(cnx.newAckForReceipt(requestMessage, 1L));
+ }
+
+ @Test
+ void testNewAckForResponseFlushTimeout() {
+ TimedCompletableFuture<Void> timedCompletableFuture = new
TimedCompletableFuture<>();
+ cnx.newAckForReceiptWithFuture(requestMessage, 1L,
timedCompletableFuture);
+ assertFutureTimesOut(timedCompletableFuture);
+ }
+
+ @Test
void testGetSchemaRequestTimeout() {
assertFutureTimesOut(cnx.sendGetRawSchema(requestMessage, 1L));
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e16ed9e..a29ceb1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -858,18 +858,22 @@ public class Commands {
}
public static ByteBuf newMultiMessageAck(long consumerId,
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
+ List<Triple<Long, Long,
ConcurrentBitSetRecyclable>> entries,
+ long requestId) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
.setConsumerId(consumerId)
.setAckType(AckType.Individual);
+ if (requestId >= 0) {
+ cmd.getAck().setRequestId(requestId);
+ }
return serializeWithSize(cmd);
}
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId,
BitSetRecyclable ackSet, AckType ackType,
- ValidationError validationError, Map<String,
Long> properties) {
+ ValidationError validationError, Map<String,
Long> properties, long requestId) {
return newAck(consumerId, ledgerId, entryId, ackSet, ackType,
validationError,
- properties, -1L, -1L, -1L, -1);
+ properties, -1L, -1L, requestId, -1);
}
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId,
BitSetRecyclable ackSet, AckType ackType,
@@ -1693,6 +1697,10 @@ public class Commands {
return peerVersion >= ProtocolVersion.v15.getValue();
}
+ public static boolean peerSupportsAckReceipt(int peerVersion) {
+ return peerVersion >= ProtocolVersion.v17.getValue();
+ }
+
private static org.apache.pulsar.common.api.proto.ProducerAccessMode
convertProducerAccessMode(ProducerAccessMode accessMode) {
switch (accessMode) {
case Exclusive:
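
The newAck overload above now carries a requestId, and peerSupportsAckReceipt is the guard the client uses before asking the broker for a receipt. A small sketch of how the two fit together; the helper class and method are illustrative and not part of the patch:

```java
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.protocol.Commands;

// Sketch: request a receipt only when the connected broker speaks protocol v17+;
// otherwise pass -1 so no requestId is set on the ACK command.
final class AckCommandSketch {
    static ByteBuf buildIndividualAck(long consumerId, long ledgerId, long entryId,
                                      int peerProtocolVersion, long nextRequestId) {
        long requestId = Commands.peerSupportsAckReceipt(peerProtocolVersion) ? nextRequestId : -1L;
        return Commands.newAck(consumerId, ledgerId, entryId, null,
                AckType.Individual, null, Collections.emptyMap(), requestId);
    }
}
```
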
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 5f3d427..ee5ac70 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -255,6 +255,7 @@ enum ProtocolVersion {
// Added Key_Shared subscription
v15 = 15; // Add CommandGetOrCreateSchema and
CommandGetOrCreateSchemaResponse
v16 = 16; // Add support for raw message metadata
+    v17 = 17; // Added support for ack receipt
}
message CommandConnect {