This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4f0766708ad [fix][client] Broker-side producer handle leak when closing a
producer whose state is registering schema (#25725)
4f0766708ad is described below
commit 4f0766708ad4d6ce027938414d6fc8e4d764343f
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 9 14:32:09 2026 +0800
[fix][client] Broker-side producer handle leak when closing a producer whose
state is registering schema (#25725)
---
.../broker/service/OneWayReplicatorTest.java | 46 +++++++++------
.../SchemaCompatibilityCheckTest.java | 68 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 2 +-
3 files changed, 96 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 8665a8cd53a..c4f5bdeab74 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -312,7 +312,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
int age;
}
- @Test(dataProvider = "autoUpdateSchemaParams")
+ @Test(dataProvider = "autoUpdateSchemaParams", timeOut = 60_000)
public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
Boolean
allowAutoUpdateSchemaWithReplicator) throws Exception {
final String ns = BrokerTestUtil.newUniqueName("public/ns");
@@ -331,17 +331,18 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
RetentionPolicies retentionPolicies = new RetentionPolicies(10, 1);
admin1.namespaces().setRetention(ns, retentionPolicies);
admin2.namespaces().setRetention(ns, retentionPolicies);
- PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName,
false).join().get();
+ AtomicReference<PersistentTopic> topic1 = new
AtomicReference<>((PersistentTopic) broker1
+ .getTopic(topicName, false).join().get());
PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName,
false).join().get();
Awaitility.await().untilAsserted(() -> {
- HierarchyTopicPolicies policies1 =
topic1.getHierarchyTopicPolicies();
+ HierarchyTopicPolicies policies1 =
topic1.get().getHierarchyTopicPolicies();
HierarchyTopicPolicies policies2 =
topic2.getHierarchyTopicPolicies();
assertEquals(policies1.getSchemaCompatibilityStrategy().get(),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
assertEquals(policies2.getSchemaCompatibilityStrategy().get(),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
- assertTrue(topic1.isAllowAutoUpdateSchema);
- assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+ assertTrue(topic1.get().isAllowAutoUpdateSchema);
+ assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator);
assertEquals(topic2.isAllowAutoUpdateSchema,
isAllowAutoUpdateSchema);
assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
@@ -405,8 +406,8 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns,
isAllowAutoUpdateSchema,
allowAutoUpdateSchemaWithReplicator);
Awaitility.await().untilAsserted(() -> {
- assertTrue(topic1.isAllowAutoUpdateSchema);
- assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
+ assertTrue(topic1.get().isAllowAutoUpdateSchema);
+ assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator);
assertEquals(topic2.isAllowAutoUpdateSchema,
isAllowAutoUpdateSchema);
if (allowAutoUpdateSchemaWithReplicator != null &&
!allowAutoUpdateSchemaWithReplicator) {
assertFalse(topic2.isAllowAutoUpdateSchemaWithReplicator);
@@ -426,8 +427,19 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// The message can not be replicated to the remote side.
TopicStats topicStats = admin1.topics().getStats(topicName);
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(),
1);
- producer1.close();
- return;
+ // Change the policy to allow replicator update schemas.
+ admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns,
isAllowAutoUpdateSchema, true);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(topic2.isAllowAutoUpdateSchema,
isAllowAutoUpdateSchema);
+ assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
+ });
+ // Unload topic. Note: please do not remove this line; it is
here to test whether the replication
+ // can recover from the following case: the internal producer
of the replicator is closed while its state
+ // is registering schema.
+ admin1.topics().unload(topicName);
+ topic1.set((PersistentTopic) broker1.getTopic(topicName,
false).join().get());
+ waitReplicatorStarted(topicName);
+ //return;
}
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin1.topics().getStats(topicName);
@@ -464,16 +476,12 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
assertEquals(msg21.getValue().getAge(), 16);
consumer2.acknowledge(msg21);
Message<Customer> msg22 = consumer2.receive(5, TimeUnit.SECONDS);
- if (allowAutoUpdateSchemaWithReplicator != null &&
!allowAutoUpdateSchemaWithReplicator) {
- assertNull(msg22);
- } else {
- assertNotNull(msg22);
- byte[] bytesVersion22 = msg22.getSchemaVersion();
- assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
- assertEquals(msg22.getValue().getName(), "Apache");
- assertEquals(msg22.getValue().getAge(), 26);
- consumer2.acknowledge(msg22);
- }
+ assertNotNull(msg22);
+ byte[] bytesVersion22 = msg22.getSchemaVersion();
+ assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
+ assertEquals(msg22.getValue().getName(), "Apache");
+ assertEquals(msg22.getValue().getAge(), 26);
+ consumer2.acknowledge(msg22);
// cleanup.
consumer1.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 52c6628d37c..27b03d18e0a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -29,15 +29,20 @@ import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.CustomLog;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
@@ -45,7 +50,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -57,6 +66,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.MockExternalJsonSchema;
import org.apache.pulsar.schema.Schemas;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -638,6 +648,64 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
}
+ @Test
+ public void testCloseProducerWhenRegisteringNewSchema() throws Exception {
+ final String ns = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
+ final String topic = "persistent://" + BrokerTestUtil.newUniqueName(ns
+ "/tp");
+ admin.namespaces().createNamespace(ns);
+ admin.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(ns),
+ SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+ });
+
+ // Injection: delay the handling of the schema registration response,
so that we have enough time to close the producer
+ // while its state is registering schema.
+ CountDownLatch handleErrorSignal = new CountDownLatch(1);
+ ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+ PulsarClient injectedReplClient =
InjectedClientCnxClientBuilder.create(clientBuilder,
+ (conf, eventLoopGroup) -> {
+ return new ClientCnx(InstrumentProvider.NOOP, conf,
eventLoopGroup) {
+
+ @Override
+ protected void
handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse response) {
+ if (response.hasErrorCode()) {
+ try {
+ handleErrorSignal.await();
+ } catch (InterruptedException e) {
+ // Nothing to do.
+ }
+ }
+ super.handleGetOrCreateSchemaResponse(response);
+ }
+ };
+ });
+
+ Producer<byte[]> producer =
injectedReplClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
+ // Register a consumer to prevent the client from closing idle connections.
+ Consumer consumer =
injectedReplClient.newConsumer(Schema.AUTO_CONSUME()).subscriptionName("s1")
+ .topic(topic).subscribe();
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+ assertEquals(persistentTopic.getProducers().size(), 1);
+ producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new
Schemas.PersonOne(1)).send();
+ CompletableFuture<MessageId> send2 =
producer.newMessage(Schema.AVRO(Schemas.PersonTwo.class))
+ .value(new Schemas.PersonTwo(2, "2")).sendAsync();
+ producer.close();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(send2.isDone());
+ assertTrue(send2.isCompletedExceptionally());
+ // Since the producer was closed, the topic should have 0
producers.
+ assertEquals(persistentTopic.getProducers().size(), 0);
+ });
+ handleErrorSignal.countDown();
+
+ // cleanup.
+ consumer.close();
+ injectedReplClient.close();
+ admin.topics().unload(topic);
+ }
+
@Test
public void testExternalSchemaTypeCompatibility() throws Exception {
String namespace = "test-namespace-" + randomName(16);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cf5ba97f436..ca733da998c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1262,7 +1262,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
closeProducerTasks();
ClientCnx cnx = cnx();
- if (cnx == null || currentState != State.Ready) {
+ if (cnx == null || (currentState != State.Ready && currentState !=
State.RegisteringSchema)) {
log.info("Closed Producer (not connected)");
closeAndClearPendingMessages();
return CompletableFuture.completedFuture(null);