This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a141f30  Improve batch message acking by removing batch message tracker (#1424)
a141f30 is described below

commit a141f301deef56e6a6dac5a1d4e527c584ce9ec7
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Mar 27 23:37:28 2018 -0700

    Improve batch message acking by removing batch message tracker (#1424)
    
    * Improve batch message acking by removing batch message tracker
    
    * fix
    
    * Fix the cumulative ack
    
    * Fix batchsize
---
 .../pulsar/broker/service/BatchMessageTest.java    |   6 +-
 .../pulsar/client/impl/BatchMessageAcker.java      |  75 ++++++++++++
 .../client/impl/BatchMessageAckerDisabled.java     |  48 ++++++++
 .../pulsar/client/impl/BatchMessageIdImpl.java     |  38 +++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 134 +++------------------
 .../client/impl/PartitionedConsumerImpl.java       |   9 --
 .../pulsar/client/impl/TopicsConsumerImpl.java     |  10 --
 .../client/impl/BatchMessageAckerDisabledTest.java |  47 ++++++++
 .../pulsar/client/impl/BatchMessageAckerTest.java  |  71 +++++++++++
 9 files changed, 294 insertions(+), 144 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 2d7c7ae..533ce85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -42,7 +43,6 @@ import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -434,7 +434,7 @@ public class BatchMessageTest extends BrokerTestBase {
         Message<byte[]> lastunackedMsg = null;
         for (int i = 0; i < numMsgs; i++) {
             Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
-            LOG.info("received message {}", String.valueOf(msg.getData()));
+            LOG.info("received message {}", new String(msg.getData(), UTF_8));
             assertNotNull(msg);
             if (i == 8) {
                 consumer.acknowledgeCumulative(msg);
@@ -514,7 +514,6 @@ public class BatchMessageTest extends BrokerTestBase {
         Thread.sleep(100);
         rolloverPerIntervalStats();
         
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(),
 0);
-        assertTrue(((ConsumerImpl<byte[]>) consumer).isBatchingAckTrackerEmpty());
         consumer.close();
         producer.close();
         noBatchProducer.close();
@@ -574,7 +573,6 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         Thread.sleep(100);
         
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(),
 0);
-        assertTrue(((ConsumerImpl<byte[]>) consumer).isBatchingAckTrackerEmpty());
         consumer.close();
         producer.close();
         noBatchProducer.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
new file mode 100644
index 0000000..75e50aa
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.BitSet;
+
+class BatchMessageAcker {
+
+    static BatchMessageAcker newAcker(int batchSize) {
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0, batchSize);
+        return new BatchMessageAcker(bitSet, batchSize);
+    }
+
+    // bitset shared across messages in the same batch.
+    private final int batchSize;
+    private final BitSet bitSet;
+    private boolean prevBatchCumulativelyAcked = false;
+
+    BatchMessageAcker(BitSet bitSet, int batchSize) {
+        this.bitSet = bitSet;
+        this.batchSize = batchSize;
+    }
+
+    @VisibleForTesting
+    BitSet getBitSet() {
+        return bitSet;
+    }
+
+    public synchronized int getBatchSize() {
+        return batchSize;
+    }
+
+    public synchronized boolean ackIndividual(int batchIndex) {
+        bitSet.clear(batchIndex);
+        return bitSet.isEmpty();
+    }
+
+    public synchronized boolean ackCumulative(int batchIndex) {
+        // +1 since to argument is exclusive
+        bitSet.clear(0, batchIndex + 1);
+        return bitSet.isEmpty();
+    }
+
+    // debug purpose
+    public synchronized int getOutstandingAcks() {
+        return bitSet.cardinality();
+    }
+
+    public void setPrevBatchCumulativelyAcked(boolean acked) {
+        this.prevBatchCumulativelyAcked = acked;
+    }
+
+    public boolean isPrevBatchCumulativelyAcked() {
+        return prevBatchCumulativelyAcked;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
new file mode 100644
index 0000000..a521d63
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+class BatchMessageAckerDisabled extends BatchMessageAcker {
+
+    static final BatchMessageAckerDisabled INSTANCE = new BatchMessageAckerDisabled();
+
+    private BatchMessageAckerDisabled() {
+        super(null, 0);
+    }
+
+    @Override
+    public synchronized int getBatchSize() {
+        return 0;
+    }
+
+    @Override
+    public boolean ackIndividual(int batchIndex) {
+        return true;
+    }
+
+    @Override
+    public boolean ackCumulative(int batchIndex) {
+        return true;
+    }
+
+    @Override
+    public int getOutstandingAcks() {
+        return 0;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index f7c5bdd..d87a6ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -27,17 +27,27 @@ public class BatchMessageIdImpl extends MessageIdImpl {
     private final static int NO_BATCH = -1;
     private final int batchIndex;
 
+    private final BatchMessageAcker acker;
+
     public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
+        this(ledgerId, entryId, partitionIndex, batchIndex, BatchMessageAckerDisabled.INSTANCE);
+    }
+
+    public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, BatchMessageAcker acker) {
         super(ledgerId, entryId, partitionIndex);
         this.batchIndex = batchIndex;
+        this.acker = acker;
     }
 
     public BatchMessageIdImpl(MessageIdImpl other) {
         super(other.ledgerId, other.entryId, other.partitionIndex);
         if (other instanceof BatchMessageIdImpl) {
-            this.batchIndex = ((BatchMessageIdImpl) other).batchIndex;
+            BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
+            this.batchIndex = otherId.batchIndex;
+            this.acker = otherId.acker;
         } else {
             this.batchIndex = NO_BATCH;
+            this.acker = BatchMessageAckerDisabled.INSTANCE;
         }
     }
 
@@ -95,4 +105,30 @@ public class BatchMessageIdImpl extends MessageIdImpl {
     public byte[] toByteArray() {
         return toByteArray(batchIndex);
     }
+
+    public boolean ackIndividual() {
+        return acker.ackIndividual(batchIndex);
+    }
+
+    public boolean ackCumulative() {
+        return acker.ackCumulative(batchIndex);
+    }
+
+    public int getOutstandingAcksInSameBatch() {
+        return acker.getOutstandingAcks();
+    }
+
+    public int getBatchSize() {
+        return acker.getBatchSize();
+    }
+
+    public MessageIdImpl prevBatchMessageId() {
+        return new MessageIdImpl(
+            ledgerId, entryId - 1, partitionIndex);
+    }
+
+    public BatchMessageAcker getAcker() {
+        return acker;
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 46709d2..c215431 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,13 @@ import static 
org.apache.pulsar.common.api.Commands.readChecksum;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -110,7 +106,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private final ReadWriteLock zeroQueueLock;
 
     private final UnAckedMessageTracker unAckedMessageTracker;
-    private final ConcurrentNavigableMap<MessageIdImpl, BitSet> 
batchMessageAckTracker;
 
     protected final ConsumerStatsRecorder stats;
     private final int priorityLevel;
@@ -155,7 +150,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
         this.codecProvider = new CompressionCodecProvider();
         this.priorityLevel = conf.getPriorityLevel();
-        this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = 
conf.getSubscriptionInitialPosition();
 
@@ -219,7 +213,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
                 cnx.removeConsumer(consumerId);
                 log.info("[{}][{}] Successfully unsubscribed from topic", 
topic, subscription);
-                batchMessageAckTracker.clear();
                 unAckedMessageTracker.close();
                 unsubscribeFuture.complete(null);
                 setState(State.Closed);
@@ -355,75 +348,26 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
-    // we may not be able to ack message being acked by client. However 
messages in prior
-    // batch may be ackable
-    private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, 
MessageIdImpl message,
-                                           Map<String,Long> properties) {
-        // get entry before this message and ack that message on broker
-        MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
-        if (lowerKey != null) {
-            NavigableMap<MessageIdImpl, BitSet> entriesUpto = 
batchMessageAckTracker.headMap(lowerKey, true);
-            for (Object key : entriesUpto.keySet()) {
-                entriesUpto.remove(key);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] ack prior message {} to broker on 
cumulative ack for message {}", subscription,
-                        consumerId, lowerKey, batchMessageId);
-            }
-            sendAcknowledge(lowerKey, AckType.Cumulative, properties);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] no messages prior to message {}", 
subscription, consumerId, batchMessageId);
-            }
-        }
-    }
-
     boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType 
ackType,
                                    Map<String,Long> properties) {
-        // we keep track of entire batch and so need MessageIdImpl and cannot 
use BatchMessageIdImpl
-        MessageIdImpl message = new 
MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
-                batchMessageId.getPartitionIndex());
-        BitSet bitSet = batchMessageAckTracker.get(message);
-        if (bitSet == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] message not found {} for ack {}", 
subscription, consumerId, batchMessageId,
-                        ackType);
-            }
-            return true;
+        boolean isAllMsgsAcked;
+        if (ackType == AckType.Individual) {
+            isAllMsgsAcked = batchMessageId.ackIndividual();
+        } else {
+            isAllMsgsAcked = batchMessageId.ackCumulative();
         }
-        int batchIndex = batchMessageId.getBatchIndex();
-        // bitset is not thread-safe and requires external synchronization
-        int batchSize = 0;
-        // only used for debug-logging
         int outstandingAcks = 0;
-        boolean isAllMsgsAcked = false;
-        lock.writeLock().lock();
-        try {
-            batchSize = bitSet.length();
-            if (ackType == AckType.Individual) {
-                bitSet.clear(batchIndex);
-            } else {
-                // +1 since to argument is exclusive
-                bitSet.clear(0, batchIndex + 1);
-            }
-            isAllMsgsAcked = bitSet.isEmpty();
-            if (log.isDebugEnabled()) {
-                outstandingAcks = bitSet.cardinality();
-            }
-        } finally {
-            lock.writeLock().unlock();
+        if (log.isDebugEnabled()) {
+            outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
         }
 
+        int batchSize = batchMessageId.getBatchSize();
         // all messages in this batch have been acked
         if (isAllMsgsAcked) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] can ack message to broker {}, acktype {}, 
cardinality {}, length {}", subscription,
                         consumerName, batchMessageId, ackType, 
outstandingAcks, batchSize);
             }
-            if (ackType == AckType.Cumulative) {
-                batchMessageAckTracker.keySet().removeIf(m -> 
(m.compareTo(message) <= 0));
-            }
-            batchMessageAckTracker.remove(message);
             // increment Acknowledge-msg counter with number of messages in 
batch only if AckType is Individual.
             // CumulativeAckType is handled while sending ack to broker
             if (ackType == AckType.Individual) {
@@ -431,9 +375,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
             return true;
         } else {
-            // we cannot ack this message to broker. but prior message may be 
ackable
-            if (ackType == AckType.Cumulative) {
-                ackMessagesInEarlierBatch(batchMessageId, message, properties);
+            if (AckType.Cumulative == ackType
+                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                sendAcknowledge(batchMessageId.prevBatchMessageId(), 
AckType.Cumulative, properties);
+                batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
             }
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] cannot ack message to broker {}, acktype 
{}, pending acks - {}", subscription,
@@ -443,38 +388,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return false;
     }
 
-    // if we are consuming a mix of batch and non-batch messages then 
cumulative ack on non-batch messages
-    // should clean up the ack tracker as well
-    private void updateBatchAckTracker(MessageIdImpl message, AckType ackType) 
{
-        if (batchMessageAckTracker.isEmpty()) {
-            return;
-        }
-        MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
-        if (lowerKey != null) {
-            NavigableMap<MessageIdImpl, BitSet> entriesUpto = 
batchMessageAckTracker.headMap(lowerKey, true);
-            for (Object key : entriesUpto.keySet()) {
-                entriesUpto.remove(key);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] updated batch ack tracker up to message 
{} on cumulative ack for message {}",
-                        subscription, consumerId, lowerKey, message);
-            }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] no messages to clean up prior to message 
{}", subscription, consumerId, message);
-            }
-        }
-    }
-
-    /**
-     * helper method that returns current state of data structure used to 
track acks for batch messages
-     *
-     * @return true if all batch messages have been acknowledged
-     */
-    public boolean isBatchingAckTrackerEmpty() {
-        return batchMessageAckTracker.isEmpty();
-    }
-
     @Override
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String,Long> 
properties) {
@@ -496,11 +409,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return CompletableFuture.completedFuture(null);
             }
         }
-        // if we got a cumulative ack on non batch message, check if any 
earlier batch messages need to be removed
-        // from batch message tracker
-        if (ackType == AckType.Cumulative && !(messageId instanceof 
BatchMessageIdImpl)) {
-            updateBatchAckTracker((MessageIdImpl) messageId, ackType);
-        }
         return sendAcknowledge(messageId, ackType, properties);
     }
 
@@ -560,7 +468,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             currentSize = incomingMessages.size();
             startMessageId = clearReceiverQueue();
             unAckedMessageTracker.clear();
-            batchMessageAckTracker.clear();
         }
 
         boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
@@ -702,7 +609,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
-            batchMessageAckTracker.clear();
             unAckedMessageTracker.close();
             return CompletableFuture.completedFuture(null);
         }
@@ -710,7 +616,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (!isConnected()) {
             log.info("[{}] [{}] Closed Consumer (not connected)", topic, 
subscription);
             setState(State.Closed);
-            batchMessageAckTracker.clear();
             unAckedMessageTracker.close();
             client.cleanupConsumer(this);
             return CompletableFuture.completedFuture(null);
@@ -730,7 +635,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             if (exception == null || !cnx.ctx().channel().isActive()) {
                 log.info("[{}] [{}] Closed consumer", topic, subscription);
                 setState(State.Closed);
-                batchMessageAckTracker.clear();
                 unAckedMessageTracker.close();
                 closeFuture.complete(null);
                 client.cleanupConsumer(this);
@@ -925,15 +829,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
         // create ack tracker for entry aka batch
-        BitSet bitSet = new BitSet(batchSize);
         MessageIdImpl batchMessage = new 
MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
                 getPartitionIndex());
-        bitSet.set(0, batchSize);
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] added bit set for message {}, cardinality {}, 
length {}", subscription, consumerName,
-                    batchMessage, bitSet.cardinality(), bitSet.length());
-        }
-        batchMessageAckTracker.put(batchMessage, bitSet);
+        BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
         unAckedMessageTracker.add(batchMessage);
 
         int skippedMessages = 0;
@@ -971,7 +869,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
 
                 BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
-                        messageId.getEntryId(), getPartitionIndex(), i);
+                        messageId.getEntryId(), getPartitionIndex(), i, acker);
                 final MessageImpl<T> message = new 
MessageImpl<>(batchMessageIdImpl, msgMetadata,
                         singleMessageMetadataBuilder.build(), 
singleMessagePayload, cnx, schema);
                 lock.readLock().lock();
@@ -988,9 +886,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 singleMessageMetadataBuilder.recycle();
             }
         } catch (IOException e) {
-            //
             log.warn("[{}] [{}] unable to obtain message in batch", 
subscription, consumerName);
-            batchMessageAckTracker.remove(batchMessage);
             discardCorruptedMessage(messageId, cnx, 
ValidationError.BatchDeSerializeError);
         }
         if (log.isDebugEnabled()) {
@@ -1192,7 +1088,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 currentSize = incomingMessages.size();
                 incomingMessages.clear();
                 unAckedMessageTracker.clear();
-                batchMessageAckTracker.clear();
             }
             
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId),
 cnx.ctx().voidPromise());
             if (currentSize > 0) {
@@ -1232,7 +1127,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = 
ids.stream().map(messageId -> {
                     // attempt to remove message from batchMessageAckTracker
-                    batchMessageAckTracker.remove(messageId);
                     builder.setPartition(messageId.getPartitionIndex());
                     builder.setLedgerId(messageId.getLedgerId());
                     builder.setEntryId(messageId.getEntryId());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d8ad4e..4bd2fda 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -489,15 +489,6 @@ public class PartitionedConsumerImpl<T> extends 
ConsumerBase<T> {
         return FutureUtil.failedFuture(new PulsarClientException("Seek 
operation not supported on partitioned topics"));
     }
 
-    /**
-     * helper method that returns current state of data structure used to 
track acks for batch messages
-     *
-     * @return true if all batch messages have been acknowledged
-     */
-    public boolean isBatchingAckTrackerEmpty() {
-        return 
consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty);
-    }
-
     List<ConsumerImpl<T>> getConsumers() {
         return consumers;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index e39de96..1089cef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -536,16 +536,6 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> 
{
         return FutureUtil.failedFuture(new PulsarClientException("Seek 
operation not supported on topics consumer"));
     }
 
-    /**
-     * helper method that returns current state of data structure used to 
track acks for batch messages
-     *
-     * @return true if all batch messages have been acknowledged
-     */
-    public boolean isBatchingAckTrackerEmpty() {
-        return consumers.values().stream().allMatch(consumer -> 
consumer.isBatchingAckTrackerEmpty());
-    }
-
-
     @Override
     public int getAvailablePermits() {
         return 
consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
new file mode 100644
index 0000000..e32a2ef
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.Test;
+
+public class BatchMessageAckerDisabledTest {
+
+    @Test
+    public void testAckIndividual() {
+        for (int i = 0; i < 10; i++) {
+            assertTrue(BatchMessageAckerDisabled.INSTANCE.ackIndividual(i));
+        }
+    }
+
+    @Test
+    public void testAckCumulative() {
+        for (int i = 0; i < 10; i++) {
+            assertTrue(BatchMessageAckerDisabled.INSTANCE.ackCumulative(i));
+        }
+    }
+
+    @Test
+    public void testGetOutstandingAcks() {
+        assertEquals(0, BatchMessageAckerDisabled.INSTANCE.getOutstandingAcks());
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
new file mode 100644
index 0000000..2bfa620
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class BatchMessageAckerTest {
+
+    private static final int BATCH_SIZE = 10;
+
+    private BatchMessageAcker acker;
+
+    @BeforeMethod
+    public void setup() {
+        acker = BatchMessageAcker.newAcker(10);
+    }
+
+    @Test
+    public void testAckers() {
+        assertEquals(BATCH_SIZE, acker.getOutstandingAcks());
+        assertEquals(BATCH_SIZE, acker.getBatchSize());
+
+        assertFalse(acker.ackIndividual(4));
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            if (4 == i) {
+                assertFalse(acker.getBitSet().get(i));
+            } else {
+                assertTrue(acker.getBitSet().get(i));
+            }
+        }
+
+        assertFalse(acker.ackCumulative(6));
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            if (i <= 6) {
+                assertFalse(acker.getBitSet().get(i));
+            } else {
+                assertTrue(acker.getBitSet().get(i));
+            }
+        }
+
+        for (int i = BATCH_SIZE - 1; i >= 8; i--) {
+            assertFalse(acker.ackIndividual(i));
+            assertFalse(acker.getBitSet().get(i));
+        }
+
+        assertTrue(acker.ackIndividual(7));
+        assertEquals(0, acker.getOutstandingAcks());
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to