This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fdaa85945be16e3a6ac3088f3cec8e4d488db199
Author: lipenghui <[email protected]>
AuthorDate: Fri Jul 2 19:25:43 2021 +0800

    Fix the dead lock when using hasMessageAvailableAsync and readNextAsync 
(#11183)
    
    The issue happens when both of the following conditions are satisfied:
    
    1. The messages are added to the incoming queue before reading messages
    2. The result future of readNextAsync has already completed before the 
user calls future.whenComplete.
       This does not always happen.
    
     In that case, because the IO thread invokes the callback of 
hasMessageAvailableAsync,
     the IO thread also ends up executing message.getValue(), which blocks in 
CompletableFuture.get() while loading the schema.
     This can lead to a deadlock, as shown in the following stack trace:
     ```
     java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
     
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[])
 AbstractMultiVersionReader.java:115
     
org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion)
 MultiVersionAvroReader.java:47
     
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion)
 AbstractMultiVersionReader.java:52
     
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object)
 AbstractMultiVersionReader.java:49
     
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, 
CacheLoader) LocalCache.java:3529
     com.google.common.cache.LocalCache$Segment.loadSync(Object, int, 
LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
     com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, 
CacheLoader) LocalCache.java:2155
     com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) 
LocalCache.java:2045
     com.google.common.cache.LocalCache.get(Object, CacheLoader) 
LocalCache.java:3951
     com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
     com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) 
LocalCache.java:4935
     
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[],
 byte[]) AbstractMultiVersionReader.java:86
     org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], 
byte[]) AbstractStructSchema.java:60
     org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
     
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message)
 SystemTopicBasedTopicPoliciesService.java:302
     
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader,
 Throwable, CompletableFuture, Message, Throwable) 
SystemTopicBasedTopicPoliciesService.java:254
     
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object,
 Object)
     java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, 
CompletableFuture$UniWhenComplete) CompletableFuture.java:859
     java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, 
BiConsumer) CompletableFuture.java:883
     java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) 
CompletableFuture.java:2251
    
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader,
 CompletableFuture, Boolean, Throwable) 
SystemTopicBasedTopicPoliciesService.java:246
     
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object,
 Object) 
org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse)
 ClientCnx.java:468
    
     ```
    
     Since we have introduced an internal thread pool for handling the 
client's internal executions,
     the fix is to use an internal thread to complete the future (and thus run 
the callback) of hasMessageAvailableAsync.
    
    (cherry picked from commit ed42007d5f5df063a61626e49803bbf35c5a3eff)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 46 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 27 ++++++++-----
 2 files changed, 62 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index a04f73b..4a2466f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -32,8 +32,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -51,10 +54,11 @@ import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.apache.pulsar.schema.Schemas;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -510,4 +514,44 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
 
     }
 
+    @Test(timeOut = 30000)
+    public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/testAvoidUsingIoThreadToGetValueOfMessage";
+
+        @Cleanup
+        Producer<Schemas.PersonOne> producer = 
pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic)
+                .create();
+
+        producer.send(new Schemas.PersonOne(1));
+
+        @Cleanup
+        Reader<Schemas.PersonOne> reader = 
pulsarClient.newReader(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        List<Schemas.PersonOne> received = new ArrayList<>(1);
+        // Make sure the message is added to the incoming queue
+        Awaitility.await().untilAsserted(() ->
+                assertTrue(((ReaderImpl<?>) 
reader).getConsumer().incomingMessages.size() > 0));
+        reader.hasMessageAvailableAsync().whenComplete((has, e) -> {
+            if (e == null && has) {
+                CompletableFuture<Message<Schemas.PersonOne>> future = 
reader.readNextAsync();
+                // Make sure the future completed
+                Awaitility.await().pollInterval(1, 
TimeUnit.MILLISECONDS).untilAsserted(future::isDone);
+                future.whenComplete((msg, ex) -> {
+                    if (ex == null) {
+                        received.add(msg.getValue());
+                    }
+                    latch.countDown();
+                });
+            } else {
+                latch.countDown();
+            }
+        });
+        latch.await();
+        Assert.assertEquals(received.size(), 1);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c5301da..a0fb143 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1906,14 +1906,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 .compare(markDeletePosition.getEntryId(), 
lastMessageId.getEntryId())
                                 .result();
                         if (lastMessageId.getEntryId() < 0) {
-                            booleanFuture.complete(false);
+                            
completehasMessageAvailableWithValue(booleanFuture, false);
                         } else {
-                            booleanFuture.complete(resetIncludeHead ? result 
<= 0 : result < 0);
+                            completehasMessageAvailableWithValue(booleanFuture,
+                                    resetIncludeHead ? result <= 0 : result < 
0);
                         }
                     } else if (lastMessageId == null || 
lastMessageId.getEntryId() < 0) {
-                        booleanFuture.complete(false);
+                        completehasMessageAvailableWithValue(booleanFuture, 
false);
                     } else {
-                        booleanFuture.complete(resetIncludeHead);
+                        completehasMessageAvailableWithValue(booleanFuture, 
resetIncludeHead);
                     }
                 }).exceptionally(ex -> {
                     log.error("[{}][{}] Failed getLastMessageId command", 
topic, subscription, ex);
@@ -1925,16 +1926,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
 
             if (hasMoreMessages(lastMessageIdInBroker, startMessageId, 
resetIncludeHead)) {
-                booleanFuture.complete(true);
+                completehasMessageAvailableWithValue(booleanFuture, true);
                 return booleanFuture;
             }
 
             getLastMessageIdAsync().thenAccept(messageId -> {
                 lastMessageIdInBroker = messageId;
                 if (hasMoreMessages(lastMessageIdInBroker, startMessageId, 
resetIncludeHead)) {
-                    booleanFuture.complete(true);
+                    completehasMessageAvailableWithValue(booleanFuture, true);
                 } else {
-                    booleanFuture.complete(false);
+                    completehasMessageAvailableWithValue(booleanFuture, false);
                 }
             }).exceptionally(e -> {
                 log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
@@ -1945,16 +1946,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         } else {
             // read before, use lastDequeueMessage for comparison
             if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, 
false)) {
-                booleanFuture.complete(true);
+                completehasMessageAvailableWithValue(booleanFuture, true);
                 return booleanFuture;
             }
 
             getLastMessageIdAsync().thenAccept(messageId -> {
                 lastMessageIdInBroker = messageId;
                 if (hasMoreMessages(lastMessageIdInBroker, 
lastDequeuedMessageId, false)) {
-                    booleanFuture.complete(true);
+                    completehasMessageAvailableWithValue(booleanFuture, true);
                 } else {
-                    booleanFuture.complete(false);
+                    completehasMessageAvailableWithValue(booleanFuture, false);
                 }
             }).exceptionally(e -> {
                 log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
@@ -1966,6 +1967,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return booleanFuture;
     }
 
+    private void 
completehasMessageAvailableWithValue(CompletableFuture<Boolean> future, boolean 
value) {
+        internalPinnedExecutor.execute(() -> {
+            future.complete(value);
+        });
+    }
+
     private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId 
messageId, boolean inclusive) {
         if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 &&
                 ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1) {

Reply via email to