This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bd3de15d8ca [fix][broker] Intercept REDELIVER_UNACKNOWLEDGED_MESSAGES command (#21265)
bd3de15d8ca is described below
commit bd3de15d8ca2bf3659d4133d072c3bc7cb9e6f5d
Author: kecona <[email protected]>
AuthorDate: Wed Oct 25 22:49:18 2023 +0800
[fix][broker] Intercept REDELIVER_UNACKNOWLEDGED_MESSAGES command (#21265)
---
.../broker/intercept/BrokerInterceptorTest.java | 19 +++++++++++++++++++
.../broker/intercept/CounterBrokerInterceptor.java | 7 +++++++
.../apache/pulsar/common/protocol/PulsarDecoder.java | 1 +
3 files changed, 27 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 405d7caa1fa..d211de62963 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import okhttp3.Call;
import okhttp3.Callback;
@@ -306,4 +307,22 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
}
}
+ @Test
+ public void testInterceptNack() throws Exception {
+ BrokerInterceptor interceptor = pulsar.getBrokerInterceptor();
+ Assert.assertTrue(interceptor instanceof CounterBrokerInterceptor);
+
+ final String topic = "test-intercept-nack" + UUID.randomUUID();
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .topic(topic)
+ .subscriptionName("test-sub").subscribe();
+ producer.send("test intercept nack message");
+ Message<String> message = consumer.receive();
+ consumer.negativeAcknowledge(message);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) interceptor).getHandleNackCount().get() == 1);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 9c327a0ea6e..34fa4932da9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -31,6 +31,7 @@ import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.http.HttpStatus;
@@ -62,6 +63,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
private final AtomicInteger messageDispatchCount = new AtomicInteger();
private final AtomicInteger messageAckCount = new AtomicInteger();
private final AtomicInteger handleAckCount = new AtomicInteger();
+ @Getter
+ private final AtomicInteger handleNackCount = new AtomicInteger();
private final AtomicInteger txnCount = new AtomicInteger();
private final AtomicInteger committedTxnCount = new AtomicInteger();
private final AtomicInteger abortedTxnCount = new AtomicInteger();
@@ -79,6 +82,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
txnCount.set(0);
committedTxnCount.set(0);
abortedTxnCount.set(0);
+ handleNackCount.set(0);
}
private final List<ResponseEvent> responseList = new CopyOnWriteArrayList<>();
@@ -209,6 +213,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
if (command.getType().equals(BaseCommand.Type.ACK)) {
handleAckCount.incrementAndGet();
}
+ if(command.getType().equals(BaseCommand.Type.REDELIVER_UNACKNOWLEDGED_MESSAGES)) {
+ handleNackCount.incrementAndGet();
+ }
count.incrementAndGet();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 496652fed0b..c1c1ebe355b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -291,6 +291,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
case REDELIVER_UNACKNOWLEDGED_MESSAGES:
checkArgument(cmd.hasRedeliverUnacknowledgedMessages());
+ safeInterceptCommand(cmd);
handleRedeliverUnacknowledged(cmd.getRedeliverUnacknowledgedMessages());
break;