This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d377bc9d732 [improve][client] PIP-393: Improve performance of Negative
Acknowledgement (#23600)
d377bc9d732 is described below
commit d377bc9d7321a66201a301b6887fb1fea3ef8820
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Jan 3 01:58:55 2025 +0800
[improve][client] PIP-393: Improve performance of Negative Acknowledgement
(#23600)
Co-authored-by: Lari Hotari <[email protected]>
---
distribution/shell/src/assemble/LICENSE.bin.txt | 2 +
pom.xml | 2 +
.../pulsar/client/impl/NegativeAcksTest.java | 50 +++++++-
pulsar-client-admin-shaded/pom.xml | 26 ++++
pulsar-client-all/pom.xml | 26 ++++
pulsar-client-dependencies-minimized/pom.xml | 100 +++++++++++++++
pulsar-client-shaded/pom.xml | 26 ++++
pulsar-client/pom.xml | 10 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +-
.../pulsar/client/impl/NegativeAcksTracker.java | 139 ++++++++++++++-------
.../impl/conf/ConsumerConfigurationData.java | 10 ++
11 files changed, 344 insertions(+), 51 deletions(-)
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 05342d17243..3333c9fe6ab 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -418,6 +418,8 @@ The Apache Software License, Version 2.0
- avro-protobuf-1.11.4.jar
* RE2j -- re2j-1.7.jar
* Spotify completable-futures -- completable-futures-0.3.6.jar
+ * RoaringBitmap -- RoaringBitmap-1.2.0.jar
+ * Fastutil -- fastutil-8.5.14.jar
BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
diff --git a/pom.xml b/pom.xml
index 93cd3d5e11f..3cd9bd4b8d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2580,6 +2580,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>pulsar-metadata</module>
<module>jetcd-core-shaded</module>
<module>jclouds-shaded</module>
+ <module>pulsar-client-dependencies-minimized</module>
<!-- package management releated modules (begin) -->
<module>pulsar-package-management</module>
@@ -2645,6 +2646,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>distribution</module>
<module>pulsar-metadata</module>
<module>jetcd-core-shaded</module>
+ <module>pulsar-client-dependencies-minimized</module>
<!-- package management releated modules (begin) -->
<module>pulsar-package-management</module>
<!-- package management releated modules (end) -->
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index b372ecabc5d..f8bc30f0966 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
@@ -311,7 +312,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker =
consumer.getNegativeAcksTracker();
-
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long)
-1).longValue(), 1L);
+ assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
@@ -319,11 +320,56 @@ public class NegativeAcksTest extends
ProducerConsumerBase {
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
-
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long)
-1).longValue(), 1L);
+ assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}
+ /**
+ * Verifies that messages from the same batch that are nacked at different times are
+ * each redelivered after their own redelivery delay. Before PIP-393 the tracker kept a
+ * single redelivery time per (ledgerId, entryId), so a later nack of another message in
+ * the same batch overwrote, and therefore postponed, the redelivery of the earlier one.
+ *
+ * @throws Exception if setting up the broker or any client operation fails
+ */
+ @Test
+ public void testNegativeAcksWithBatch() throws Exception {
+ cleanup();
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+ setup();
+ String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .batchingMaxMessages(2)
+ .create();
+ // send two messages in the same batch
+ producer.sendAsync("test-0");
+ producer.sendAsync("test-1");
+ producer.flush();
+
+ // Bounded receives: a delivery problem fails the test instead of hanging it.
+ Message<String> first = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(first);
+ // negative ack the first message
+ consumer.negativeAcknowledge(first);
+ // wait for 2s, then negative ack the second message
+ Thread.sleep(2000);
+ Message<String> second = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(second);
+ consumer.negativeAcknowledge(second);
+
+ // 2s have passed, so the first message is due about 1s from now, well before the
+ // second message's own 3s delay expires.
+ Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+ assertNotNull(msg1);
+ assertEquals(msg1.getValue(), "test-0");
+ }
+
@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
diff --git a/pulsar-client-admin-shaded/pom.xml
b/pulsar-client-admin-shaded/pom.xml
index f667a8eb61e..de54c3d0496 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -34,6 +34,17 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-admin-original</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-dependencies-minimized</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
@@ -150,6 +161,8 @@
<include>org.objenesis:*</include>
<include>org.reactivestreams:reactive-streams</include>
<include>org.yaml:snakeyaml</include>
+
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -269,6 +282,10 @@
<pattern>io.swagger</pattern>
<shadedPattern>org.apache.pulsar.shade.io.swagger</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>it.unimi.dsi.fastutil</pattern>
+
<shadedPattern>org.apache.pulsar.shade.it.unimi.dsi.fastutil</shadedPattern>
+ </relocation>
<relocation>
<pattern>javassist</pattern>
<shadedPattern>org.apache.pulsar.shade.javassist</shadedPattern>
@@ -313,6 +330,11 @@
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/</shadedPattern>
<rawString>true</rawString>
</relocation>
+ <relocation>
+ <pattern>META-INF/versions/(\d+)/org/roaringbitmap/</pattern>
+
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/</shadedPattern>
+ <rawString>true</rawString>
+ </relocation>
<relocation>
<pattern>META-INF/versions/(\d+)/org/yaml/</pattern>
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/</shadedPattern>
@@ -374,6 +396,10 @@
<pattern>org.reactivestreams</pattern>
<shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+
<shadedPattern>org.apache.pulsar.shade.org.roaringbitmap</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.yaml</pattern>
<shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 5e30dbd999d..4fec9ff51b8 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -39,6 +39,17 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-dependencies-minimized</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
@@ -200,6 +211,8 @@
<include>org.reactivestreams:reactive-streams</include>
<include>org.tukaani:xz</include>
<include>org.yaml:snakeyaml</include>
+
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -317,6 +330,10 @@
<pattern>io.swagger</pattern>
<shadedPattern>org.apache.pulsar.shade.io.swagger</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>it.unimi.dsi.fastutil</pattern>
+
<shadedPattern>org.apache.pulsar.shade.it.unimi.dsi.fastutil</shadedPattern>
+ </relocation>
<relocation>
<pattern>javassist</pattern>
<shadedPattern>org.apache.pulsar.shade.javassist</shadedPattern>
@@ -361,6 +378,11 @@
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/</shadedPattern>
<rawString>true</rawString>
</relocation>
+ <relocation>
+ <pattern>META-INF/versions/(\d+)/org/roaringbitmap/</pattern>
+
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/</shadedPattern>
+ <rawString>true</rawString>
+ </relocation>
<relocation>
<pattern>META-INF/versions/(\d+)/org/yaml/</pattern>
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/</shadedPattern>
@@ -439,6 +461,10 @@
<pattern>org.reactivestreams</pattern>
<shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+
<shadedPattern>org.apache.pulsar.shade.org.roaringbitmap</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
diff --git a/pulsar-client-dependencies-minimized/pom.xml
b/pulsar-client-dependencies-minimized/pom.xml
new file mode 100644
index 00000000000..e838fedfddc
--- /dev/null
+++ b/pulsar-client-dependencies-minimized/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>4.1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-client-dependencies-minimized</artifactId>
+ <name>Apache Pulsar :: Client :: Dependencies minimized</name>
+ <description>This module is used in `pulsar-client-all`,
`pulsar-client-shaded`, and `pulsar-client-admin-shaded`
+ to minimize the number of classes included in the shaded jars for specific
dependencies.
+ Currently, it is used to minimize the classes included from `fastutil`.
+ </description>
+ <!-- pulsar-client-original is the only input: it declares fastutil as a dependency,
+ and the shade execution below uses its classes to decide which fastutil classes to keep. -->
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <!-- Skips the deployment of the minimized dependencies to Maven
Central as this is an intermediate
+ module used for building the shaded client jars -->
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+
<promoteTransitiveDependencies>false</promoteTransitiveDependencies>
+ <!-- minimize the classes included in the shaded jar -->
+ <minimizeJar>true</minimizeJar>
+ <artifactSet>
+ <includes>
+ <!-- The Pulsar module that references the library being
minimized -->
+ <include>org.apache.pulsar:pulsar-client-original</include>
+ <!-- Currently, only fastutil is minimized -->
+ <include>it.unimi.dsi:fastutil</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <!--
+ pulsar-client-original takes part in the shade only so that minimizeJar can see
+ which fastutil classes it references. Its own classes are excluded (exclude **),
+ so this jar carries only the retained fastutil classes. The consuming shaded
+ modules exclude fastutil from pulsar-client-original and depend on this module
+ instead.
+ NOTE(review): the include ** presumably marks every pulsar-client-original class
+ as a minimization root; confirm against the maven-shade-plugin filter semantics.
+ -->
+ <filter>
+ <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 62bab3cb2d7..d8adacbe8a0 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -39,6 +39,17 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-dependencies-minimized</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
@@ -164,6 +175,8 @@
<include>org.reactivestreams:reactive-streams</include>
<include>org.tukaani:xz</include>
<include>org.yaml:snakeyaml</include>
+
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -263,6 +276,10 @@
<pattern>io.swagger</pattern>
<shadedPattern>org.apache.pulsar.shade.io.swagger</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>it.unimi.dsi.fastutil</pattern>
+
<shadedPattern>org.apache.pulsar.shade.it.unimi.dsi.fastutil</shadedPattern>
+ </relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>org.apache.pulsar.shade.javax.activation</shadedPattern>
@@ -281,6 +298,11 @@
</shadedPattern>
<rawString>true</rawString>
</relocation>
+ <relocation>
+ <pattern>META-INF/versions/(\d+)/org/roaringbitmap/</pattern>
+
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/</shadedPattern>
+ <rawString>true</rawString>
+ </relocation>
<relocation>
<pattern>META-INF/versions/(\d+)/org/yaml/</pattern>
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/</shadedPattern>
@@ -343,6 +365,10 @@
<pattern>org.reactivestreams</pattern>
<shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+
<shadedPattern>org.apache.pulsar.shade.org.roaringbitmap</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 49bb3c6490a..e1a70ed0748 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -207,6 +207,16 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 16dc70f736e..86af4bdaf58 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2752,7 +2752,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
- MessageIdAdv messageId =
MessageIdAdvUtils.discardBatch(peek.getMessageId());
+ MessageId messageId =
NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in
queue.
return 0;
@@ -2763,7 +2763,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
- MessageIdAdv id =
MessageIdAdvUtils.discardBatch(message.getMessageId());
+ MessageId id =
NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 5256ebf04f4..273880569c3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -22,9 +22,13 @@ import static
org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMess
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
+import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import java.io.Closeable;
import java.util.HashSet;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
@@ -32,40 +36,37 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class NegativeAcksTracker implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(NegativeAcksTracker.class);
- private ConcurrentLongLongPairHashMap nackedMessages = null;
+ // Pending redeliveries: redelivery timestamp in ms (trimmed by trimLowerBit)
+ // -> ledgerId -> entryIds. Batch indexes are not tracked; every message of a batch
+ // maps to its entry. Messages whose trimmed timestamps differ are stored under
+ // separate timestamp keys. Created lazily on the first nack.
+ // Implementation: AVL tree map (ordered by timestamp) -> Long2ObjectOpenHashMap -> Roaring64Bitmap
+ private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
nackedMessages = null;
private final ConsumerBase<?> consumer;
private final Timer timer;
- private final long nackDelayNanos;
- private final long timerIntervalNanos;
+ // Fixed redelivery delay, used when no RedeliveryBackoff is configured.
+ private final long nackDelayMs;
private final RedeliveryBackoff negativeAckRedeliveryBackoff;
+ // Number of low-order bits cleared from redelivery timestamps (bucket size is 2^n ms).
+ private final int negativeAckPrecisionBitCnt;
+ // Pending redelivery timer, or null when none is scheduled.
private Timeout timeout;
// Set a min delay to allow for grouping nacks within a single batch
- private static final long MIN_NACK_DELAY_NANOS =
TimeUnit.MILLISECONDS.toNanos(100);
- private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX =
Long.MAX_VALUE;
+ private static final long MIN_NACK_DELAY_MS = 100;
+ // Partition index given to every message id this tracker builds (triggerRedelivery and
+ // discardBatchAndPartitionIndex), so that ids built on both sides of a comparison agree
+ // regardless of the original partition index.
+ // NOTE(review): -2 presumably avoids clashing with -1 (non-partitioned topic); confirm.
+ private static final int DUMMY_PARTITION_INDEX = -2;
+ /**
+ * Creates a negative-ack tracker for {@code consumer}.
+ *
+ * @param consumer the consumer whose nacked messages are tracked; its client's timer
+ * schedules the redelivery task
+ * @param conf consumer configuration supplying the fixed nack delay (raised to at least
+ * MIN_NACK_DELAY_MS), the optional redelivery backoff and the timestamp precision bit count
+ */
public NegativeAcksTracker(ConsumerBase<?> consumer,
ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = consumer.getClient().timer();
- this.nackDelayNanos =
Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
- MIN_NACK_DELAY_NANOS);
+ // The configured delay is in microseconds; the tracker works in milliseconds.
+ this.nackDelayMs =
Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
+ MIN_NACK_DELAY_MS);
this.negativeAckRedeliveryBackoff =
conf.getNegativeAckRedeliveryBackoff();
- if (negativeAckRedeliveryBackoff != null) {
- this.timerIntervalNanos = Math.max(
-
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
- MIN_NACK_DELAY_NANOS) / 3;
- } else {
- this.timerIntervalNanos = nackDelayNanos / 3;
- }
+ this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
}
private void triggerRedelivery(Timeout t) {
@@ -76,21 +77,48 @@ class NegativeAcksTracker implements Closeable {
return;
}
- long now = System.nanoTime();
- nackedMessages.forEach((ledgerId, entryId, partitionIndex,
timestamp) -> {
- if (timestamp < now) {
- MessageId msgId = new MessageIdImpl(ledgerId, entryId,
- // need to covert non-partitioned topic partition
index to -1
- (int) (partitionIndex ==
NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
- addChunkedMessageIdsAndRemoveFromSequenceMap(msgId,
messagesToRedeliver, this.consumer);
- messagesToRedeliver.add(msgId);
+ long currentTimestamp = System.currentTimeMillis();
+ for (long timestamp : nackedMessages.keySet()) {
+ if (timestamp > currentTimestamp) {
+ // We are done with all the messages that need to be
redelivered
+ break;
+ }
+
+ Long2ObjectMap<Roaring64Bitmap> ledgerMap =
nackedMessages.get(timestamp);
+ for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry :
ledgerMap.long2ObjectEntrySet()) {
+ long ledgerId = ledgerEntry.getLongKey();
+ Roaring64Bitmap entrySet = ledgerEntry.getValue();
+ entrySet.forEach(entryId -> {
+ MessageId msgId = new MessageIdImpl(ledgerId, entryId,
DUMMY_PARTITION_INDEX);
+ addChunkedMessageIdsAndRemoveFromSequenceMap(msgId,
messagesToRedeliver, this.consumer);
+ messagesToRedeliver.add(msgId);
+ });
+ }
+ }
+
+ // remove entries from the nackedMessages map
+ LongBidirectionalIterator iterator =
nackedMessages.keySet().iterator();
+ while (iterator.hasNext()) {
+ long timestamp = iterator.nextLong();
+ if (timestamp <= currentTimestamp) {
+ iterator.remove();
+ } else {
+ break;
}
- });
- for (MessageId messageId : messagesToRedeliver) {
- nackedMessages.remove(((MessageIdImpl)
messageId).getLedgerId(),
- ((MessageIdImpl) messageId).getEntryId());
}
- this.timeout = timer.newTimeout(this::triggerRedelivery,
timerIntervalNanos, TimeUnit.NANOSECONDS);
+
+ // Schedule the next redelivery if there are still messages to
redeliver
+ if (!nackedMessages.isEmpty()) {
+ long nextTriggerTimestamp = nackedMessages.firstLongKey();
+ long delayMs = Math.max(nextTriggerTimestamp -
currentTimestamp, 0);
+ if (delayMs > 0) {
+ this.timeout = timer.newTimeout(this::triggerRedelivery,
delayMs, TimeUnit.MILLISECONDS);
+ } else {
+ this.timeout = timer.newTimeout(this::triggerRedelivery,
0, TimeUnit.MILLISECONDS);
+ }
+ } else {
+ this.timeout = null;
+ }
}
// release the lock of NegativeAcksTracker before calling
consumer.redeliverUnacknowledgedMessages,
@@ -110,39 +138,56 @@ class NegativeAcksTracker implements Closeable {
add(message.getMessageId(), message.getRedeliveryCount());
}
+ /**
+ * Clears the lowest {@code bits} bits of {@code timestamp}, rounding it down to a
+ * multiple of 2^bits so that nacks with nearby redelivery times share one bucket.
+ *
+ * <p>Out-of-range counts are handled explicitly instead of relying on Java's shift
+ * semantics, which use only the low 6 bits of the shift distance. Under those semantics
+ * a negative count would zero every timestamp and trigger immediate redelivery.
+ * {@code bits <= 0} leaves the timestamp unchanged; {@code bits >= 64} clears every bit.
+ *
+ * @param timestamp the timestamp in milliseconds
+ * @param bits the number of low-order bits to clear
+ * @return the trimmed timestamp
+ */
+ static long trimLowerBit(long timestamp, int bits) {
+ if (bits <= 0) {
+ return timestamp;
+ }
+ if (bits >= Long.SIZE) {
+ return 0L;
+ }
+ return timestamp & (-1L << bits);
+ }
+
+ /**
+ * Records a nacked message for redelivery after its backoff delay and makes sure a
+ * redelivery timer is pending that fires no later than this message's delay.
+ *
+ * @param messageId the nacked message id; must be a {@link MessageIdAdv}
+ * @param redeliveryCount the message's redelivery count, passed to the configured backoff
+ */
private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
- nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
- .autoShrink(true)
- .concurrencyLevel(1)
- .build();
+ nackedMessages = new Long2ObjectAVLTreeMap<>();
}
- long backoffNs;
+ long backoffMs;
if (negativeAckRedeliveryBackoff != null) {
- backoffNs =
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+ // RedeliveryBackoff.next() already yields milliseconds; no unit conversion needed.
+ backoffMs = negativeAckRedeliveryBackoff.next(redeliveryCount);
} else {
- backoffNs = nackDelayNanos;
+ backoffMs = nackDelayMs;
}
- MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
- // ConcurrentLongLongPairHashMap requires the key and value >=0.
- // partitionIndex is -1 if the message is from a non-partitioned
topic, but we don't use
- // partitionIndex actually, so we can set it to Long.MAX_VALUE in the
case of non-partitioned topic to
- // avoid exception from ConcurrentLongLongPairHashMap.
- nackedMessages.put(messageIdAdv.getLedgerId(),
messageIdAdv.getEntryId(),
- messageIdAdv.getPartitionIndex() >= 0 ?
messageIdAdv.getPartitionIndex() :
- NON_PARTITIONED_TOPIC_PARTITION_INDEX,
System.nanoTime() + backoffNs);
+ MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
+ // With a RedeliveryBackoff, a new nack can be due before every bucket already pending.
+ // The pending timer was scheduled for a later bucket and would then fire too late.
+ boolean earliestPending = nackedMessages.isEmpty() || timestamp < nackedMessages.firstLongKey();
+ nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
+ .computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap())
+ .add(messageIdAdv.getEntryId());
+ if (this.timeout != null && earliestPending && this.timeout.cancel()) {
+ // cancel() returns false if the task was already cancelled or has started running;
+ // a started run reschedules itself from the earliest pending bucket, so only
+ // reschedule here when the cancel succeeded.
+ this.timeout = null;
+ }
if (this.timeout == null) {
- // Schedule a task and group all the redeliveries for same period.
Leave a small buffer to allow for
- // nack immediately following the current one will be batched into
the same redeliver request.
+ // Fire once this message is due. That run redelivers every bucket due by then and
+ // reschedules itself for the next pending bucket.
- this.timeout = timer.newTimeout(this::triggerRedelivery,
timerIntervalNanos, TimeUnit.NANOSECONDS);
+ this.timeout = timer.newTimeout(this::triggerRedelivery,
backoffMs, TimeUnit.MILLISECONDS);
}
}
+ /**
+ * Normalizes a message id by dropping its batch index and replacing its partition index
+ * with {@code DUMMY_PARTITION_INDEX}. Chunked message ids ({@link ChunkMessageIdImpl})
+ * are returned unchanged.
+ *
+ * @param messageId the id to normalize; must be a {@link MessageIdAdv}
+ * @return a {@link MessageIdImpl} holding only the ledger and entry ids, or the original
+ * id when it is chunked
+ */
+ public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) {
+ MessageIdAdv adv = (MessageIdAdv) messageId;
+ return messageId instanceof ChunkMessageIdImpl
+ ? adv
+ : new MessageIdImpl(adv.getLedgerId(), adv.getEntryId(), DUMMY_PARTITION_INDEX);
+ }
+
+ /**
+ * Returns the total number of entry ids waiting for redelivery, summed over all
+ * timestamp buckets (an entry present in several buckets counts once per bucket).
+ * Returns 0 before the first nack.
+ */
@VisibleForTesting
- Optional<Long> getNackedMessagesCount() {
- return
Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
+ synchronized long getNackedMessagesCount() {
+ long count = 0L;
+ if (nackedMessages != null) {
+ for (Long2ObjectMap<Roaring64Bitmap> entriesByLedger : nackedMessages.values()) {
+ for (Roaring64Bitmap entryIds : entriesByLedger.values()) {
+ count += entryIds.getLongCardinality();
+ }
+ }
+ }
+ return count;
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index cd82b54618f..dc9251a975c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -156,6 +156,16 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
)
private long negativeAckRedeliveryDelayMicros =
TimeUnit.MINUTES.toMicros(1);
+ @ApiModelProperty(
+ name = "negativeAckPrecisionBitCnt",
+ value = "The redelivery time precision bit count. The lower bits of the redelivery time will be "
+ + "trimmed to reduce the memory occupation.\nThe default value is 8, which means the "
+ + "redelivery time will be bucketed by 256ms, the redelivery time could be earlier (never later) "
+ + "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be "
+ + "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
+ )
+ private int negativeAckPrecisionBitCnt = 8;
+
@ApiModelProperty(
name = "maxTotalReceiverQueueSizeAcrossPartitions",
value = "The max total receiver queue size across partitions.\n"