This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new db26946 UnAcked message tracker based on TimePartition (#3118)
db26946 is described below
commit db26946520d3fe2c66bc0bc7c87815a8ef3754f4
Author: penghui <[email protected]>
AuthorDate: Sat Dec 22 22:04:48 2018 +0800
UnAcked message tracker based on TimePartition (#3118)
### Motivation
currently un-ack message tracker redelivery is done in batch, not taking
into account the actual arrival time of the message.
Split acktimeout into several time-partition, user can define the tick
duration.
### Modifications
Improve UnAckedMessageTracker use time partitions to maintain un-acked
messages, UnAckedMessageTracker main partitions by LinkedList, remove first
partition from the LinkedList and process it and add last partition to the
LinkedList when timeout. All un-acked messages add into the last partition.
### Result
UT passed
---
.../PerMessageUnAcknowledgedRedeliveryTest.java | 9 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 3 +-
.../pulsar/client/api/ConsumerConfiguration.java | 8 +-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 1 -
.../apache/pulsar/client/impl/ConsumerImpl.java | 10 +-
.../client/impl/MultiTopicsConsumerImpl.java | 6 +-
.../pulsar/client/impl/UnAckedMessageTracker.java | 163 ++++++++++++---------
.../client/impl/UnAckedTopicMessageTracker.java | 40 +++--
.../impl/conf/ConsumerConfigurationData.java | 8 +-
9 files changed, 148 insertions(+), 100 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 87f8d36..6db887b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -112,7 +112,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
assertEquals(received, 5);
// 7. Simulate ackTimeout
- ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+ Thread.sleep(ackTimeOutMillis);
// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -210,7 +210,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
assertEquals(received, 5);
// 7. Simulate ackTimeout
- ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+ Thread.sleep(ackTimeOutMillis);
// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -308,7 +308,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
assertEquals(received, 5);
// 7. Simulate ackTimeout
- ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+ Thread.sleep(ackTimeOutMillis);
// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -415,8 +415,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
assertEquals(received, 5);
// 7. Simulate ackTimeout
- ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
- ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
+ Thread.sleep(ackTimeOutMillis);
// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 55be9ec..b791033 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -381,8 +381,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(received, totalMessages);
// 8. Simulate ackTimeout
- ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
- ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
+ Thread.sleep(ackTimeOutMillis);
// 9. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 5ffba38..4539d6d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.client.api;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+
/**
* Class specifying the configuration of a consumer. In Exclusive
subscription, only a single consumer is allowed to
* attach to the subscription. Other consumers will get an error message. In
Shared subscription, multiple consumers
@@ -60,6 +60,10 @@ public class ConsumerConfiguration implements Serializable {
return conf.getAckTimeoutMillis();
}
+ public long getTickDurationMillis() {
+ return conf.getTickDurationMillis();
+ }
+
/**
* Set the timeout for unacked messages, truncated to the nearest
millisecond. The timeout needs to be greater than
* 10 seconds.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 1060460..5792188 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -49,7 +49,6 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.
import org.apache.pulsar.common.util.FutureUtil;
import com.google.common.collect.Lists;
-
import lombok.NonNull;
public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3e2d4d6..df65361 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,7 +27,6 @@ import static
org.apache.pulsar.common.api.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
@@ -52,11 +51,11 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -66,7 +65,6 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.PulsarDecoder;
@@ -188,7 +186,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
if (conf.getAckTimeoutMillis() != 0) {
- this.unAckedMessageTracker = new UnAckedMessageTracker(client,
this, conf.getAckTimeoutMillis());
+ if (conf.getTickDurationMillis() > 0) {
+ this.unAckedMessageTracker = new UnAckedMessageTracker(client,
this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
+ } else {
+ this.unAckedMessageTracker = new UnAckedMessageTracker(client,
this, conf.getAckTimeoutMillis());
+ }
} else {
this.unAckedMessageTracker =
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 41821ad..e9ef632 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -99,7 +99,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
this.allTopicPartitionsNumber = new AtomicInteger(0);
if (conf.getAckTimeoutMillis() != 0) {
- this.unAckedMessageTracker = new
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+ if (conf.getTickDurationMillis() > 0) {
+ this.unAckedMessageTracker = new
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(),
conf.getTickDurationMillis());
+ } else {
+ this.unAckedMessageTracker = new
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+ }
} else {
this.unAckedMessageTracker =
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b..d31d353 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,29 +18,36 @@
*/
package org.apache.pulsar.client.impl;
+import com.google.common.base.Preconditions;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Closeable;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class UnAckedMessageTracker implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(UnAckedMessageTracker.class);
- protected ConcurrentOpenHashSet<MessageId> currentSet;
- protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
- private final ReentrantReadWriteLock readWriteLock;
+
+ protected final ConcurrentHashMap<MessageId,
ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
+ protected final LinkedList<ConcurrentOpenHashSet<MessageId>>
timePartitions;
+
protected final Lock readLock;
- private final Lock writeLock;
- private Timeout timeout;
+ protected final Lock writeLock;
public static final UnAckedMessageTrackerDisabled
UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();
+ private final long ackTimeoutMillis;
+ private final long tickDurationInMs;
private static class UnAckedMessageTrackerDisabled extends
UnAckedMessageTracker {
@Override
@@ -67,116 +74,138 @@ public class UnAckedMessageTracker implements Closeable {
}
}
+ private Timeout timeout;
+
public UnAckedMessageTracker() {
- readWriteLock = null;
readLock = null;
writeLock = null;
+ timePartitions = null;
+ messageIdPartitionMap = null;
+ this.ackTimeoutMillis = 0;
+ this.tickDurationInMs = 0;
}
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?>
consumerBase, long ackTimeoutMillis) {
- currentSet = new ConcurrentOpenHashSet<MessageId>();
- oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
- readWriteLock = new ReentrantReadWriteLock();
- readLock = readWriteLock.readLock();
- writeLock = readWriteLock.writeLock();
- start(client, consumerBase, ackTimeoutMillis);
+ this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
}
- public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase,
long ackTimeoutMillis) {
- this.stop();
+ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?>
consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
+ Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis
>= tickDurationInMs);
+ this.ackTimeoutMillis = ackTimeoutMillis;
+ this.tickDurationInMs = tickDurationInMs;
+ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+ this.messageIdPartitionMap = new ConcurrentHashMap<>();
+ this.timePartitions = new LinkedList<>();
+
+ int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis /
this.tickDurationInMs);
+ for (int i = 0; i < blankPartitions + 1; i++) {
+ timePartitions.add(new ConcurrentOpenHashSet<>());
+ }
+
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
- if (isAckTimeout()) {
- log.warn("[{}] {} messages have timed-out", consumerBase,
oldOpenSet.size());
- Set<MessageId> messageIds = new HashSet<>();
- oldOpenSet.forEach(messageIds::add);
- oldOpenSet.clear();
+ Set<MessageId> messageIds = new HashSet<>();
+ writeLock.lock();
+ try {
+ timePartitions.addLast(new ConcurrentOpenHashSet<>());
+ ConcurrentOpenHashSet<MessageId> headPartition =
timePartitions.removeFirst();
+ if (!headPartition.isEmpty()) {
+ log.warn("[{}] {} messages have timed-out",
consumerBase, timePartitions.size());
+ headPartition.forEach(messageId -> {
+ messageIds.add(messageId);
+ messageIdPartitionMap.remove(messageId);
+ });
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ if (messageIds.size() > 0) {
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
- toggle();
- timeout = client.timer().newTimeout(this, ackTimeoutMillis,
TimeUnit.MILLISECONDS);
+ timeout = client.timer().newTimeout(this, tickDurationInMs,
TimeUnit.MILLISECONDS);
}
- }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
+ }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}
- void toggle() {
+ public void clear() {
writeLock.lock();
try {
- ConcurrentOpenHashSet<MessageId> temp = currentSet;
- currentSet = oldOpenSet;
- oldOpenSet = temp;
+ messageIdPartitionMap.clear();
+ timePartitions.clear();
+ int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis /
tickDurationInMs);
+ for (int i = 0; i < blankPartitions + 1; i++) {
+ timePartitions.add(new ConcurrentOpenHashSet<>());
+ }
} finally {
writeLock.unlock();
}
}
- public void clear() {
- readLock.lock();
- try {
- currentSet.clear();
- oldOpenSet.clear();
- } finally {
- readLock.unlock();
- }
- }
-
- public boolean add(MessageId m) {
- readLock.lock();
+ public boolean add(MessageId messageId) {
+ writeLock.lock();
try {
- oldOpenSet.remove(m);
- return currentSet.add(m);
+ ConcurrentOpenHashSet<MessageId> partition =
timePartitions.peekLast();
+ messageIdPartitionMap.put(messageId, partition);
+ return partition.add(messageId);
} finally {
- readLock.unlock();
+ writeLock.unlock();
}
-
}
boolean isEmpty() {
readLock.lock();
try {
- return currentSet.isEmpty() && oldOpenSet.isEmpty();
+ return messageIdPartitionMap.isEmpty();
} finally {
readLock.unlock();
}
}
- public boolean remove(MessageId m) {
- readLock.lock();
+ public boolean remove(MessageId messageId) {
+ writeLock.lock();
try {
- return currentSet.remove(m) || oldOpenSet.remove(m);
+ boolean removed = false;
+ ConcurrentOpenHashSet<MessageId> exist =
messageIdPartitionMap.remove(messageId);
+ if (exist != null) {
+ removed = exist.remove(messageId);
+ }
+ return removed;
} finally {
- readLock.unlock();
+ writeLock.unlock();
}
}
long size() {
readLock.lock();
try {
- return currentSet.size() + oldOpenSet.size();
- } finally {
- readLock.unlock();
- }
- }
-
- private boolean isAckTimeout() {
- readLock.lock();
- try {
- return !oldOpenSet.isEmpty();
+ return messageIdPartitionMap.size();
} finally {
readLock.unlock();
}
}
public int removeMessagesTill(MessageId msgId) {
- readLock.lock();
+ writeLock.lock();
try {
- int currentSetRemovedMsgCount = currentSet.removeIf(m ->
(m.compareTo(msgId) <= 0));
- int oldSetRemovedMsgCount = oldOpenSet.removeIf(m ->
(m.compareTo(msgId) <= 0));
-
- return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+ int removed = 0;
+ Iterator<MessageId> iterator =
messageIdPartitionMap.keySet().iterator();
+ while (iterator.hasNext()) {
+ MessageId messageId = iterator.next();
+ if (messageId.compareTo(msgId) <= 0) {
+ ConcurrentOpenHashSet<MessageId> exist =
messageIdPartitionMap.get(messageId);
+ if (exist != null) {
+ exist.remove(messageId);
+ }
+ iterator.remove();
+ removed ++;
+ }
+ }
+ return removed;
} finally {
- readLock.unlock();
+ writeLock.unlock();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index f500fda..afe9a5e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.client.impl;
-import static com.google.common.base.Preconditions.checkState;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+
+import java.util.Iterator;
public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
@@ -26,23 +29,30 @@ public class UnAckedTopicMessageTracker extends
UnAckedMessageTracker {
super(client, consumerBase, ackTimeoutMillis);
}
+ public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase<?>
consumerBase, long ackTimeoutMillis, long tickDurationMillis) {
+ super(client, consumerBase, ackTimeoutMillis, tickDurationMillis);
+ }
+
public int removeTopicMessages(String topicName) {
- readLock.lock();
+ writeLock.lock();
try {
- int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
- checkState(m instanceof TopicMessageIdImpl,
- "message should be of type TopicMessageIdImpl");
- return
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
- });
- int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
- checkState(m instanceof TopicMessageIdImpl,
- "message should be of type TopicMessageIdImpl");
- return
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
- });
-
- return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+ int removed = 0;
+ Iterator<MessageId> iterator =
messageIdPartitionMap.keySet().iterator();
+ while (iterator.hasNext()) {
+ MessageId messageId = iterator.next();
+ if (messageId instanceof TopicMessageIdImpl &&
+
((TopicMessageIdImpl)messageId).getTopicPartitionName().contains(topicName)) {
+ ConcurrentOpenHashSet<MessageId> exist =
messageIdPartitionMap.get(messageId);
+ if (exist != null) {
+ exist.remove(messageId);
+ }
+ iterator.remove();
+ removed ++;
+ }
+ }
+ return removed;
} finally {
- readLock.unlock();
+ writeLock.unlock();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 7e92d12..2c56b61 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -23,22 +23,22 @@ import static
com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import java.io.Serializable;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import lombok.Data;
-
-import java.util.regex.Pattern;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
@Data
@@ -69,6 +69,8 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private long ackTimeoutMillis = 0;
+ private long tickDurationMillis = 1000;
+
private int priorityLevel = 0;
@JsonIgnore