This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a141f30 Improve batch message acking by removing batch message
tracker (#1424)
a141f30 is described below
commit a141f301deef56e6a6dac5a1d4e527c584ce9ec7
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Mar 27 23:37:28 2018 -0700
Improve batch message acking by removing batch message tracker (#1424)
* Improve batch message acking by removing batch message tracker
* fix
* Fix the cumulative ack
* Fix batchsize
---
.../pulsar/broker/service/BatchMessageTest.java | 6 +-
.../pulsar/client/impl/BatchMessageAcker.java | 75 ++++++++++++
.../client/impl/BatchMessageAckerDisabled.java | 48 ++++++++
.../pulsar/client/impl/BatchMessageIdImpl.java | 38 +++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 134 +++------------------
.../client/impl/PartitionedConsumerImpl.java | 9 --
.../pulsar/client/impl/TopicsConsumerImpl.java | 10 --
.../client/impl/BatchMessageAckerDisabledTest.java | 47 ++++++++
.../pulsar/client/impl/BatchMessageAckerTest.java | 71 +++++++++++
9 files changed, 294 insertions(+), 144 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 2d7c7ae..533ce85 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -42,7 +43,6 @@ import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -434,7 +434,7 @@ public class BatchMessageTest extends BrokerTestBase {
Message<byte[]> lastunackedMsg = null;
for (int i = 0; i < numMsgs; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
- LOG.info("received message {}", String.valueOf(msg.getData()));
+ LOG.info("received message {}", new String(msg.getData(), UTF_8));
assertNotNull(msg);
if (i == 8) {
consumer.acknowledgeCumulative(msg);
@@ -514,7 +514,6 @@ public class BatchMessageTest extends BrokerTestBase {
Thread.sleep(100);
rolloverPerIntervalStats();
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(),
0);
- assertTrue(((ConsumerImpl<byte[]>)
consumer).isBatchingAckTrackerEmpty());
consumer.close();
producer.close();
noBatchProducer.close();
@@ -574,7 +573,6 @@ public class BatchMessageTest extends BrokerTestBase {
}
Thread.sleep(100);
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(),
0);
- assertTrue(((ConsumerImpl<byte[]>)
consumer).isBatchingAckTrackerEmpty());
consumer.close();
producer.close();
noBatchProducer.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
new file mode 100644
index 0000000..75e50aa
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.BitSet;
+
+class BatchMessageAcker {
+
+ static BatchMessageAcker newAcker(int batchSize) {
+ BitSet bitSet = new BitSet(batchSize);
+ bitSet.set(0, batchSize);
+ return new BatchMessageAcker(bitSet, batchSize);
+ }
+
+ // bitset shared across messages in the same batch.
+ private final int batchSize;
+ private final BitSet bitSet;
+ private boolean prevBatchCumulativelyAcked = false;
+
+ BatchMessageAcker(BitSet bitSet, int batchSize) {
+ this.bitSet = bitSet;
+ this.batchSize = batchSize;
+ }
+
+ @VisibleForTesting
+ BitSet getBitSet() {
+ return bitSet;
+ }
+
+ public synchronized int getBatchSize() {
+ return batchSize;
+ }
+
+ public synchronized boolean ackIndividual(int batchIndex) {
+ bitSet.clear(batchIndex);
+ return bitSet.isEmpty();
+ }
+
+ public synchronized boolean ackCumulative(int batchIndex) {
+ // +1 since to argument is exclusive
+ bitSet.clear(0, batchIndex + 1);
+ return bitSet.isEmpty();
+ }
+
+ // debug purpose
+ public synchronized int getOutstandingAcks() {
+ return bitSet.cardinality();
+ }
+
+ public void setPrevBatchCumulativelyAcked(boolean acked) {
+ this.prevBatchCumulativelyAcked = acked;
+ }
+
+ public boolean isPrevBatchCumulativelyAcked() {
+ return prevBatchCumulativelyAcked;
+ }
+
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
new file mode 100644
index 0000000..a521d63
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+class BatchMessageAckerDisabled extends BatchMessageAcker {
+
+ static final BatchMessageAckerDisabled INSTANCE = new
BatchMessageAckerDisabled();
+
+ private BatchMessageAckerDisabled() {
+ super(null, 0);
+ }
+
+ @Override
+ public synchronized int getBatchSize() {
+ return 0;
+ }
+
+ @Override
+ public boolean ackIndividual(int batchIndex) {
+ return true;
+ }
+
+ @Override
+ public boolean ackCumulative(int batchIndex) {
+ return true;
+ }
+
+ @Override
+ public int getOutstandingAcks() {
+ return 0;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index f7c5bdd..d87a6ab 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -27,17 +27,27 @@ public class BatchMessageIdImpl extends MessageIdImpl {
private final static int NO_BATCH = -1;
private final int batchIndex;
+ private final BatchMessageAcker acker;
+
public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex,
int batchIndex) {
+ this(ledgerId, entryId, partitionIndex, batchIndex,
BatchMessageAckerDisabled.INSTANCE);
+ }
+
+ public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex,
int batchIndex, BatchMessageAcker acker) {
super(ledgerId, entryId, partitionIndex);
this.batchIndex = batchIndex;
+ this.acker = acker;
}
public BatchMessageIdImpl(MessageIdImpl other) {
super(other.ledgerId, other.entryId, other.partitionIndex);
if (other instanceof BatchMessageIdImpl) {
- this.batchIndex = ((BatchMessageIdImpl) other).batchIndex;
+ BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
+ this.batchIndex = otherId.batchIndex;
+ this.acker = otherId.acker;
} else {
this.batchIndex = NO_BATCH;
+ this.acker = BatchMessageAckerDisabled.INSTANCE;
}
}
@@ -95,4 +105,30 @@ public class BatchMessageIdImpl extends MessageIdImpl {
public byte[] toByteArray() {
return toByteArray(batchIndex);
}
+
+ public boolean ackIndividual() {
+ return acker.ackIndividual(batchIndex);
+ }
+
+ public boolean ackCumulative() {
+ return acker.ackCumulative(batchIndex);
+ }
+
+ public int getOutstandingAcksInSameBatch() {
+ return acker.getOutstandingAcks();
+ }
+
+ public int getBatchSize() {
+ return acker.getBatchSize();
+ }
+
+ public MessageIdImpl prevBatchMessageId() {
+ return new MessageIdImpl(
+ ledgerId, entryId - 1, partitionIndex);
+ }
+
+ public BatchMessageAcker getAcker() {
+ return acker;
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 46709d2..c215431 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,13 @@ import static
org.apache.pulsar.common.api.Commands.readChecksum;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -110,7 +106,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final ReadWriteLock zeroQueueLock;
private final UnAckedMessageTracker unAckedMessageTracker;
- private final ConcurrentNavigableMap<MessageIdImpl, BitSet>
batchMessageAckTracker;
protected final ConsumerStatsRecorder stats;
private final int priorityLevel;
@@ -155,7 +150,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
this.codecProvider = new CompressionCodecProvider();
this.priorityLevel = conf.getPriorityLevel();
- this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition =
conf.getSubscriptionInitialPosition();
@@ -219,7 +213,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
cnx.removeConsumer(consumerId);
log.info("[{}][{}] Successfully unsubscribed from topic",
topic, subscription);
- batchMessageAckTracker.clear();
unAckedMessageTracker.close();
unsubscribeFuture.complete(null);
setState(State.Closed);
@@ -355,75 +348,26 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
- // we may not be able to ack message being acked by client. However
messages in prior
- // batch may be ackable
- private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId,
MessageIdImpl message,
- Map<String,Long> properties) {
- // get entry before this message and ack that message on broker
- MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
- if (lowerKey != null) {
- NavigableMap<MessageIdImpl, BitSet> entriesUpto =
batchMessageAckTracker.headMap(lowerKey, true);
- for (Object key : entriesUpto.keySet()) {
- entriesUpto.remove(key);
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] ack prior message {} to broker on
cumulative ack for message {}", subscription,
- consumerId, lowerKey, batchMessageId);
- }
- sendAcknowledge(lowerKey, AckType.Cumulative, properties);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] no messages prior to message {}",
subscription, consumerId, batchMessageId);
- }
- }
- }
-
boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType
ackType,
Map<String,Long> properties) {
- // we keep track of entire batch and so need MessageIdImpl and cannot
use BatchMessageIdImpl
- MessageIdImpl message = new
MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
- batchMessageId.getPartitionIndex());
- BitSet bitSet = batchMessageAckTracker.get(message);
- if (bitSet == null) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] message not found {} for ack {}",
subscription, consumerId, batchMessageId,
- ackType);
- }
- return true;
+ boolean isAllMsgsAcked;
+ if (ackType == AckType.Individual) {
+ isAllMsgsAcked = batchMessageId.ackIndividual();
+ } else {
+ isAllMsgsAcked = batchMessageId.ackCumulative();
}
- int batchIndex = batchMessageId.getBatchIndex();
- // bitset is not thread-safe and requires external synchronization
- int batchSize = 0;
- // only used for debug-logging
int outstandingAcks = 0;
- boolean isAllMsgsAcked = false;
- lock.writeLock().lock();
- try {
- batchSize = bitSet.length();
- if (ackType == AckType.Individual) {
- bitSet.clear(batchIndex);
- } else {
- // +1 since to argument is exclusive
- bitSet.clear(0, batchIndex + 1);
- }
- isAllMsgsAcked = bitSet.isEmpty();
- if (log.isDebugEnabled()) {
- outstandingAcks = bitSet.cardinality();
- }
- } finally {
- lock.writeLock().unlock();
+ if (log.isDebugEnabled()) {
+ outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
}
+ int batchSize = batchMessageId.getBatchSize();
// all messages in this batch have been acked
if (isAllMsgsAcked) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] can ack message to broker {}, acktype {},
cardinality {}, length {}", subscription,
consumerName, batchMessageId, ackType,
outstandingAcks, batchSize);
}
- if (ackType == AckType.Cumulative) {
- batchMessageAckTracker.keySet().removeIf(m ->
(m.compareTo(message) <= 0));
- }
- batchMessageAckTracker.remove(message);
// increment Acknowledge-msg counter with number of messages in
batch only if AckType is Individual.
// CumulativeAckType is handled while sending ack to broker
if (ackType == AckType.Individual) {
@@ -431,9 +375,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
return true;
} else {
- // we cannot ack this message to broker. but prior message may be
ackable
- if (ackType == AckType.Cumulative) {
- ackMessagesInEarlierBatch(batchMessageId, message, properties);
+ if (AckType.Cumulative == ackType
+ && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+ sendAcknowledge(batchMessageId.prevBatchMessageId(),
AckType.Cumulative, properties);
+ batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] cannot ack message to broker {}, acktype
{}, pending acks - {}", subscription,
@@ -443,38 +388,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return false;
}
- // if we are consuming a mix of batch and non-batch messages then
cumulative ack on non-batch messages
- // should clean up the ack tracker as well
- private void updateBatchAckTracker(MessageIdImpl message, AckType ackType)
{
- if (batchMessageAckTracker.isEmpty()) {
- return;
- }
- MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
- if (lowerKey != null) {
- NavigableMap<MessageIdImpl, BitSet> entriesUpto =
batchMessageAckTracker.headMap(lowerKey, true);
- for (Object key : entriesUpto.keySet()) {
- entriesUpto.remove(key);
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] updated batch ack tracker up to message
{} on cumulative ack for message {}",
- subscription, consumerId, lowerKey, message);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] no messages to clean up prior to message
{}", subscription, consumerId, message);
- }
- }
- }
-
- /**
- * helper method that returns current state of data structure used to
track acks for batch messages
- *
- * @return true if all batch messages have been acknowledged
- */
- public boolean isBatchingAckTrackerEmpty() {
- return batchMessageAckTracker.isEmpty();
- }
-
@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String,Long>
properties) {
@@ -496,11 +409,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return CompletableFuture.completedFuture(null);
}
}
- // if we got a cumulative ack on non batch message, check if any
earlier batch messages need to be removed
- // from batch message tracker
- if (ackType == AckType.Cumulative && !(messageId instanceof
BatchMessageIdImpl)) {
- updateBatchAckTracker((MessageIdImpl) messageId, ackType);
- }
return sendAcknowledge(messageId, ackType, properties);
}
@@ -560,7 +468,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
unAckedMessageTracker.clear();
- batchMessageAckTracker.clear();
}
boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
@@ -702,7 +609,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
public CompletableFuture<Void> closeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
- batchMessageAckTracker.clear();
unAckedMessageTracker.close();
return CompletableFuture.completedFuture(null);
}
@@ -710,7 +616,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (!isConnected()) {
log.info("[{}] [{}] Closed Consumer (not connected)", topic,
subscription);
setState(State.Closed);
- batchMessageAckTracker.clear();
unAckedMessageTracker.close();
client.cleanupConsumer(this);
return CompletableFuture.completedFuture(null);
@@ -730,7 +635,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (exception == null || !cnx.ctx().channel().isActive()) {
log.info("[{}] [{}] Closed consumer", topic, subscription);
setState(State.Closed);
- batchMessageAckTracker.clear();
unAckedMessageTracker.close();
closeFuture.complete(null);
client.cleanupConsumer(this);
@@ -925,15 +829,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
int batchSize = msgMetadata.getNumMessagesInBatch();
// create ack tracker for entry aka batch
- BitSet bitSet = new BitSet(batchSize);
MessageIdImpl batchMessage = new
MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex());
- bitSet.set(0, batchSize);
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] added bit set for message {}, cardinality {},
length {}", subscription, consumerName,
- batchMessage, bitSet.cardinality(), bitSet.length());
- }
- batchMessageAckTracker.put(batchMessage, bitSet);
+ BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
unAckedMessageTracker.add(batchMessage);
int skippedMessages = 0;
@@ -971,7 +869,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
BatchMessageIdImpl batchMessageIdImpl = new
BatchMessageIdImpl(messageId.getLedgerId(),
- messageId.getEntryId(), getPartitionIndex(), i);
+ messageId.getEntryId(), getPartitionIndex(), i, acker);
final MessageImpl<T> message = new
MessageImpl<>(batchMessageIdImpl, msgMetadata,
singleMessageMetadataBuilder.build(),
singleMessagePayload, cnx, schema);
lock.readLock().lock();
@@ -988,9 +886,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
singleMessageMetadataBuilder.recycle();
}
} catch (IOException e) {
- //
log.warn("[{}] [{}] unable to obtain message in batch",
subscription, consumerName);
- batchMessageAckTracker.remove(batchMessage);
discardCorruptedMessage(messageId, cnx,
ValidationError.BatchDeSerializeError);
}
if (log.isDebugEnabled()) {
@@ -1192,7 +1088,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
currentSize = incomingMessages.size();
incomingMessages.clear();
unAckedMessageTracker.clear();
- batchMessageAckTracker.clear();
}
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId),
cnx.ctx().voidPromise());
if (currentSize > 0) {
@@ -1232,7 +1127,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
batches.forEach(ids -> {
List<MessageIdData> messageIdDatas =
ids.stream().map(messageId -> {
// attempt to remove message from batchMessageAckTracker
- batchMessageAckTracker.remove(messageId);
builder.setPartition(messageId.getPartitionIndex());
builder.setLedgerId(messageId.getLedgerId());
builder.setEntryId(messageId.getEntryId());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d8ad4e..4bd2fda 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -489,15 +489,6 @@ public class PartitionedConsumerImpl<T> extends
ConsumerBase<T> {
return FutureUtil.failedFuture(new PulsarClientException("Seek
operation not supported on partitioned topics"));
}
- /**
- * helper method that returns current state of data structure used to
track acks for batch messages
- *
- * @return true if all batch messages have been acknowledged
- */
- public boolean isBatchingAckTrackerEmpty() {
- return
consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty);
- }
-
List<ConsumerImpl<T>> getConsumers() {
return consumers;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index e39de96..1089cef 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -536,16 +536,6 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T>
{
return FutureUtil.failedFuture(new PulsarClientException("Seek
operation not supported on topics consumer"));
}
- /**
- * helper method that returns current state of data structure used to
track acks for batch messages
- *
- * @return true if all batch messages have been acknowledged
- */
- public boolean isBatchingAckTrackerEmpty() {
- return consumers.values().stream().allMatch(consumer ->
consumer.isBatchingAckTrackerEmpty());
- }
-
-
@Override
public int getAvailablePermits() {
return
consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
new file mode 100644
index 0000000..e32a2ef
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.Test;
+
+public class BatchMessageAckerDisabledTest {
+
+ @Test
+ public void testAckIndividual() {
+ for (int i = 0; i < 10; i++) {
+ assertTrue(BatchMessageAckerDisabled.INSTANCE.ackIndividual(i));
+ }
+ }
+
+ @Test
+ public void testAckCumulative() {
+ for (int i = 0; i < 10; i++) {
+ assertTrue(BatchMessageAckerDisabled.INSTANCE.ackCumulative(i));
+ }
+ }
+
+ @Test
+ public void testGetOutstandingAcks() {
+ assertEquals(0,
BatchMessageAckerDisabled.INSTANCE.getOutstandingAcks());
+ }
+
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
new file mode 100644
index 0000000..2bfa620
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class BatchMessageAckerTest {
+
+ private static final int BATCH_SIZE = 10;
+
+ private BatchMessageAcker acker;
+
+ @BeforeMethod
+ public void setup() {
+ acker = BatchMessageAcker.newAcker(10);
+ }
+
+ @Test
+ public void testAckers() {
+ assertEquals(BATCH_SIZE, acker.getOutstandingAcks());
+ assertEquals(BATCH_SIZE, acker.getBatchSize());
+
+ assertFalse(acker.ackIndividual(4));
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ if (4 == i) {
+ assertFalse(acker.getBitSet().get(i));
+ } else {
+ assertTrue(acker.getBitSet().get(i));
+ }
+ }
+
+ assertFalse(acker.ackCumulative(6));
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ if (i <= 6) {
+ assertFalse(acker.getBitSet().get(i));
+ } else {
+ assertTrue(acker.getBitSet().get(i));
+ }
+ }
+
+ for (int i = BATCH_SIZE - 1; i >= 8; i--) {
+ assertFalse(acker.ackIndividual(i));
+ assertFalse(acker.getBitSet().get(i));
+ }
+
+ assertTrue(acker.ackIndividual(7));
+ assertEquals(0, acker.getOutstandingAcks());
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].