This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fdaa85945be16e3a6ac3088f3cec8e4d488db199 Author: lipenghui <[email protected]> AuthorDate: Fri Jul 2 19:25:43 2021 +0800 Fix the dead lock when using hasMessageAvailableAsync and readNextAsync (#11183) The issue happens when the following conditions are satisfied: 1. The messages are added to the incoming queue before reading messages. 2. The result future of readNextAsync has already been completed before the user calls future.whenComplete; this does not always happen. After that, since we are using the IO thread to call the callback of hasMessageAvailableAsync, the IO thread will run message.getValue(), which blocks in CompletableFuture.get() while loading the schema info. This can lead to a deadlock, as follows: ``` java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object) java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468 ``` We have already introduced an internal thread pool for handling the client's internal executions. 
So the fix is to use the internal thread to process the callback of hasMessageAvailableAsync. (cherry picked from commit ed42007d5f5df063a61626e49803bbf35c5a3eff) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 46 +++++++++++++++++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 27 ++++++++----- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index a04f73b..4a2466f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -32,8 +32,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -51,10 +54,11 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.Murmur3_32Hash; +import org.apache.pulsar.schema.Schemas; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -510,4 +514,44 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { } + @Test(timeOut = 30000) + public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception { + final String topic = 
"persistent://my-property/my-ns/testAvoidUsingIoThreadToGetValueOfMessage"; + + @Cleanup + Producer<Schemas.PersonOne> producer = pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic) + .create(); + + producer.send(new Schemas.PersonOne(1)); + + @Cleanup + Reader<Schemas.PersonOne> reader = pulsarClient.newReader(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + + CountDownLatch latch = new CountDownLatch(1); + List<Schemas.PersonOne> received = new ArrayList<>(1); + // Make sure the message is added to the incoming queue + Awaitility.await().untilAsserted(() -> + assertTrue(((ReaderImpl<?>) reader).getConsumer().incomingMessages.size() > 0)); + reader.hasMessageAvailableAsync().whenComplete((has, e) -> { + if (e == null && has) { + CompletableFuture<Message<Schemas.PersonOne>> future = reader.readNextAsync(); + // Make sure the future completed + Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(future::isDone); + future.whenComplete((msg, ex) -> { + if (ex == null) { + received.add(msg.getValue()); + } + latch.countDown(); + }); + } else { + latch.countDown(); + } + }); + latch.await(); + Assert.assertEquals(received.size(), 1); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c5301da..a0fb143 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1906,14 +1906,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle .compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId()) .result(); if (lastMessageId.getEntryId() < 0) { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } else { - booleanFuture.complete(resetIncludeHead ? 
result <= 0 : result < 0); + completehasMessageAvailableWithValue(booleanFuture, + resetIncludeHead ? result <= 0 : result < 0); } } else if (lastMessageId == null || lastMessageId.getEntryId() < 0) { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } else { - booleanFuture.complete(resetIncludeHead); + completehasMessageAvailableWithValue(booleanFuture, resetIncludeHead); } }).exceptionally(ex -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, ex); @@ -1925,16 +1926,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); } else { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); @@ -1945,16 +1946,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } else { // read before, use lastDequeueMessage for comparison if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); } else { - booleanFuture.complete(false); + 
completehasMessageAvailableWithValue(booleanFuture, false); } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); @@ -1966,6 +1967,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return booleanFuture; } + private void completehasMessageAvailableWithValue(CompletableFuture<Boolean> future, boolean value) { + internalPinnedExecutor.execute(() -> { + future.complete(value); + }); + } + private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) { if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 && ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1) {
