This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f0766708ad [fix][client]Broker-side producer handle leak if closes a 
producer which state is regitering schema (#25725)
4f0766708ad is described below

commit 4f0766708ad4d6ce027938414d6fc8e4d764343f
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 9 14:32:09 2026 +0800

    [fix][client]Broker-side producer handle leak if closes a producer which 
state is regitering schema (#25725)
---
 .../broker/service/OneWayReplicatorTest.java       | 46 +++++++++------
 .../SchemaCompatibilityCheckTest.java              | 68 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  2 +-
 3 files changed, 96 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 8665a8cd53a..c4f5bdeab74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -312,7 +312,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         int age;
     }
 
-    @Test(dataProvider = "autoUpdateSchemaParams")
+    @Test(dataProvider = "autoUpdateSchemaParams", timeOut = 60_000)
     public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
                                            Boolean 
allowAutoUpdateSchemaWithReplicator) throws Exception {
         final String ns = BrokerTestUtil.newUniqueName("public/ns");
@@ -331,17 +331,18 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         RetentionPolicies retentionPolicies = new RetentionPolicies(10, 1);
         admin1.namespaces().setRetention(ns, retentionPolicies);
         admin2.namespaces().setRetention(ns, retentionPolicies);
-        PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName, 
false).join().get();
+        AtomicReference<PersistentTopic> topic1 = new 
AtomicReference<>((PersistentTopic) broker1
+                .getTopic(topicName, false).join().get());
         PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName, 
false).join().get();
         Awaitility.await().untilAsserted(() -> {
-            HierarchyTopicPolicies policies1 = 
topic1.getHierarchyTopicPolicies();
+            HierarchyTopicPolicies policies1 = 
topic1.get().getHierarchyTopicPolicies();
             HierarchyTopicPolicies policies2 = 
topic2.getHierarchyTopicPolicies();
             assertEquals(policies1.getSchemaCompatibilityStrategy().get(),
                     SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
             assertEquals(policies2.getSchemaCompatibilityStrategy().get(),
                     SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
-            assertTrue(topic1.isAllowAutoUpdateSchema);
-            assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+            assertTrue(topic1.get().isAllowAutoUpdateSchema);
+            assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator);
             assertEquals(topic2.isAllowAutoUpdateSchema, 
isAllowAutoUpdateSchema);
             assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
             
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(),
 10);
@@ -405,8 +406,8 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, 
isAllowAutoUpdateSchema,
                 allowAutoUpdateSchemaWithReplicator);
         Awaitility.await().untilAsserted(() -> {
-            assertTrue(topic1.isAllowAutoUpdateSchema);
-            assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+            assertTrue(topic1.get().isAllowAutoUpdateSchema);
+            assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator);
             assertEquals(topic2.isAllowAutoUpdateSchema, 
isAllowAutoUpdateSchema);
             if (allowAutoUpdateSchemaWithReplicator != null && 
!allowAutoUpdateSchemaWithReplicator) {
                 assertFalse(topic2.isAllowAutoUpdateSchemaWithReplicator);
@@ -426,8 +427,19 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             // The message can not be replicated to the remote side.
             TopicStats topicStats = admin1.topics().getStats(topicName);
             
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 
1);
-            producer1.close();
-            return;
+            // Change the policy to allow replicator update schemas.
+            admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, 
isAllowAutoUpdateSchema, true);
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(topic2.isAllowAutoUpdateSchema, 
isAllowAutoUpdateSchema);
+                assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
+            });
+            // Unload topic. Highlight, please do not remove this line, it is 
in order to test whether the replication
+            // can be recovered from the following case: the internal producer 
of replicator is closed when it's state
+            // is registering schema.
+            admin1.topics().unload(topicName);
+            topic1.set((PersistentTopic) broker1.getTopic(topicName, 
false).join().get());
+            waitReplicatorStarted(topicName);
+            //return;
         }
         Awaitility.await().untilAsserted(() -> {
             TopicStats topicStats = admin1.topics().getStats(topicName);
@@ -464,16 +476,12 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         assertEquals(msg21.getValue().getAge(), 16);
         consumer2.acknowledge(msg21);
         Message<Customer> msg22 = consumer2.receive(5, TimeUnit.SECONDS);
-        if (allowAutoUpdateSchemaWithReplicator != null && 
!allowAutoUpdateSchemaWithReplicator) {
-            assertNull(msg22);
-        } else {
-            assertNotNull(msg22);
-            byte[] bytesVersion22 = msg22.getSchemaVersion();
-            assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
-            assertEquals(msg22.getValue().getName(), "Apache");
-            assertEquals(msg22.getValue().getAge(), 26);
-            consumer2.acknowledge(msg22);
-        }
+        assertNotNull(msg22);
+        byte[] bytesVersion22 = msg22.getSchemaVersion();
+        assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
+        assertEquals(msg22.getValue().getName(), "Apache");
+        assertEquals(msg22.getValue().getAge(), 26);
+        consumer2.acknowledge(msg22);
 
         // cleanup.
         consumer1.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 52c6628d37c..27b03d18e0a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -29,15 +29,20 @@ import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -45,7 +50,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -57,6 +66,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.schema.MockExternalJsonSchema;
 import org.apache.pulsar.schema.Schemas;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -638,6 +648,64 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
 
     }
 
+    @Test
+    public void testCloseProducerWhenRegisteringNewSchema() throws Exception {
+        final String ns = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
+        final String topic = "persistent://" + BrokerTestUtil.newUniqueName(ns 
+ "/tp");
+        admin.namespaces().createNamespace(ns);
+        admin.namespaces().setSchemaCompatibilityStrategy(ns, 
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(ns),
+                    SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+        });
+
+        // Injection: Let the handling response of registering schema delay, 
then we have enough time to close producer
+        // when it's state is registering schema.
+        CountDownLatch handleErrorSignal = new CountDownLatch(1);
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        PulsarClient injectedReplClient = 
InjectedClientCnxClientBuilder.create(clientBuilder,
+            (conf, eventLoopGroup) -> {
+                return new ClientCnx(InstrumentProvider.NOOP, conf, 
eventLoopGroup) {
+
+                    @Override
+                    protected void 
handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse response) {
+                        if (response.hasErrorCode()) {
+                            try {
+                                handleErrorSignal.await();
+                            } catch (InterruptedException e) {
+                                // Nothing to do.
+                            }
+                        }
+                        super.handleGetOrCreateSchemaResponse(response);
+                    }
+                };
+            });
+
+        Producer<byte[]> producer = 
injectedReplClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
+        // Registers a consumer to avoid client to close idle connections.
+        Consumer consumer = 
injectedReplClient.newConsumer(Schema.AUTO_CONSUME()).subscriptionName("s1")
+                .topic(topic).subscribe();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        assertEquals(persistentTopic.getProducers().size(), 1);
+        producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new 
Schemas.PersonOne(1)).send();
+        CompletableFuture<MessageId> send2 = 
producer.newMessage(Schema.AVRO(Schemas.PersonTwo.class))
+                .value(new Schemas.PersonTwo(2, "2")).sendAsync();
+        producer.close();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(send2.isDone());
+            assertTrue(send2.isCompletedExceptionally());
+            // Since the producer was closed, the topic should maintain 0 
producers.
+            assertEquals(persistentTopic.getProducers().size(), 0);
+        });
+        handleErrorSignal.countDown();
+
+        // cleanup.
+        consumer.close();
+        injectedReplClient.close();
+        admin.topics().unload(topic);
+    }
+
     @Test
     public void testExternalSchemaTypeCompatibility() throws Exception {
         String namespace = "test-namespace-" + randomName(16);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cf5ba97f436..ca733da998c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1262,7 +1262,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         closeProducerTasks();
 
         ClientCnx cnx = cnx();
-        if (cnx == null || currentState != State.Ready) {
+        if (cnx == null || (currentState != State.Ready && currentState != 
State.RegisteringSchema)) {
             log.info("Closed Producer (not connected)");
             closeAndClearPendingMessages();
             return CompletableFuture.completedFuture(null);

Reply via email to