void-ptr974 commented on code in PR #25915:
URL: https://github.com/apache/pulsar/pull/25915#discussion_r3367181837


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -665,16 +667,60 @@ protected Consumer getActiveConsumer(Subscription subscription) {
         return null;
     }
 
-    protected boolean hasLocalProducers() {
-        if (producers.isEmpty()) {
-            return false;
+    public abstract CompletableFuture<Void> closeReplProducersIfNoBacklog();
+
+    public abstract CompletableFuture<Void> startReplProducers();
+
+    public void disconnectReplicatorIfNoTrafficForLongTime() {
+        updateLocalProducersEmptyTime();
+
+        final Long cachedTime = localProducersEmptyTime;
+        // Still active.
+        if (cachedTime == null) {
+            return;
+        }
+        // Disabled the feature.
+        int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds();
+        if (threshold <= 0) {
+            return;
+        }
+        // Check and close replication producers.
+        if (System.currentTimeMillis() - cachedTime > threshold * 1000L) {
+            log.info().attr("brokerReplicationInactiveThresholdSeconds", threshold)
+                    .log("Disconnecting replication producers since no producer is active for a long time.");
+            closeReplProducersIfNoBacklog().whenCompleteAsync((__, ex) -> {

Review Comment:
   Thanks for clarifying. I see your point, and I agree the previous 
traffic-based approach covers the case where remote traffic keeps arriving.
   
   I think we are mainly discussing which signal should be used as the 
invariant here. The traffic-based signal and the producer-lifecycle signal 
cover slightly different cases:
   
   - If remote messages keep arriving, traffic-based detection works well.
   - If the remote producer connection is still valid but the topic is quiet 
for a long time, traffic-based detection treats the topic as idle even though 
a producer is still connected.
   
   So I think moving toward producer-lifecycle-based detection is reasonable 
and closer to the original issue.
   
   One detail I want to double-check: the PR description says the replicator 
producer is closed after there is “no producer registered” for a long time. 
That sounds like it includes both local and remote producers. The current 
implementation uses “no local producers” as the timer condition. In the 
backup-side scenario, a topic can have no local producer but still have a 
connected remote producer, so the current condition is broader than the 
description: the timer can start while a remote producer is still connected.
   
   Would it be cleaner to align the implementation with the PR description?
   
   - Topic GC treats any producer, including remote producers, as active.
   - The inactive-replication timer starts only when there are no producers at 
all.
   - startReplProducers() only runs when the topic transitions from no 
producers to having a producer again.
   
   Concretely, localProducersEmptyTime could become something like 
producersEmptyTime. The timer would be set only when producers.isEmpty(), 
cleared when a producer is added, and startReplProducers() would only run when 
a producer is added after the timer had been set.
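   
   To make the proposal concrete, here is a minimal sketch of the state I have 
in mind. It is not the actual AbstractTopic code: ProducersIdleTracker and its 
method names are hypothetical, the producer count stands in for the producers 
map (local and remote together), the clock is passed in explicitly, and a 
counter stands in for the real startReplProducers() call.
   
   ```java
   // Hypothetical sketch only; names and structure are assumptions, not Pulsar code.
   class ProducersIdleTracker {
       private int producerCount;        // local + remote producers together
       private Long producersEmptyTime;  // null while any producer is registered
       int replStartCount;               // stands in for startReplProducers() calls

       // Called from the periodic check: start the timer only when there are
       // no producers at all, and clear it otherwise.
       synchronized void updateProducersEmptyTime(long nowMillis) {
           if (producerCount == 0) {
               if (producersEmptyTime == null) {
                   producersEmptyTime = nowMillis;
               }
           } else {
               producersEmptyTime = null;
           }
       }

       synchronized void onProducerAdded() {
           producerCount++;
           // Restart replication only on the transition from
           // "no producers, timer set" to "has a producer".
           if (producersEmptyTime != null) {
               producersEmptyTime = null;
               replStartCount++;
           }
       }

       synchronized void onProducerRemoved(long nowMillis) {
           if (producerCount > 0 && --producerCount == 0) {
               producersEmptyTime = nowMillis;
           }
       }

       // Mirrors the threshold check in the diff: a non-positive threshold
       // disables the feature, and a null timer means the topic is active.
       synchronized boolean shouldCloseReplProducers(long nowMillis, int thresholdSeconds) {
           if (thresholdSeconds <= 0 || producersEmptyTime == null) {
               return false;
           }
           return nowMillis - producersEmptyTime > thresholdSeconds * 1000L;
       }
   }
   ```
   
   With this shape, adding a second producer while one is already registered 
does not trigger another start, and a connected remote producer alone is 
enough to keep the timer cleared.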
   
   This keeps the current producer-lifecycle direction, avoids relying only on 
recent message traffic, and also keeps the idle-replication condition 
consistent with the Topic GC condition.


