void-ptr974 commented on code in PR #25915:
URL: https://github.com/apache/pulsar/pull/25915#discussion_r3357295252


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -810,7 +856,12 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
                             log.warn("Attempting to add producer to a terminated topic");
                             throw new TopicTerminatedException("Topic was already terminated");
                         }
-                        return internalAddProducer(producer).thenApply(ignore -> {
+                        CompletableFuture<Void> internalAppProducerFuture = internalAddProducer(producer);
+                        internalAppProducerFuture.thenApply(__ -> this.startReplProducers()).exceptionally(ex -> {

Review Comment:
   Would it be possible to narrow this trigger a bit?

   This runs for every producer registration, including remote producers and additional local producers when the replicators may already be running. For persistent topics, startReplProducers() also does an async namespace policy lookup, so this adds work to the producer creation path.

   Maybe we can check whether this is a non-remote producer and whether localProducersEmptyTime was non-null before resetting it. In that case, startReplProducers() would only run when a local producer brings the topic back from the no-local-producer state.
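
A minimal sketch of the narrowed trigger described above, under stated assumptions. `NarrowedTriggerSketch` is a hypothetical stand-in, not the real `AbstractTopic`; the `isRemote` flag and the `AtomicReference` holder are illustrative choices, and `startReplProducers()` here only counts calls instead of doing the namespace policy lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the topic; it is not the real AbstractTopic.
class NarrowedTriggerSketch {
    // Non-null while the topic has no local producers (name borrowed from the PR;
    // the AtomicReference wrapper is an assumption of this sketch).
    final AtomicReference<Long> localProducersEmptyTime =
            new AtomicReference<>(System.currentTimeMillis());

    // Counts restarts so the effect of the guard can be observed.
    final AtomicInteger replicatorStarts = new AtomicInteger();

    // Stub: the real method would perform the async policy lookup.
    CompletableFuture<Void> startReplProducers() {
        replicatorStarts.incrementAndGet();
        return CompletableFuture.completedFuture(null);
    }

    // isRemote is a hypothetical flag standing in for a remote-producer check.
    CompletableFuture<Void> onProducerAdded(boolean isRemote) {
        if (isRemote) {
            // Remote producers never bring the topic out of the idle state.
            return CompletableFuture.completedFuture(null);
        }
        // Read and reset in one atomic step so only one caller sees the transition.
        Long previousEmptyTime = localProducersEmptyTime.getAndSet(null);
        if (previousEmptyTime == null) {
            // A local producer was already connected; replicators are assumed running.
            return CompletableFuture.completedFuture(null);
        }
        // First local producer after an idle period: restart replication.
        return startReplProducers();
    }
}
```

With this shape, only the first local producer after an idle period pays for the restart; remote producers and additional local producers skip it.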



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -665,16 +667,60 @@ protected Consumer getActiveConsumer(Subscription subscription) {
         return null;
     }
 
-    protected boolean hasLocalProducers() {
-        if (producers.isEmpty()) {
-            return false;
+    public abstract CompletableFuture<Void> closeReplProducersIfNoBacklog();
+
+    public abstract CompletableFuture<Void> startReplProducers();
+
+    public void disconnectReplicatorIfNoTrafficForLongTime() {
+        updateLocalProducersEmptyTime();
+
+        final Long cachedTime = localProducersEmptyTime;
+        // Still active.
+        if (cachedTime == null) {
+            return;
+        }
+        // Disabled the feature.
+        int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds();
+        if (threshold <= 0) {
+            return;
+        }
+        // Check and close replication producers.
+        if (System.currentTimeMillis() - cachedTime > threshold * 1000L) {
+            log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold)
+                    .log("Disconnecting replication producers since no producer is active for a long time.");
+            closeReplProducersIfNoBacklog().whenCompleteAsync((__, ex) -> {

Review Comment:
   One concern here is the case where a replicated topic only has remote producers.

   For example, cluster A may keep replicating messages to cluster B, while B has no local producers. In that state, Topic GC now considers B's topic active because the remote producer is connected, but this idle check can still disconnect B's outbound replicator after the threshold. If no local producer later connects to B, there may not be a clear event to start that replicator again, so the replication cursor could stop advancing and backlog could grow.
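
The scenario above can be walked through with a toy model. This is only a sketch under stated assumptions: `RemoteOnlyTopicSketch` and all of its fields are hypothetical, time is passed in explicitly, and `backlog` simply counts entries the stopped replication cursor has not moved past. It does not reproduce real broker behaviour.

```java
// Toy model of cluster B's topic; every name here is hypothetical.
class RemoteOnlyTopicSketch {
    boolean replicatorRunning = true;
    int localProducers = 0;         // B has no local producers
    int remoteProducers = 1;        // A's replicator is connected, so Topic GC sees the topic as active
    Long localProducersEmptyTime;   // null while a local producer is connected
    long backlog = 0;               // entries the replication cursor has not passed

    void updateLocalProducersEmptyTime(long nowMillis) {
        if (localProducers > 0) {
            localProducersEmptyTime = null;
        } else if (localProducersEmptyTime == null) {
            localProducersEmptyTime = nowMillis;
        }
    }

    // Mirrors the shape of the idle check in the hunk above; "no backlog" is modelled as backlog == 0.
    void idleCheck(long nowMillis, long thresholdMillis) {
        updateLocalProducersEmptyTime(nowMillis);
        Long cachedTime = localProducersEmptyTime;
        if (cachedTime == null || thresholdMillis <= 0) {
            return;
        }
        if (nowMillis - cachedTime > thresholdMillis && backlog == 0) {
            replicatorRunning = false;
        }
    }

    // A message from the remote producer lands on B's topic.
    void onRemoteMessage() {
        if (!replicatorRunning) {
            backlog++; // nothing advances the cursor while the replicator is stopped
        }
    }
}
```

In this model, once the idle check stops the replicator, later checks never restart it, and a restart tied only to local producers never fires because none connect.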



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
