This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c07b158f003 [fix][client] Fix for early hit `beforeConsume` for
MultiTopicConsumer (#23141)
c07b158f003 is described below
commit c07b158f003c5a5623296189f0932d7058d2e75a
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Aug 14 10:26:47 2024 +0800
[fix][client] Fix for early hit `beforeConsume` for MultiTopicConsumer
(#23141)
---
.../apache/pulsar/client/api/InterceptorsTest.java | 44 ++++++++++------
.../client/impl/MultiTopicsConsumerImpl.java | 58 ++++++++++++++++++++--
2 files changed, 83 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index f23d82b32cd..afb17a18647 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -29,8 +30,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
@@ -79,6 +78,12 @@ public class InterceptorsTest extends ProducerConsumerBase {
return new Object[][] {{ 0 }, { 3 }};
}
+ @DataProvider(name = "topics")
+ public Object[][] getTopics() {
+ return new Object[][] {{
List.of("persistent://my-property/my-ns/my-topic") },
+ { List.of("persistent://my-property/my-ns/my-topic",
"persistent://my-property/my-ns/my-topic1") }};
+ }
+
@Test
public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
@@ -403,9 +408,9 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Override
public Message<String> beforeConsume(Consumer<String> consumer,
Message<String> message) {
- MessageImpl<String> msg = (MessageImpl<String>) message;
+ MessageImpl<String> msg = ((MessageImpl<String>)
((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
- return msg;
+ return message;
}
@Override
@@ -449,13 +454,19 @@ public class InterceptorsTest extends
ProducerConsumerBase {
int keyCount = 0;
for (int i = 0; i < 2; i++) {
- Message<String> received = consumer.receive();
+ Message<String> received;
+ if (i % 2 == 0) {
+ received = consumer.receive();
+ } else {
+ received = consumer.receiveAsync().join();
+ }
MessageImpl<String> msg = (MessageImpl<String>)
((TopicMessageImpl<String>) received).getMessage();
for (KeyValue keyValue :
msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
keyCount++;
}
}
+ Assert.assertEquals(keyCount, i + 1);
consumer.acknowledge(received);
}
Assert.assertEquals(2, keyCount);
@@ -475,9 +486,9 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Override
public Message<String> beforeConsume(Consumer<String> consumer,
Message<String> message) {
- MessageImpl<String> msg = (MessageImpl<String>) message;
+ MessageImpl<String> msg = ((MessageImpl<String>)
((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
- return msg;
+ return message;
}
@Override
@@ -612,8 +623,8 @@ public class InterceptorsTest extends ProducerConsumerBase {
consumer.close();
}
- @Test
- public void testConsumerInterceptorForNegativeAcksSend() throws
PulsarClientException, InterruptedException {
+ @Test(dataProvider = "topics")
+ public void testConsumerInterceptorForNegativeAcksSend(List<String>
topics) throws PulsarClientException, InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
@@ -640,6 +651,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Override
public void onNegativeAcksSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}
@@ -650,7 +662,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
};
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topics(topics)
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -658,7 +670,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topics.get(0))
.create();
for (int i = 0; i < totalNumOfMessages; i++) {
@@ -682,8 +694,9 @@ public class InterceptorsTest extends ProducerConsumerBase {
consumer.close();
}
- @Test
- public void testConsumerInterceptorForAckTimeoutSend() throws
PulsarClientException, InterruptedException {
+ @Test(dataProvider = "topics")
+ public void testConsumerInterceptorForAckTimeoutSend(List<String> topics)
throws PulsarClientException,
+ InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
@@ -714,16 +727,17 @@ public class InterceptorsTest extends
ProducerConsumerBase {
@Override
public void onAckTimeoutSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topics.get(0))
.create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topics(topics)
.subscriptionName("foo")
.intercept(interceptor)
.ackTimeout(2, TimeUnit.SECONDS)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3f5e501b281..bf8bd6cc951 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -108,6 +109,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private final MessageIdAdv startMessageId;
private volatile boolean duringSeek = false;
private final long startMessageRollbackDurationInSec;
+ private final ConsumerInterceptors<T> internalConsumerInterceptors;
MultiTopicsConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>>
subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean
createTopicIfDoesNotExist) {
@@ -137,6 +139,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
long startMessageRollbackDurationInSec) {
super(client, singleTopic, conf, Math.max(2,
conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
schema, interceptors);
+ if (interceptors != null) {
+ this.internalConsumerInterceptors =
getInternalConsumerInterceptors(interceptors);
+ } else {
+ this.internalConsumerInterceptors = null;
+ }
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics
Consumer");
@@ -316,7 +323,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId(),
topicMessage.getRedeliveryCount());
- completePendingReceive(receivedFuture, topicMessage);
+ final Message<T> interceptMessage = beforeConsume(topicMessage);
+ completePendingReceive(receivedFuture, interceptMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) &&
hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
@@ -369,7 +377,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(),
message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
- return message;
+ return beforeConsume(message);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
@@ -388,6 +396,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
trackUnAckedMsgIfNoListener(message.getMessageId(),
message.getRedeliveryCount());
+ message = beforeConsume(message);
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
@@ -447,7 +456,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(),
message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
- result.complete(message);
+ result.complete(beforeConsume(message));
}
});
return result;
@@ -1185,7 +1194,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, listener != null, subFuture,
- startMessageId, schema, interceptors,
+ startMessageId, schema, this.internalConsumerInterceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);
}
@@ -1595,4 +1604,45 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return list;
});
}
+
+ private ConsumerInterceptors<T>
getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors)
{
+ return new ConsumerInterceptors<T>(new ArrayList<>()) {
+
+ @Override
+ public Message<T> beforeConsume(Consumer<T> consumer, Message<T>
message) {
+ return message;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<T> consumer, MessageId
messageId, Throwable exception) {
+ multiTopicInterceptors.onAcknowledge(consumer, messageId,
exception);
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<T> consumer,
+ MessageId messageId, Throwable
exception) {
+ multiTopicInterceptors.onAcknowledgeCumulative(consumer,
messageId, exception);
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<T> consumer,
Set<MessageId> set) {
+ multiTopicInterceptors.onNegativeAcksSend(consumer, set);
+ }
+
+ @Override
+ public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId>
set) {
+ multiTopicInterceptors.onAckTimeoutSend(consumer, set);
+ }
+
+ @Override
+ public void onPartitionsChange(String topicName, int partitions) {
+ multiTopicInterceptors.onPartitionsChange(topicName,
partitions);
+ }
+
+ @Override
+ public void close() throws IOException {
+ multiTopicInterceptors.close();
+ }
+ };
+ }
}