This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6622ff76f37 [fix][broker] Fix Metadata Event Synchronizer producer 
creation retry so that the producer gets created eventually (#24081)
6622ff76f37 is described below

commit 6622ff76f3723c7728e1c35b6809b5dcb7a03003
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Mar 17 01:43:17 2025 -0700

    [fix][broker] Fix Metadata Event Synchronizer producer creation retry so 
that the producer gets created eventually (#24081)
---
 .../service/PulsarMetadataEventSynchronizer.java      | 14 ++++++++++----
 .../pulsar/broker/service/BrokerServiceTest.java      | 19 +++++++++++++++++++
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index 8b2ebf20053..f49188263a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,13 +53,13 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
     protected BrokerService brokerService;
     @Getter
     protected String topicName;
-    protected PulsarClientImpl client;
+    protected volatile PulsarClientImpl client;
     protected volatile Producer<MetadataEvent> producer;
     protected volatile Consumer<MetadataEvent> consumer;
     private final CopyOnWriteArrayList<Function<MetadataEvent, 
CompletableFuture<Void>>>
     listeners = new CopyOnWriteArrayList<>();
 
-    static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, 
State> STATE_UPDATER =
+    protected static final 
AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> 
STATE_UPDATER =
             
AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, 
State.class, "state");
     @Getter
     private volatile State state;
@@ -132,7 +133,7 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
         });
     }
 
-    private void startProducer() {
+    protected void startProducer() {
         if (isClosingOrClosed()) {
             log.info("[{}] Skip to start new producer because the synchronizer 
is closed", topicName);
         }
@@ -167,11 +168,16 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
                 log.warn("[{}] Failed to create producer ({}), retrying in {} 
s", topicName, ex.getMessage(),
                         waitTimeMs / 1000.0);
                 // BackOff before retrying
-                brokerService.executor().schedule(this::startProducer, 
waitTimeMs, TimeUnit.MILLISECONDS);
+                pulsar.getExecutor().schedule(this::startProducer, waitTimeMs, 
TimeUnit.MILLISECONDS);
                 return null;
             });
     }
 
+    @VisibleForTesting
+    public Producer<MetadataEvent> getProducer() {
+        return producer;
+    }
+
     private void startConsumer() {
         if (isClosingOrClosed()) {
             log.info("[{}] Skip to start new consumer because the synchronizer 
is closed", topicName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index a786fbc9574..f874b827b12 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -85,6 +85,7 @@ import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer.State;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
@@ -107,6 +108,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -2005,5 +2007,22 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testPulsarMetadataEventSyncProducerCreation() throws Exception 
{
+        final String topicName = "persistent://prop/usw/my-ns/syncTopic";
+        pulsar.getConfiguration().setMetadataSyncEventTopic(topicName);
+        PulsarMetadataEventSynchronizer sync = new 
PulsarMetadataEventSynchronizer(pulsar, topicName);
+        // set invalid client for retry
+        PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://invalidhost:8080")
+                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+        sync.client = client;
+        sync.STATE_UPDATER.set(sync, State.Starting_Producer);
+        sync.startProducer();
+        assertNull(sync.getProducer());
+        // update valid client which will set the producer
+        sync.client = (PulsarClientImpl) pulsarClient;
+        retryStrategically((test) -> sync.getProducer() != null, 1000, 10);
+        assertNotNull(sync.getProducer());
+    }
 }
 

Reply via email to