This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d79cd04 Add negative ack redelivery backoff. (#12566)
d79cd04 is described below
commit d79cd0479eabebef2ce72eca1330af103115f67f
Author: hanmz <[email protected]>
AuthorDate: Thu Nov 4 14:15:17 2021 +0800
Add negative ack redelivery backoff. (#12566)
### Motivation
Add negative ack redelivery backoff.
### Modifications
- add new `NegativeAckRedeliveryBackoff` interface
- expose `negativeAckRedeliveryBackoff` in ConsumerBuilder
- add unit test case
---
.../pulsar/client/impl/NegativeAcksTest.java | 102 +++++++++++++++++++++
.../pulsar/client/api/ConsumerConfiguration.java | 17 ++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 14 +++
.../client/api/NegativeAckRedeliveryBackoff.java | 40 ++++++++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 8 ++
.../client/impl/MultiTopicsConsumerImpl.java | 10 ++
.../NegativeAckRedeliveryExponentialBackoff.java | 94 +++++++++++++++++++
.../pulsar/client/impl/NegativeAcksTracker.java | 46 +++++++++-
.../impl/conf/ConsumerConfigurationData.java | 4 +
.../api/NegativeAckRedeliveryBackoffTest.java | 55 +++++++++++
.../client/impl/ConsumerBuilderImplTest.java | 8 ++
12 files changed, 405 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 5eb43af..638a969 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
@@ -154,4 +155,105 @@ public class NegativeAcksTest extends
ProducerConsumerBase {
consumer.close();
producer.close();
}
+
+ @DataProvider(name = "variationsBackoff")
+ public static Object[][] variationsBackoff() {
+ return new Object[][] {
+ // batching / partitions / subscription-type /
min-nack-time-ms/ max-nack-time-ms / ack-timeout
+ { false, false, SubscriptionType.Shared, 100, 1000, 0 },
+ { false, false, SubscriptionType.Failover, 100, 1000, 0 },
+ { false, true, SubscriptionType.Shared, 100, 1000, 0 },
+ { false, true, SubscriptionType.Failover, 100, 1000, 0 },
+ { true, false, SubscriptionType.Shared, 100, 1000, 0 },
+ { true, false, SubscriptionType.Failover, 100, 1000, 0 },
+ { true, true, SubscriptionType.Shared, 100, 1000, 0 },
+ { true, true, SubscriptionType.Failover, 100, 1000, 0 },
+
+ { false, false, SubscriptionType.Shared, 0, 1000, 0 },
+ { false, false, SubscriptionType.Failover, 0, 1000, 0 },
+ { false, true, SubscriptionType.Shared, 0, 1000, 0 },
+ { false, true, SubscriptionType.Failover, 0, 1000, 0 },
+ { true, false, SubscriptionType.Shared, 0, 1000, 0 },
+ { true, false, SubscriptionType.Failover, 0, 1000, 0 },
+ { true, true, SubscriptionType.Shared, 0, 1000, 0 },
+ { true, true, SubscriptionType.Failover, 0, 1000, 0 },
+
+ { false, false, SubscriptionType.Shared, 100, 1000, 1000 },
+ { false, false, SubscriptionType.Failover, 100, 1000, 1000 },
+ { false, true, SubscriptionType.Shared, 100, 1000, 1000 },
+ { false, true, SubscriptionType.Failover, 100, 1000, 1000 },
+ { true, false, SubscriptionType.Shared, 100, 1000, 1000 },
+ { true, false, SubscriptionType.Failover, 100, 1000, 1000 },
+ { true, true, SubscriptionType.Shared, 100, 1000, 1000 },
+ { true, true, SubscriptionType.Failover, 100, 1000, 1000 },
+
+ { false, false, SubscriptionType.Shared, 0, 1000, 1000 },
+ { false, false, SubscriptionType.Failover, 0, 1000, 1000 },
+ { false, true, SubscriptionType.Shared, 0, 1000, 1000 },
+ { false, true, SubscriptionType.Failover, 0, 1000, 1000 },
+ { true, false, SubscriptionType.Shared, 0, 1000, 1000 },
+ { true, false, SubscriptionType.Failover, 0, 1000, 1000 },
+ { true, true, SubscriptionType.Shared, 0, 1000, 1000 },
+ { true, true, SubscriptionType.Failover, 0, 1000, 1000 },
+ };
+ }
+
+ @Test(dataProvider = "variationsBackoff")
+ public void testNegativeAcksWithBackoff(boolean batching, boolean
usePartitions, SubscriptionType subscriptionType,
+ int minNackTimeMs, int maxNackTimeMs, int ackTimeout)
+ throws Exception {
+ log.info("Test negative acks with back off batching={} partitions={}
subType={} minNackTimeMs={}, "
+ + "maxNackTimeMs={}", batching, usePartitions,
subscriptionType, minNackTimeMs, maxNackTimeMs);
+ String topic =
BrokerTestUtil.newUniqueName("testNegativeAcksWithBackoff");
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(subscriptionType)
+
.negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
+ .minNackTimeMs(minNackTimeMs)
+ .maxNackTimeMs(maxNackTimeMs)
+ .build())
+ .ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
+
+ Set<String> sentMessages = new HashSet<>();
+
+ final int N = 10;
+ for (int i = 0; i < N; i++) {
+ String value = "test-" + i;
+ producer.sendAsync(value);
+ sentMessages.add(value);
+ }
+ producer.flush();
+
+ for (int i = 0; i < N; i++) {
+ Message<String> msg = consumer.receive();
+ consumer.negativeAcknowledge(msg);
+ }
+
+ Set<String> receivedMessages = new HashSet<>();
+
+ // All the messages should be received again
+ for (int i = 0; i < N; i++) {
+ Message<String> msg = consumer.receive();
+ receivedMessages.add(msg.getValue());
+ consumer.acknowledge(msg);
+ }
+
+ assertEquals(receivedMessages, sentMessages);
+
+ // There should be no more messages
+ assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
+ consumer.close();
+ producer.close();
+ }
}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 291cce3..3de0187 100644
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -375,4 +375,21 @@ public class ConsumerConfiguration implements Serializable
{
public SubscriptionInitialPosition getSubscriptionInitialPosition(){
return conf.getSubscriptionInitialPosition();
}
+
+ /**
+ * @return the configured {@link NegativeAckRedeliveryBackoff} for the
consumer
+ */
+ public NegativeAckRedeliveryBackoff getNegativeAckRedeliveryBackoff() {
+ return conf.getNegativeAckRedeliveryBackoff();
+ }
+
+ /**
+ * @param negativeAckRedeliveryBackoff the negative ack redelivery backoff
policy.
+ * Default value is: null, in which case the fixed negative ack redelivery delay is used
+ * @return the {@link ConsumerConfiguration}
+ */
+ public ConsumerConfiguration
setNegativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff
negativeAckRedeliveryBackoff) {
+ conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
+ return this;
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 3c3ce17..af1ece8 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -749,4 +749,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Default: null
*/
ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor
payloadProcessor);
+
+ /**
+ * Notice: the negativeAckRedeliveryBackoff will not work with
`consumer.negativeAcknowledge(MessageId messageId)`
+ * because we are not able to get the redelivery count from the message ID.
+ *
+ * <p>Example:
+ * <pre>
+ *
client.newConsumer().negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
+ * .minNackTimeMs(1000)
+ * .maxNackTimeMs(60 * 1000)
+ * .build()).subscribe();
+ * </pre>
+ */
+ ConsumerBuilder<T>
negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff
negativeAckRedeliveryBackoff);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java
new file mode 100644
index 0000000..8e19c85
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Interface for a custom negative-ack redelivery backoff policy; users can specify a
{@link NegativeAckRedeliveryBackoff} for
+ * a consumer.
+ *
+ * Notice: a consumer crash will trigger redelivery of the unacked
messages; this case does not respect the
+ * {@link NegativeAckRedeliveryBackoff}, which means a message might get
redelivered earlier than the delay time
+ * from the backoff.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface NegativeAckRedeliveryBackoff extends Serializable {
+ /**
+ * @param redeliveryCount indicates the number of times the message was
redelivered
+ */
+ long next(int redeliveryCount);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cbfc27d..d08dbda 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -486,4 +487,11 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
conf.setPayloadProcessor(payloadProcessor);
return this;
}
+
+ @Override
+ public ConsumerBuilder<T>
negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff
negativeAckRedeliveryBackoff) {
+ checkArgument(negativeAckRedeliveryBackoff != null,
"negativeAckRedeliveryBackoff must not be null.");
+ conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
+ return this;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8245de6..79aad9c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -689,6 +689,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
+ public void negativeAcknowledge(Message<?> message) {
+ negativeAcksTracker.add(message);
+
+ // Ensure the message is not redelivered for ack-timeout, since we did
receive an "ack"
+ unAckedMessageTracker.remove(message.getMessageId());
+ }
+
+ @Override
public void connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 21ae2d7..f765e5e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -523,6 +523,16 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
@Override
+ public void negativeAcknowledge(Message<?> message) {
+ MessageId messageId = message.getMessageId();
+ checkArgument(messageId instanceof TopicMessageIdImpl);
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+
+ ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
+ consumer.negativeAcknowledge(message);
+ }
+
+ @Override
public CompletableFuture<Void> unsubscribeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java
new file mode 100644
index 0000000..9b0eb7a
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
+
+/**
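+ * Exponential {@link NegativeAckRedeliveryBackoff}: the delay starts at minNackTimeMs, doubles per redelivery, and is capped at maxNackTimeMs.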
+ */
+public class NegativeAckRedeliveryExponentialBackoff implements
NegativeAckRedeliveryBackoff {
+
+ private final long minNackTimeMs;
+ private final long maxNackTimeMs;
+ private final int maxBitShift;
+
+ private NegativeAckRedeliveryExponentialBackoff(long minNackTimeMs, long
maxNackTimeMs) {
+ this.minNackTimeMs = minNackTimeMs;
+ this.maxNackTimeMs = maxNackTimeMs;
+
+ for (int i = 0; ; ) {
+ if (this.minNackTimeMs << ++i <= 0) {
+ this.maxBitShift = i;
+ break;
+ }
+ }
+ }
+
+ public static
NegativeAckRedeliveryExponentialBackoff.NegativeAckRedeliveryExponentialBackoffBuilder
builder() {
+ return new
NegativeAckRedeliveryExponentialBackoff.NegativeAckRedeliveryExponentialBackoffBuilder();
+ }
+
+ public long getMinNackTimeMs() {
+ return this.minNackTimeMs;
+ }
+
+ public long getMaxNackTimeMs() {
+ return this.maxNackTimeMs;
+ }
+
+ @Override
+ public long next(int redeliveryCount) {
+ if (redeliveryCount <= 0 || minNackTimeMs <= 0) {
+ return this.minNackTimeMs;
+ }
+
+ if (this.maxBitShift <= redeliveryCount) {
+ return this.maxNackTimeMs;
+ }
+
+ return Math.min(this.minNackTimeMs << redeliveryCount,
this.maxNackTimeMs);
+ }
+
+ /**
+ * Builder of NegativeAckRedeliveryExponentialBackoff.
+ */
+ public static class NegativeAckRedeliveryExponentialBackoffBuilder {
+ private long minNackTimeMs = 1000 * 10;
+ private long maxNackTimeMs = 1000 * 60 * 10;
+
+ public NegativeAckRedeliveryExponentialBackoffBuilder
minNackTimeMs(long minNackTimeMs) {
+ this.minNackTimeMs = minNackTimeMs;
+ return this;
+ }
+
+ public NegativeAckRedeliveryExponentialBackoffBuilder
maxNackTimeMs(long maxNackTimeMs) {
+ this.maxNackTimeMs = maxNackTimeMs;
+ return this;
+ }
+
+ public NegativeAckRedeliveryExponentialBackoff build() {
+ Preconditions.checkArgument(minNackTimeMs >= 0, "min nack time
must be >= 0");
+ Preconditions.checkArgument(maxNackTimeMs >= minNackTimeMs,
+ "max nack time must be >= minNackTimeMs");
+ return new NegativeAckRedeliveryExponentialBackoff(minNackTimeMs,
maxNackTimeMs);
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index a062009..5accee4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -27,7 +27,9 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import static
org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
@@ -39,6 +41,7 @@ class NegativeAcksTracker implements Closeable {
private final Timer timer;
private final long nackDelayNanos;
private final long timerIntervalNanos;
+ private final NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff;
private Timeout timeout;
@@ -50,7 +53,14 @@ class NegativeAcksTracker implements Closeable {
this.timer = consumer.getClient().timer();
this.nackDelayNanos =
Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
- this.timerIntervalNanos = nackDelayNanos / 3;
+ this.negativeAckRedeliveryBackoff =
conf.getNegativeAckRedeliveryBackoff();
+ if (negativeAckRedeliveryBackoff != null) {
+ this.timerIntervalNanos = Math.max(
+
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
+ MIN_NACK_DELAY_NANOS) / 3;
+ } else {
+ this.timerIntervalNanos = nackDelayNanos / 3;
+ }
}
private synchronized void triggerRedelivery(Timeout t) {
@@ -95,6 +105,40 @@ class NegativeAcksTracker implements Closeable {
}
}
+ public synchronized void add(Message<?> message) {
+ if (negativeAckRedeliveryBackoff == null) {
+ add(message.getMessageId());
+ return;
+ }
+ add(message.getMessageId(), message.getRedeliveryCount());
+ }
+
+ private synchronized void add(MessageId messageId, int redeliveryCount) {
+ if (messageId instanceof TopicMessageIdImpl) {
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+ messageId = topicMessageId.getInnerMessageId();
+ }
+
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+ messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
batchMessageId.getEntryId(),
+ batchMessageId.getPartitionIndex());
+ }
+
+ if (nackedMessages == null) {
+ nackedMessages = new HashMap<>();
+ }
+
+ long backoffNs =
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+ nackedMessages.put(messageId, System.nanoTime() + backoffNs);
+
+ if (this.timeout == null) {
+ // Schedule a task and group all the redeliveries for the same period.
Leave a small buffer so that
+ // nacks immediately following the current one are batched into
the same redeliver request.
+ this.timeout = timer.newTimeout(this::triggerRedelivery,
timerIntervalNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
@Override
public synchronized void close() {
if (timeout != null && !timeout.isCancelled()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 1076946..9603992 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -70,6 +71,9 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
@JsonIgnore
private ConsumerEventListener consumerEventListener;
+ @JsonIgnore
+ private NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff;
+
private int receiverQueueSize = 1000;
private long acknowledgementsGroupTimeMicros =
TimeUnit.MILLISECONDS.toMicros(100);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java
new file mode 100644
index 0000000..206a6f2
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.impl.NegativeAckRedeliveryExponentialBackoff;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link NegativeAckRedeliveryBackoff}.
+ */
+public class NegativeAckRedeliveryBackoffTest {
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testNext() {
+
+ long minNackTime = 1000;
+ long maxNackTime = 1000 * 60 * 10;
+
+ NegativeAckRedeliveryBackoff nackBackoff = spy(
+ NegativeAckRedeliveryExponentialBackoff.builder()
+ .minNackTimeMs(minNackTime)
+ .maxNackTimeMs(maxNackTime)
+ .build());
+
+ assertEquals(nackBackoff.next(-1), minNackTime);
+
+ assertEquals(nackBackoff.next(0), minNackTime);
+
+ assertEquals(nackBackoff.next(1),minNackTime * 2);
+
+ assertEquals(nackBackoff.next(4), minNackTime * 16);
+
+ assertEquals(nackBackoff.next(100), maxNackTime);
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 13d63ba..798069c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -302,4 +302,12 @@ public class ConsumerBuilderImplTest {
consumerBuilderImpl.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
+
+ @Test
+ public void testNegativeAckRedeliveryBackoff() {
+
consumerBuilderImpl.negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
+ .minNackTimeMs(1000)
+ .maxNackTimeMs(10 * 1000)
+ .build());
+ }
}