This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ff4a25e8f78 [fix][client] Fix producer/consumer stop to reconnect or 
Pub/Sub due to IO thread race-condition  (#23499)
ff4a25e8f78 is described below

commit ff4a25e8f7827c10be9614fe064cdd5249379c8f
Author: fengyubiao <[email protected]>
AuthorDate: Wed Oct 23 19:04:34 2024 +0800

    [fix][client] Fix producer/consumer stop to reconnect or Pub/Sub due to IO 
thread race-condition  (#23499)
---
 .../client/impl/SimpleProduceConsumeIoTest.java    | 134 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConnectionPool.java  |   2 +-
 2 files changed, 135 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
new file mode 100644
index 00000000000..4da3ce25733
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class SimpleProduceConsumeIoTest extends ProducerConsumerBase {
+
+    private PulsarClientImpl singleConnectionPerBrokerClient;
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        singleConnectionPerBrokerClient = (PulsarClientImpl) 
PulsarClient.builder().connectionsPerBroker(1)
+                .serviceUrl(lookupUrl.toString()).build();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (singleConnectionPerBrokerClient != null) {
+            singleConnectionPerBrokerClient.close();
+        }
+        super.internalCleanup();
+    }
+
+    /**
+     * 1. Create a producer with a pooled connection.
+     * 2. While "producer.connectionOpened" is executing, the pooled connection 
is closed due to a network issue.
+     * 3. Verify: the producer can be created successfully.
+     */
+    @Test
+    public void testUnstableNetWorkWhenCreatingProducer() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        // Trigger a pooled connection creation.
+        ProducerImpl p = (ProducerImpl) 
singleConnectionPerBrokerClient.newProducer().topic(topic).create();
+        ClientCnx cnx = p.getClientCnx();
+        p.close();
+
+        // 1. Create a new producer with the pooled connection (since there is 
a pooled connection, the new producer
+        // will reuse it).
+        // 2. Trigger a network issue.
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        // A task that triggers the network issue.
+        new Thread(() -> {
+            try {
+                countDownLatch.await();
+                cnx.ctx().close();
+            } catch (Exception ex) {
+            }
+        }).start();
+        // Create a new producer with the pooled connection.
+        AtomicReference<CompletableFuture<Producer<byte[]>>> p2FutureWrap = 
new AtomicReference<>();
+        new Thread(() -> {
+            ProducerBuilder producerBuilder = 
singleConnectionPerBrokerClient.newProducer().topic(topic);
+            ProducerConfigurationData producerConf = 
WhiteboxImpl.getInternalState(producerBuilder, "conf");
+            CompletableFuture<Producer<byte[]>> p2Future = new 
CompletableFuture();
+            p2FutureWrap.set(p2Future);
+            new ProducerImpl<>(singleConnectionPerBrokerClient, 
"public/default/tp1", producerConf, p2Future,
+                    -1, Schema.BYTES, null, Optional.empty()) {
+                @Override
+                public CompletableFuture<Void> connectionOpened(final 
ClientCnx cnx) {
+                    // Mock a network issue, and wait for the issue to occur.
+                    countDownLatch.countDown();
+                    try {
+                        Thread.sleep(1500);
+                    } catch (InterruptedException e) {
+                    }
+                    // Call the real implementation.
+                    return super.connectionOpened(cnx);
+                }
+            };
+        }).start();
+
+        // Verify: the producer can be created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(p2FutureWrap.get());
+            assertTrue(p2FutureWrap.get().isDone());
+        });
+        // Log the failure cause if the producer creation failed.
+        p2FutureWrap.get().exceptionally(ex -> {
+            log.error("Failed to create producer", ex);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(p2FutureWrap.get().isCompletedExceptionally());
+            assertTrue("Ready".equals(
+                    WhiteboxImpl.getInternalState(p2FutureWrap.get().join(), 
"state").toString()));
+        });
+
+        // Cleanup.
+        p2FutureWrap.get().join().close();
+        admin.topics().delete(topic);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index a6a809af858..0f49b77b057 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -275,7 +275,7 @@ public class ConnectionPool implements AutoCloseable {
             }
             // Try use exists connection.
             if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
-                return CompletableFuture.completedFuture(clientCnx);
+                return CompletableFuture.supplyAsync(() -> clientCnx, 
clientCnx.ctx().executor());
             } else {
                 // If connection already release, create a new one.
                 pool.remove(key, completableFuture);

Reply via email to