rdhabalia commented on code in PR #16425:
URL: https://github.com/apache/pulsar/pull/16425#discussion_r933927622


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java:
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronizer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
+    protected PulsarService pulsar;
+    protected BrokerService brokerService;
+    protected String topicName;
+    protected PulsarClientImpl client;
+    protected volatile Producer<MetadataEvent> producer;
+    protected volatile Consumer<MetadataEvent> consumer;
+    private final CopyOnWriteArrayList<Function<MetadataEvent, 
CompletableFuture<Void>>>
+    listeners = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
+    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
+    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 
1, TimeUnit.MINUTES, 0,
+            TimeUnit.MILLISECONDS);
+
+    /**
+     * Captures the broker handles and the metadata-event topic name.
+     *
+     * <p>No client, producer or consumer is created here. A blank topic name
+     * only produces an informational log line.
+     *
+     * @param pulsar    broker service instance; its {@code BrokerService} is cached
+     * @param topicName topic used for metadata events; may be blank
+     * @throws PulsarServerException declared by the signature; not thrown by this body
+     */
+    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) throws PulsarServerException {
+        this.pulsar = pulsar;
+        this.brokerService = pulsar.getBrokerService();
+        this.topicName = topicName;
+        if (StringUtils.isBlank(topicName)) {
+            log.info("Metadata synchronizer is disabled");
+        }
+    }
+
+    /**
+     * Starts the producer and the consumer on the configured topic.
+     *
+     * <p>Does nothing except log when the topic name is blank.
+     *
+     * @throws PulsarServerException declared by the signature; propagated from {@code pulsar.getClient()} if thrown
+     */
+    public void start() throws PulsarServerException {
+        final boolean topicConfigured = !StringUtils.isBlank(topicName);
+        if (!topicConfigured) {
+            log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
+            return;
+        }
+
+        this.client = (PulsarClientImpl) pulsar.getClient();
+        startProducer();
+        startConsumer();
+        log.info("Metadata event synchronizer started on topic {}", topicName);
+    }
+
+    /**
+     * Hands the event to {@code publishAsync} and returns the future that
+     * {@code publishAsync} completes.
+     *
+     * @param event metadata event to publish
+     * @return future completed by {@code publishAsync}
+     */
+    @Override
+    public CompletableFuture<Void> notify(MetadataEvent event) {
+        final CompletableFuture<Void> publishResult = new CompletableFuture<>();
+        publishAsync(event, publishResult);
+        return publishResult;
+    }
+
+    /**
+     * Appends the given listener to this synchronizer's listener list.
+     *
+     * @param listener callback taking a {@code MetadataEvent} and returning a completion future
+     */
+    @Override
+    public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Returns the cluster name reported by the broker configuration.
+     *
+     * @return value of {@code pulsar.getConfig().getClusterName()}
+     */
+    @Override
+    public String getClusterName() {
+        final String clusterName = pulsar.getConfig().getClusterName();
+        return clusterName;
+    }
+
+    /**
+     * Publishes the event through the producer and completes {@code future}
+     * once the send succeeds.
+     *
+     * <p>If the producer has not been started, {@code future} is failed with an
+     * {@link IllegalStateException} and nothing is sent. If a send fails, the
+     * same event is re-submitted after {@code MESSAGE_RATE_BACKOFF_MS}
+     * milliseconds on the broker executor, reusing the same future.
+     *
+     * @param event  metadata event to publish
+     * @param future future to complete on success, or to fail when the producer is not started
+     */
+    private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
+        if (!started) {
+            log.info("Producer is not started on {}, failed to publish {}", topicName, event);
+            future.completeExceptionally(new IllegalStateException("producer is not started yet"));
+            // Stop here: the producer field is only assigned once creation succeeds, so
+            // falling through would dereference a null producer.
+            return;
+        }
+        producer.newMessage().value(event).sendAsync().thenAccept(__ -> {
+            log.info("successfully published metadata change event {}", event);
+            future.complete(null);
+        }).exceptionally(ex -> {
+            log.warn("failed to publish metadata update {}, will retry in {}", topicName, MESSAGE_RATE_BACKOFF_MS, ex);
+            pulsar.getBrokerService().executor().schedule(() -> publishAsync(event, future), MESSAGE_RATE_BACKOFF_MS,
+                    TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+    /**
+     * Asynchronously creates the metadata-event producer on {@code topicName}.
+     *
+     * <p>On success the producer is stored and {@code started} is set. On
+     * failure, creation is re-scheduled on the broker executor after the next
+     * {@code backOff} delay.
+     */
+    private void startProducer() {
+        log.info("[{}] Starting producer", topicName);
+        client.newProducer(AvroSchema.of(MetadataEvent.class)).topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false)
+                // NOTE(review): a zero send timeout is documented by Pulsar as "no timeout" - confirm.
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> {
+                    // Assign the producer before flipping the flag so that readers who
+                    // observe started == true also see a non-null producer.
+                    producer = prod;
+                    started = true;
+                    log.info("producer is created successfully {}", topicName);
+                }).exceptionally(ex -> {
+                    long waitTimeMs = backOff.next();
+                    log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
+                            waitTimeMs / 1000.0);
+                    // BackOff before retrying
+                    brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
+                    return null;
+                });
+    }
+
+    private void startConsumer() {
+        if (consumer != null) {
+            return;
+        }
+        ConsumerBuilder<MetadataEvent> consumerBuilder = 
client.newConsumer(AvroSchema.of(MetadataEvent.class))

Review Comment:
   As I said, it won't make much of a difference, but I have made the change 
anyway so that we can make progress on this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to