This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6a2e3a1 [Java Client] Remove data race in MultiTopicsConsumerImpl to
ensure correct message order (#12456)
6a2e3a1 is described below
commit 6a2e3a1ad735465154dc3fa12988c3068eae7da5
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Oct 25 16:31:59 2021 -0500
[Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct
message order (#12456)
* [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure
correct message order
* Fix test
* Return the checkState method call to keep original behavior
* Reproduce out-of-order delivery issue in PR 12456
* Remove unnecessary scheduling of receiveMessageFromConsumer
Co-authored-by: Lari Hotari <[email protected]>
---
.../pulsar/client/api/MultiTopicsConsumerTest.java | 75 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
.../client/impl/MultiTopicsConsumerImpl.java | 37 ++++++-----
.../client/impl/MultiTopicsConsumerImplTest.java | 3 +-
4 files changed, 98 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 715f3ad..d8c8bd6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.client.api;
import static org.mockito.ArgumentMatchers.any;
@@ -24,9 +25,16 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -34,6 +42,7 @@ import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -70,6 +79,7 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
// method calls on the interface.
Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
}
+
@Override
public ExecutorService getInternalExecutorService() {
return internalExecutorServiceDelegate;
@@ -119,4 +129,69 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
verify(internalExecutorServiceDelegate, times(0))
.schedule(any(Runnable.class), anyLong(), any());
}
+
+ // test that reproduces the issue that PR https://github.com/apache/pulsar/pull/12456 fixes
+ // where MultiTopicsConsumerImpl has a data race that causes out-of-order delivery of messages
+ @Test
+ public void
testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
+ throws PulsarAdminException, PulsarClientException,
ExecutionException, InterruptedException,
+ TimeoutException {
+ String topicName = newTopicName();
+ int numPartitions = 2;
+ int numMessages = 100000;
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+ Producer<Long>[] producers = new Producer[numPartitions];
+
+ for (int i = 0; i < numPartitions; i++) {
+ producers[i] = pulsarClient.newProducer(Schema.INT64)
+ // produce to each partition directly so that order can be
maintained in sending
+ .topic(topicName + "-partition-" + i)
+ .enableBatching(true)
+ .maxPendingMessages(30000)
+ .maxPendingMessagesAcrossPartitions(60000)
+ .batchingMaxMessages(10000)
+ .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+ .batchingMaxBytes(4 * 1024 * 1024)
+ .blockIfQueueFull(true)
+ .create();
+ }
+
+ @Cleanup
+ Consumer<Long> consumer = pulsarClient
+ .newConsumer(Schema.INT64)
+ // consume on the partitioned topic
+ .topic(topicName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .receiverQueueSize(numMessages)
+ .subscriptionName(methodName)
+ .subscribe();
+
+ // produce sequence numbers to each partition topic
+ long sequenceNumber = 1L;
+ for (int i = 0; i < numMessages; i++) {
+ for (Producer<Long> producer : producers) {
+ producer.newMessage()
+ .value(sequenceNumber)
+ .sendAsync();
+ }
+ sequenceNumber++;
+ }
+ for (Producer<Long> producer : producers) {
+ producer.close();
+ }
+
+ // receive and validate sequences in the partitioned topic
+ Map<String, AtomicLong> receivedSequences = new HashMap<>();
+ int receivedCount = 0;
+ while (receivedCount < numPartitions * numMessages) {
+ Message<Long> message = consumer.receiveAsync().get(5,
TimeUnit.SECONDS);
+ consumer.acknowledge(message);
+ receivedCount++;
+ AtomicLong receivedSequenceCounter =
+ receivedSequences.computeIfAbsent(message.getTopicName(),
k -> new AtomicLong(1L));
+ Assert.assertEquals(message.getValue().longValue(),
receivedSequenceCounter.getAndIncrement());
+ }
+ Assert.assertEquals(numPartitions * numMessages, receivedCount);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 62fff49..3159734 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -427,8 +427,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
- }
- if (message != null) {
+ } else {
messageProcessed(message);
result.complete(beforeConsume(message));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 520e7f3..21ae2d7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -245,7 +245,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
- consumer.receiveAsync().thenAccept(message -> {
+ consumer.receiveAsync().thenAcceptAsync(message -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
@@ -260,16 +260,16 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// or if any consumer is already paused (to create fair chance
for already paused consumers)
pausedConsumers.add(consumer);
- // Since we din't get a mutex, the condition on the incoming queue might have changed after
+ // Since we didn't get a mutex, the condition on the incoming queue might have changed after
// we have paused the current consumer. We need to re-check in order to avoid this consumer
// from getting stalled.
resumeReceivingFromPausedConsumersIfNeeded();
} else {
- // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
- // recursion and stack overflow
- internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
+ // Call receiveAsync() if the incoming queue is not full. Because this block is run with
+ // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
+ receiveMessageFromConsumer(consumer);
}
- }).exceptionally(ex -> {
+ }, internalPinnedExecutor).exceptionally(ex -> {
if (ex instanceof PulsarClientException.AlreadyClosedException
|| ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
// ignore the exception that happens when the consumer is
closed
@@ -281,6 +281,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
});
}
+ // Must be called from the internalPinnedExecutor thread
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message)
{
checkArgument(message instanceof MessageImpl);
TopicMessageImpl<T> topicMessage = new
TopicMessageImpl<>(consumer.getTopic(),
@@ -409,17 +410,19 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result =
cancellationHandler.createFuture();
- Message<T> message = incomingMessages.poll();
- if (message == null) {
- pendingReceives.add(result);
- cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
- } else {
- decreaseIncomingMessageSize(message);
- checkState(message instanceof TopicMessageImpl);
- unAckedMessageTracker.add(message.getMessageId());
- resumeReceivingFromPausedConsumersIfNeeded();
- result.complete(message);
- }
+ internalPinnedExecutor.execute(() -> {
+ Message<T> message = incomingMessages.poll();
+ if (message == null) {
+ pendingReceives.add(result);
+ cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
+ } else {
+ decreaseIncomingMessageSize(message);
+ checkState(message instanceof TopicMessageImpl);
+ unAckedMessageTracker.add(message.getMessageId());
+ resumeReceivingFromPausedConsumersIfNeeded();
+ result.complete(message);
+ }
+ });
return result;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 6af8914..faa621c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -36,6 +36,7 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.testng.annotations.AfterMethod;
@@ -165,7 +166,7 @@ public class MultiTopicsConsumerImplTest {
// given
MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
- assertTrue(consumer.hasNextPendingReceive());
+ Awaitility.await().untilAsserted(() ->
assertTrue(consumer.hasNextPendingReceive()));
// when
future.cancel(true);
// then