This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6622ff76f37 [fix][broker] Fix Metadata Event Synchronizer producer creation retry so that the producer gets created eventually (#24081) 6622ff76f37 is described below commit 6622ff76f3723c7728e1c35b6809b5dcb7a03003 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Mar 17 01:43:17 2025 -0700 [fix][broker] Fix Metadata Event Synchronizer producer creation retry so that the producer gets created eventually (#24081) --- .../service/PulsarMetadataEventSynchronizer.java | 14 ++++++++++---- .../pulsar/broker/service/BrokerServiceTest.java | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index 8b2ebf20053..f49188263a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; +import com.google.common.annotations.VisibleForTesting; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -52,13 +53,13 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize protected BrokerService brokerService; @Getter protected String topicName; - protected PulsarClientImpl client; + protected volatile PulsarClientImpl client; protected volatile Producer<MetadataEvent> producer; protected volatile Consumer<MetadataEvent> consumer; private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>> listeners = new CopyOnWriteArrayList<>(); - static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER = + protected static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state"); @Getter private volatile State state; @@ -132,7 +133,7 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize }); } - private void startProducer() { + protected void startProducer() { if (isClosingOrClosed()) { log.info("[{}] Skip to start new producer because the synchronizer is closed", topicName); } @@ -167,11 +168,16 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + pulsar.getExecutor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); return null; }); } + @VisibleForTesting + public Producer<MetadataEvent> getProducer() { + return producer; + } + private void startConsumer() { if (isClosingOrClosed()) { log.info("[{}] Skip to start new consumer because the synchronizer is closed", topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index a786fbc9574..f874b827b12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -85,6 +85,7 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; +import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer.State; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; @@ -107,6 +108,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -2005,5 +2007,22 @@ public class BrokerServiceTest extends BrokerTestBase { } } + @Test + public void testPulsarMetadataEventSyncProducerCreation() throws Exception { + final String topicName = "persistent://prop/usw/my-ns/syncTopic"; + pulsar.getConfiguration().setMetadataSyncEventTopic(topicName); + PulsarMetadataEventSynchronizer sync = new PulsarMetadataEventSynchronizer(pulsar, topicName); + // set invalid client for retry + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://invalidhost:8080") + .operationTimeout(1000, TimeUnit.MILLISECONDS).build(); + sync.client = client; + sync.STATE_UPDATER.set(sync, State.Starting_Producer); + sync.startProducer(); + assertNull(sync.getProducer()); + // update valid client which will set the producer + sync.client = (PulsarClientImpl) pulsarClient; + retryStrategically((test) -> sync.getProducer() != null, 1000, 10); + assertNotNull(sync.getProducer()); + } }