heesung-sn commented on code in PR #23349:
URL: https://github.com/apache/pulsar/pull/23349#discussion_r1775968086


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -1381,8 +1400,10 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa
 
     private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
         long started = System.currentTimeMillis();

Review Comment:
   I think we can skip this waiting logic if the broker to clean up is the last broker.

   I think this waiting logic is optional; its intention is to give the cluster some time to converge on the cleaned-up ownership view, if it converges at all.

   However, given the nature of the current ownership changes, the ownership view might not converge within the timeout (the leader should eventually fix any orphaned or overly long in-flight states, though).

   If no other broker is left to wait for, we can simply skip this wait.
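   A minimal sketch of the early return, assuming the caller can supply the set of currently active broker ids and a check for whether the cleanups have converged. `CleanupWaiter`, `activeBrokers` and `cleanupsDone` are hypothetical names, and the polling loop only approximates whatever the real `waitForCleanups` does:

```java
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: CleanupWaiter, activeBrokers and cleanupsDone are
// illustrative names, not Pulsar APIs.
final class CleanupWaiter {

    /**
     * Best-effort wait for the ownership view to converge after cleaning up
     * {@code broker}. Returns true if the wait was skipped or converged,
     * false if it timed out (or the thread was interrupted).
     */
    static boolean waitForCleanups(String broker, Set<String> activeBrokers,
                                   BooleanSupplier cleanupsDone, long maxWaitTimeInMillis) {
        // If no other broker is active, nobody needs to see the cleaned-up view.
        boolean otherBrokerActive = activeBrokers.stream().anyMatch(b -> !b.equals(broker));
        if (!otherBrokerActive) {
            return true;
        }
        long started = System.currentTimeMillis();
        while (System.currentTimeMillis() - started < maxWaitTimeInMillis) {
            if (cleanupsDone.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10); // assumed poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // Timed out: the leader is expected to fix orphaned or long in-flight states later.
        return cleanupsDone.getAsBoolean();
    }
}
```

   Returning early keeps the wait best-effort: a timeout only means the view has not converged yet, and the leader is still expected to repair orphaned or long in-flight states.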



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -141,6 +141,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     private volatile long lastOwnEventHandledAt = 0;
     private long lastOwnedServiceUnitCountAt = 0;
     private int totalOwnedServiceUnitCnt = 0;
+    private volatile boolean disablePubOwnedEvent = false;

Review Comment:
   Can we add this case to the ChannelState enum below instead? I think a better name could be "CleaningOwnerships".
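   For illustration, a sketch of folding the flag into the state enum. The constants other than `CleaningOwnerships` and the `ChannelStateHolder` class are hypothetical, not copied from Pulsar's `ChannelState`, and gating Owned events on `Started` is an assumption:

```java
// Hypothetical sketch: only CleaningOwnerships comes from the review; the other
// constants and ChannelStateHolder are illustrative, not Pulsar's actual code.
enum ChannelState {
    Closed,
    Constructed,
    Started,
    CleaningOwnerships, // replaces the separate disablePubOwnedEvent flag
    Disabled
}

final class ChannelStateHolder {
    private volatile ChannelState state = ChannelState.Constructed;

    void setState(ChannelState newState) {
        state = newState;
    }

    ChannelState getState() {
        return state;
    }

    /** Assumption: Owned events are published only while the channel is Started. */
    boolean canPublishOwnedEvent() {
        return state == ChannelState.Started;
    }
}
```

   A single volatile state field means there is no separate flag that can drift out of sync with the channel state.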



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -307,7 +309,7 @@ public synchronized void start() throws PulsarServerException {
                             pulsar.getConfiguration().getDefaultNumberOfNamespaceBundles());
 
             tableview = createServiceUnitStateTableView();
-            tableview.start(pulsar, this::handleEvent, this::handleExisting);
+            tableview.start(pulsar, this::handleEvent, this::handleExisting, this::handleSkippedEvent);

Review Comment:
   I am actually not sure this skip logic is required, as it can return deferred lookups too soon.
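   To illustrate the concern, a hypothetical sketch in which deferred lookups are completed only by an accepted Owned event, while a skipped event leaves them pending. `DeferredLookups`, `onOwned` and `onSkipped` are illustrative names, not the channel's actual API:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: DeferredLookups, onOwned and onSkipped are illustrative
// names, not Pulsar APIs.
final class DeferredLookups {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers (or joins) a lookup that waits for the service unit's owner. */
    CompletableFuture<String> lookup(String serviceUnit) {
        return pending.computeIfAbsent(serviceUnit, k -> new CompletableFuture<>());
    }

    /** Accepted Owned event: the owner is now known, so complete the waiting lookup. */
    void onOwned(String serviceUnit, String owner) {
        CompletableFuture<String> future = pending.remove(serviceUnit);
        if (future != null) {
            future.complete(owner);
        }
    }

    /** Skipped event: leave the lookup pending, since ownership may still change. */
    void onSkipped(String serviceUnit) {
        // Intentionally a no-op for pending lookups.
    }
}
```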



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -182,6 +182,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
     private SplitManager splitManager;
 
     volatile boolean started = false;

Review Comment:
   Can we introduce an enum state instead of this boolean?
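   A minimal sketch, assuming a simple INIT/STARTED/CLOSING/CLOSED lifecycle; `LoadManagerState` and `LoadManagerLifecycle` are hypothetical names. Using `AtomicReference` with compare-and-set makes duplicate `start()` or `close()` calls detectable, which a plain `volatile boolean` cannot express:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: LoadManagerState and LoadManagerLifecycle are
// illustrative names, not Pulsar classes.
enum LoadManagerState { INIT, STARTED, CLOSING, CLOSED }

final class LoadManagerLifecycle {
    private final AtomicReference<LoadManagerState> state =
            new AtomicReference<>(LoadManagerState.INIT);

    /** INIT -> STARTED; returns false if already started or closed. */
    boolean start() {
        return state.compareAndSet(LoadManagerState.INIT, LoadManagerState.STARTED);
    }

    boolean isStarted() {
        return state.get() == LoadManagerState.STARTED;
    }

    /** Moves to CLOSING, releases resources, then CLOSED; returns false if already closing/closed. */
    boolean close() {
        LoadManagerState prev = state.getAndUpdate(s ->
                (s == LoadManagerState.CLOSING || s == LoadManagerState.CLOSED) ? s : LoadManagerState.CLOSING);
        if (prev == LoadManagerState.CLOSING || prev == LoadManagerState.CLOSED) {
            return false;
        }
        // Release resources here (omitted in this sketch).
        state.set(LoadManagerState.CLOSED);
        return true;
    }

    LoadManagerState get() {
        return state.get();
    }
}
```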



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -774,7 +774,11 @@ brokerId, getLogEventTag(data), serviceUnit,
         }
     }
 
-    private void handleSkippedEvent(String serviceUnit) {
+    private void handleSkippedEvent(String serviceUnit, ServiceUnitStateData skippedData) {
+        if (skippedData.state() == Free) {

Review Comment:
   This is risky, as the skipped message would affect the current states.

   The channel's design principle is that only non-skipped messages should affect the states in this channel.
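   A hypothetical sketch of that principle: only `handleEvent` mutates the state map, and `handleSkippedEvent` merely records that something was skipped, even for a `Free` message. `UnitState`, `UnitEvent` and `ChannelSketch` are illustrative stand-ins for `ServiceUnitState`, `ServiceUnitStateData` and the channel, not Pulsar's types:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for ServiceUnitState / ServiceUnitStateData; not Pulsar's types.
enum UnitState { Owned, Releasing, Free }

record UnitEvent(UnitState state, String broker) {}

final class ChannelSketch {
    private final Map<String, UnitEvent> states = new ConcurrentHashMap<>();
    private final AtomicLong skipped = new AtomicLong();

    /** Accepted (non-skipped) event: the only path that mutates the channel's states. */
    void handleEvent(String serviceUnit, UnitEvent event) {
        if (event.state() == UnitState.Free) {
            states.remove(serviceUnit);
        } else {
            states.put(serviceUnit, event);
        }
    }

    /** Skipped event: count it for logging/metrics, but never touch the states, even when it is Free. */
    void handleSkippedEvent(String serviceUnit, UnitEvent skippedData) {
        skipped.incrementAndGet();
    }

    /** Returns the owner if the unit is currently Owned, otherwise null. */
    String ownerOf(String serviceUnit) {
        UnitEvent e = states.get(serviceUnit);
        return e != null && e.state() == UnitState.Owned ? e.broker() : null;
    }

    long skippedCount() {
        return skipped.get();
    }
}
```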



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
