This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db26946  UnAcked message tracker based on TimePartition (#3118)
db26946 is described below

commit db26946520d3fe2c66bc0bc7c87815a8ef3754f4
Author: penghui <[email protected]>
AuthorDate: Sat Dec 22 22:04:48 2018 +0800

    UnAcked message tracker based on TimePartition (#3118)
    
    ### Motivation
    
    currently un-ack message tracker  redelivery is done in batch, not taking 
into account the actual arrival time of the message.
    
    Split acktimeout into several time-partition, user can define the tick 
duration.
    
    ### Modifications
    
    Improve UnAckedMessageTracker use time partitions to maintain un-acked 
messages, UnAckedMessageTracker main partitions by LinkedList, remove first 
partition from the LinkedList and process it and add last partition to the 
LinkedList when timeout. All un-acked messages add into the last partition.
    
    ### Result
    
    UT passed
---
 .../PerMessageUnAcknowledgedRedeliveryTest.java    |   9 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |   3 +-
 .../pulsar/client/api/ConsumerConfiguration.java   |   8 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   1 -
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  10 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   6 +-
 .../pulsar/client/impl/UnAckedMessageTracker.java  | 163 ++++++++++++---------
 .../client/impl/UnAckedTopicMessageTracker.java    |  40 +++--
 .../impl/conf/ConsumerConfigurationData.java       |   8 +-
 9 files changed, 148 insertions(+), 100 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 87f8d36..6db887b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -112,7 +112,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -210,7 +210,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -308,7 +308,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -415,8 +415,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
-        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 55be9ec..b791033 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -381,8 +381,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(received, totalMessages);
 
         // 8. Simulate ackTimeout
-        ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
-        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
+        Thread.sleep(ackTimeOutMillis);
 
         // 9. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 5ffba38..4539d6d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.client.api;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+
 /**
  * Class specifying the configuration of a consumer. In Exclusive 
subscription, only a single consumer is allowed to
  * attach to the subscription. Other consumers will get an error message. In 
Shared subscription, multiple consumers
@@ -60,6 +60,10 @@ public class ConsumerConfiguration implements Serializable {
         return conf.getAckTimeoutMillis();
     }
 
+    public long getTickDurationMillis() {
+        return conf.getTickDurationMillis();
+    }
+
     /**
      * Set the timeout for unacked messages, truncated to the nearest 
millisecond. The timeout needs to be greater than
      * 10 seconds.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 1060460..5792188 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -49,7 +49,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.
 import org.apache.pulsar.common.util.FutureUtil;
 
 import com.google.common.collect.Lists;
-
 import lombok.NonNull;
 
 public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3e2d4d6..df65361 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,7 +27,6 @@ import static 
org.apache.pulsar.common.api.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
@@ -52,11 +51,11 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -66,7 +65,6 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -188,7 +186,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         if (conf.getAckTimeoutMillis() != 0) {
-            this.unAckedMessageTracker = new UnAckedMessageTracker(client, 
this, conf.getAckTimeoutMillis());
+            if (conf.getTickDurationMillis() > 0) {
+                this.unAckedMessageTracker = new UnAckedMessageTracker(client, 
this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
+            } else {
+                this.unAckedMessageTracker = new UnAckedMessageTracker(client, 
this, conf.getAckTimeoutMillis());
+            }
         } else {
             this.unAckedMessageTracker = 
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 41821ad..e9ef632 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -99,7 +99,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         this.allTopicPartitionsNumber = new AtomicInteger(0);
 
         if (conf.getAckTimeoutMillis() != 0) {
-            this.unAckedMessageTracker = new 
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+            if (conf.getTickDurationMillis() > 0) {
+                this.unAckedMessageTracker = new 
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), 
conf.getTickDurationMillis());
+            } else {
+                this.unAckedMessageTracker = new 
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+            }
         } else {
             this.unAckedMessageTracker = 
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b..d31d353 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,29 +18,36 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.base.Preconditions;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class UnAckedMessageTracker implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(UnAckedMessageTracker.class);
-    protected ConcurrentOpenHashSet<MessageId> currentSet;
-    protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
-    private final ReentrantReadWriteLock readWriteLock;
+
+    protected final ConcurrentHashMap<MessageId, 
ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
+    protected final LinkedList<ConcurrentOpenHashSet<MessageId>> 
timePartitions;
+
     protected final Lock readLock;
-    private final Lock writeLock;
-    private Timeout timeout;
+    protected final Lock writeLock;
 
     public static final UnAckedMessageTrackerDisabled 
UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();
+    private final long ackTimeoutMillis;
+    private final long tickDurationInMs;
 
     private static class UnAckedMessageTrackerDisabled extends 
UnAckedMessageTracker {
         @Override
@@ -67,116 +74,138 @@ public class UnAckedMessageTracker implements Closeable {
         }
     }
 
+    private Timeout timeout;
+
     public UnAckedMessageTracker() {
-        readWriteLock = null;
         readLock = null;
         writeLock = null;
+        timePartitions = null;
+        messageIdPartitionMap = null;
+        this.ackTimeoutMillis = 0;
+        this.tickDurationInMs = 0;
     }
 
     public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> 
consumerBase, long ackTimeoutMillis) {
-        currentSet = new ConcurrentOpenHashSet<MessageId>();
-        oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
-        readWriteLock = new ReentrantReadWriteLock();
-        readLock = readWriteLock.readLock();
-        writeLock = readWriteLock.writeLock();
-        start(client, consumerBase, ackTimeoutMillis);
+        this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
     }
 
-    public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, 
long ackTimeoutMillis) {
-        this.stop();
+    public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> 
consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
+        Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis 
>= tickDurationInMs);
+        this.ackTimeoutMillis = ackTimeoutMillis;
+        this.tickDurationInMs = tickDurationInMs;
+        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+        this.readLock = readWriteLock.readLock();
+        this.writeLock = readWriteLock.writeLock();
+        this.messageIdPartitionMap = new ConcurrentHashMap<>();
+        this.timePartitions = new LinkedList<>();
+
+        int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / 
this.tickDurationInMs);
+        for (int i = 0; i < blankPartitions + 1; i++) {
+            timePartitions.add(new ConcurrentOpenHashSet<>());
+        }
+
         timeout = client.timer().newTimeout(new TimerTask() {
             @Override
             public void run(Timeout t) throws Exception {
-                if (isAckTimeout()) {
-                    log.warn("[{}] {} messages have timed-out", consumerBase, 
oldOpenSet.size());
-                    Set<MessageId> messageIds = new HashSet<>();
-                    oldOpenSet.forEach(messageIds::add);
-                    oldOpenSet.clear();
+                Set<MessageId> messageIds = new HashSet<>();
+                writeLock.lock();
+                try {
+                    timePartitions.addLast(new ConcurrentOpenHashSet<>());
+                    ConcurrentOpenHashSet<MessageId> headPartition = 
timePartitions.removeFirst();
+                    if (!headPartition.isEmpty()) {
+                        log.warn("[{}] {} messages have timed-out", 
consumerBase, timePartitions.size());
+                        headPartition.forEach(messageId -> {
+                            messageIds.add(messageId);
+                            messageIdPartitionMap.remove(messageId);
+                        });
+                    }
+                } finally {
+                    writeLock.unlock();
+                }
+                if (messageIds.size() > 0) {
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
                 }
-                toggle();
-                timeout = client.timer().newTimeout(this, ackTimeoutMillis, 
TimeUnit.MILLISECONDS);
+                timeout = client.timer().newTimeout(this, tickDurationInMs, 
TimeUnit.MILLISECONDS);
             }
-        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
+        }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
     }
 
-    void toggle() {
+    public void clear() {
         writeLock.lock();
         try {
-            ConcurrentOpenHashSet<MessageId> temp = currentSet;
-            currentSet = oldOpenSet;
-            oldOpenSet = temp;
+            messageIdPartitionMap.clear();
+            timePartitions.clear();
+            int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / 
tickDurationInMs);
+            for (int i = 0; i < blankPartitions + 1; i++) {
+                timePartitions.add(new ConcurrentOpenHashSet<>());
+            }
         } finally {
             writeLock.unlock();
         }
     }
 
-    public void clear() {
-        readLock.lock();
-        try {
-            currentSet.clear();
-            oldOpenSet.clear();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    public boolean add(MessageId m) {
-        readLock.lock();
+    public boolean add(MessageId messageId) {
+        writeLock.lock();
         try {
-            oldOpenSet.remove(m);
-            return currentSet.add(m);
+            ConcurrentOpenHashSet<MessageId> partition = 
timePartitions.peekLast();
+            messageIdPartitionMap.put(messageId, partition);
+            return partition.add(messageId);
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
-
     }
 
     boolean isEmpty() {
         readLock.lock();
         try {
-            return currentSet.isEmpty() && oldOpenSet.isEmpty();
+            return messageIdPartitionMap.isEmpty();
         } finally {
             readLock.unlock();
         }
     }
 
-    public boolean remove(MessageId m) {
-        readLock.lock();
+    public boolean remove(MessageId messageId) {
+        writeLock.lock();
         try {
-            return currentSet.remove(m) || oldOpenSet.remove(m);
+            boolean removed = false;
+            ConcurrentOpenHashSet<MessageId> exist = 
messageIdPartitionMap.remove(messageId);
+            if (exist != null) {
+                removed = exist.remove(messageId);
+            }
+            return removed;
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
     }
 
     long size() {
         readLock.lock();
         try {
-            return currentSet.size() + oldOpenSet.size();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    private boolean isAckTimeout() {
-        readLock.lock();
-        try {
-            return !oldOpenSet.isEmpty();
+            return messageIdPartitionMap.size();
         } finally {
             readLock.unlock();
         }
     }
 
     public int removeMessagesTill(MessageId msgId) {
-        readLock.lock();
+        writeLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> 
(m.compareTo(msgId) <= 0));
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> 
(m.compareTo(msgId) <= 0));
-
-            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+            int removed = 0;
+            Iterator<MessageId> iterator = 
messageIdPartitionMap.keySet().iterator();
+            while (iterator.hasNext()) {
+                MessageId messageId = iterator.next();
+                if (messageId.compareTo(msgId) <= 0) {
+                    ConcurrentOpenHashSet<MessageId> exist = 
messageIdPartitionMap.get(messageId);
+                    if (exist != null) {
+                        exist.remove(messageId);
+                    }
+                    iterator.remove();
+                    removed ++;
+                }
+            }
+            return removed;
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index f500fda..afe9a5e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkState;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+
+import java.util.Iterator;
 
 public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
 
@@ -26,23 +29,30 @@ public class UnAckedTopicMessageTracker extends 
UnAckedMessageTracker {
         super(client, consumerBase, ackTimeoutMillis);
     }
 
+    public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase<?> 
consumerBase, long ackTimeoutMillis, long tickDurationMillis) {
+        super(client, consumerBase, ackTimeoutMillis, tickDurationMillis);
+    }
+
     public int removeTopicMessages(String topicName) {
-        readLock.lock();
+        writeLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
-                checkState(m instanceof TopicMessageIdImpl,
-                    "message should be of type TopicMessageIdImpl");
-                return 
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
-            });
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
-                checkState(m instanceof TopicMessageIdImpl,
-                    "message should be of type TopicMessageIdImpl");
-                return 
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
-            });
-
-            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+            int removed = 0;
+            Iterator<MessageId> iterator = 
messageIdPartitionMap.keySet().iterator();
+            while (iterator.hasNext()) {
+                MessageId messageId = iterator.next();
+                if (messageId instanceof TopicMessageIdImpl &&
+                        
((TopicMessageIdImpl)messageId).getTopicPartitionName().contains(topicName)) {
+                    ConcurrentOpenHashSet<MessageId> exist = 
messageIdPartitionMap.get(messageId);
+                    if (exist != null) {
+                        exist.remove(messageId);
+                    }
+                    iterator.remove();
+                    removed ++;
+                }
+            }
+            return removed;
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 7e92d12..2c56b61 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -23,22 +23,22 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import java.io.Serializable;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import lombok.Data;
-
-import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 
 @Data
@@ -69,6 +69,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private long ackTimeoutMillis = 0;
 
+    private long tickDurationMillis = 1000;
+
     private int priorityLevel = 0;
 
     @JsonIgnore

Reply via email to