lhotari commented on code in PR #25915:
URL: https://github.com/apache/pulsar/pull/25915#discussion_r3344005651


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -741,6 +741,10 @@ protected void startInactivityMonitor() {
             int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
             inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
                     TimeUnit.SECONDS);
+            if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) {

Review Comment:
   `brokerReplicationInactiveThresholdSeconds` is of type `Integer` in `ServiceConfiguration`, so technically it can be set to `null`. A null check would be useful here: comparing a `null` `Integer` with `> 0` auto-unboxes it and throws a `NullPointerException`.
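A minimal sketch of such a guard, assuming the getter returns the boxed `Integer` field as-is. `ReplicationInactivityGuard` and `isEnabled` are hypothetical names for illustration, not Pulsar APIs.

```java
// Hypothetical helper (not part of Pulsar) showing a null-safe check for the
// Integer-typed brokerReplicationInactiveThresholdSeconds setting.
final class ReplicationInactivityGuard {

    private ReplicationInactivityGuard() {
    }

    /**
     * Returns true only when the threshold is set (non-null) and positive.
     * Writing "thresholdSeconds > 0" alone would auto-unbox a null Integer
     * and throw a NullPointerException.
     */
    static boolean isEnabled(Integer thresholdSeconds) {
        return thresholdSeconds != null && thresholdSeconds > 0;
    }
}
```

With this helper, the condition in the diff would read `if (ReplicationInactivityGuard.isEnabled(pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds()))`, treating both `null` and non-positive values as "disabled".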



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -665,16 +667,54 @@ protected Consumer getActiveConsumer(Subscription subscription) {
         return null;
     }
 
-    protected boolean hasLocalProducers() {
-        if (producers.isEmpty()) {
-            return false;
+    public abstract CompletableFuture<Void> closeReplProducersIfNoBacklog();
+
+    public abstract CompletableFuture<Void> startReplProducers();
+
+    public void disconnectReplicatorIfNoTrafficForLongTime() {
+        updateLocalProducersEmptyTime();
+
+        final Long cachedTime = localProducersEmptyTime;
+        if (cachedTime == null) {
+            return;
         }
+        int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds();
+        if (System.currentTimeMillis() - cachedTime > threshold * 1000L) {

Review Comment:
   Since `brokerReplicationInactiveThresholdSeconds` is a dynamic configuration, its value can be changed at runtime. Users would expect that setting it to `0` disables the check. However, with a threshold of `0` this condition (`elapsed > 0`) evaluates to `true` for every replicator whose local producers have been empty for any amount of time, so all of them would be disconnected.
   One possible way to address the problem is to make the setting non-dynamic with `dynamic = false`.
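If the setting were kept dynamic instead, one alternative is to re-read the threshold on every invocation and treat a non-positive or `null` value as "disabled" inside the check itself. The sketch below assumes that approach; `InactivityDecision.shouldDisconnect` is a hypothetical helper, not code from the PR, and its `emptySinceMillis` parameter plays the role of `localProducersEmptyTime` in the diff.

```java
// Hypothetical sketch: decide whether a replicator should be disconnected,
// re-checking the (possibly dynamically updated) threshold on every call.
final class InactivityDecision {

    private InactivityDecision() {
    }

    /**
     * @param nowMillis        current wall-clock time in milliseconds
     * @param emptySinceMillis time the local producers became empty, or null if producers exist
     * @param thresholdSeconds current threshold value; null or non-positive disables the check
     * @return true if the replicator has been idle for longer than the threshold
     */
    static boolean shouldDisconnect(long nowMillis, Long emptySinceMillis, Integer thresholdSeconds) {
        // Without this guard, a threshold of 0 makes "elapsed > 0" true for every idle replicator.
        if (thresholdSeconds == null || thresholdSeconds <= 0) {
            return false;
        }
        if (emptySinceMillis == null) {
            return false; // local producers are present, nothing to disconnect
        }
        return nowMillis - emptySinceMillis > thresholdSeconds * 1000L;
    }
}
```

This keeps the per-topic decision correct under runtime changes, but it does not address the scheduling concern raised for `BrokerService`.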



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -598,12 +598,18 @@ public CompletableFuture<Void> close(
         return closeFuture;
     }
 
-    public CompletableFuture<Void> stopReplProducers() {
+    public CompletableFuture<Void> closeReplProducersIfNoBacklog() {

Review Comment:
   Local Claude Code review comment:
   
   > ### `NonPersistentTopic` idle-disconnect uses `terminate()`, which can never be restarted
   > 
   > For `PersistentTopic`, `closeReplProducersIfNoBacklog()` calls `replicator.disconnect()` → `State.Disconnected`, and `startProducer()` revives the replicator from `Disconnected` (`AbstractReplicator#startProducer`). So the disconnect→reconnect cycle this PR relies on works.
   > 
   > For `NonPersistentTopic`, the renamed `closeReplProducersIfNoBacklog()` still calls `replicator.terminate()` → `State.Terminated`. That breaks the reconnect path in two ways:
   > 
   > - `startProducer()` is a **no-op** from `Terminated` (`AbstractReplicator#startProducer`: *"Skip the producer creation since the replicator is terminating"*), so neither the `startReplProducers()` safety net nor `addProducer()` can revive it.
   > - `terminate()` does **not** remove the replicator from the `replicators` map (only `removeReplicator()` does). `checkReplication()` only recreates *missing* replicators (`!replicators.containsKey(cluster)`), so it won't recreate the terminated-but-present one either.
   > 
   > Net effect: once a non-persistent replicated topic that has a remote producer (but no local producers) goes idle past `brokerReplicationInactiveThresholdSeconds`, `disconnectReplicatorIfNoTrafficForLongTime()` terminates its outbound replicators **permanently**. Replication stays broken until the topic is unloaded/reloaded, even if a local producer reconnects later. This violates the disconnect→reconnect design the PR is built on.
   > 
   > The previous `terminate()` usage was always immediately followed by `delete()`, so the permanent state didn't matter; this PR introduces a terminate-without-delete path where it does.
   > 
   > The two new tests cover only the persistent topic; the non-persistent path is untested.
   > 
   > **Suggestion:** make `NonPersistentTopic.closeReplProducersIfNoBacklog()` use reconnectable (`disconnect()`) semantics consistent with `PersistentTopic`, or remove and recreate the replicator, and add a non-persistent test for the idle disconnect→reconnect cycle.
   > 
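To make the failure mode concrete, here is a deliberately simplified, hypothetical model of the state rules cited above. `ReplicatorModel` and its members are illustrative names only; the real `AbstractReplicator` may have additional states and asynchronous transitions, so this is a sketch of the argument, not of Pulsar's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model (NOT Pulsar's AbstractReplicator) of the rules
// quoted in the review: startProducer() revives from DISCONNECTED but is a no-op
// from TERMINATED, and checkReplication() only creates replicators that are missing.
final class ReplicatorModel {

    enum State { STARTED, DISCONNECTED, TERMINATED }

    static final class Replicator {
        State state = State.STARTED;

        void disconnect() {
            if (state != State.TERMINATED) {
                state = State.DISCONNECTED; // reconnectable state
            }
        }

        void terminate() {
            state = State.TERMINATED; // final state in this model
        }

        void startProducer() {
            if (state == State.TERMINATED) {
                return; // mirrors "Skip the producer creation since the replicator is terminating"
            }
            state = State.STARTED;
        }
    }

    final Map<String, Replicator> replicators = new HashMap<>();

    /** Mirrors checkReplication(): only clusters without an entry get a new replicator. */
    void checkReplication(String cluster) {
        replicators.computeIfAbsent(cluster, c -> new Replicator());
    }

    /** Mirrors removeReplicator(): the only way to drop an entry so it can be recreated. */
    void removeReplicator(String cluster) {
        replicators.remove(cluster);
    }
}
```

In this model, `disconnect()` followed by `startProducer()` returns to `STARTED` (the `PersistentTopic` path), while `terminate()` followed by `startProducer()` and `checkReplication()` leaves the replicator stuck in `TERMINATED`. Only `removeReplicator()` plus `checkReplication()` yields a fresh replicator, which corresponds to the "remove and recreate" option in the suggestion.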
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -741,6 +741,10 @@ protected void startInactivityMonitor() {
             int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
             inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
                     TimeUnit.SECONDS);
+            if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) {

Review Comment:
   Another detail is that the field has `dynamic = true`. If the value is changed at runtime, the scheduled task is not cancelled and rescheduled to reflect the updated value. Again, setting `dynamic = false` would address this without adding support for dynamic configuration of `brokerReplicationInactiveThresholdSeconds`.
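For comparison, "adding support for dynamic configuration" could look roughly like the sketch below: cancel the current periodic task and schedule a new one whenever the threshold changes. This is a hypothetical illustration using only `java.util.concurrent`; `ReschedulingInactivityMonitor`, `onThresholdChanged`, and the separate `periodSeconds` parameter are assumptions, not Pulsar APIs, and wiring it to a config-change listener is left out.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: reschedule the replication inactivity check when the
// threshold changes at runtime. Names are illustrative, not Pulsar APIs.
final class ReschedulingInactivityMonitor {

    private final ScheduledExecutorService executor;
    private final Runnable check;
    private ScheduledFuture<?> task; // guarded by "this"

    ReschedulingInactivityMonitor(ScheduledExecutorService executor, Runnable check) {
        this.executor = executor;
        this.check = check;
    }

    /** Call once at startup and again whenever the threshold value changes. */
    synchronized void onThresholdChanged(Integer thresholdSeconds, long periodSeconds) {
        if (task != null) {
            task.cancel(false); // let an in-flight run finish, then stop future runs
            task = null;
        }
        if (thresholdSeconds != null && thresholdSeconds > 0) {
            // Fixed delay ensures runs never overlap, similar in spirit to a non-concurrent schedule.
            task = executor.scheduleWithFixedDelay(check, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        }
    }

    synchronized boolean isScheduled() {
        return task != null && !task.isCancelled();
    }
}
```

The extra state and synchronization this requires is the cost that `dynamic = false` avoids.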



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
