heesung-sn commented on code in PR #19410:
URL: https://github.com/apache/pulsar/pull/19410#discussion_r1096184582


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -279,67 +318,112 @@ private boolean isChannelOwner() {
     public CompletableFuture<String> getOwnerAsync(String serviceUnit) {
         validateChannelState(Started, true);
         ServiceUnitStateData data = tableview.get(serviceUnit);
-        if (data == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        switch (data.state()) {
+        ServiceUnitState state = data == null ? Free : data.state();
+        ownerLookUpCounters.get(state).incrementAndGet();
+        switch (state) {
             case Owned, Splitting -> {
                 return CompletableFuture.completedFuture(data.broker());
             }
             case Assigned, Released -> {
                 return deferGetOwnerRequest(serviceUnit);
             }
+            case Free -> {
+                return CompletableFuture.completedFuture(null);
+            }
             default -> {
-                return null;
+                throw new IllegalStateException("Invalid service unit state:" 
+ data.state());
             }
         }
     }
 
     public CompletableFuture<String> publishAssignEventAsync(String 
serviceUnit, String broker) {
-        CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
-        pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
-                .whenComplete((__, ex) -> {
-                    if (ex != null) {
-                        getOwnerRequests.remove(serviceUnit, getOwnerRequest);
-                        if (!getOwnerRequest.isCompletedExceptionally()) {
-                            getOwnerRequest.completeExceptionally(ex);
+        EventType eventType = Assign;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
+        try {
+            CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
+            pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
+                    .whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            getOwnerRequests.remove(serviceUnit, 
getOwnerRequest);
+                            if (!getOwnerRequest.isCompletedExceptionally()) {
+                                getOwnerRequest.completeExceptionally(ex);
+                            }
+                            
eventCounters.get(eventType).getFailure().incrementAndGet();
                         }
-                    }
-                });
-
-        return getOwnerRequest;
+                    });
+            return getOwnerRequest;
+        } catch (Throwable e) {
+            log.error("Failed to publish assign event. serviceUnit:{}, 
broker:{}, assignPublishFailureCount:{}",
+                    serviceUnit, broker, 
eventCounters.get(eventType).getFailure().incrementAndGet(), e);
+            throw e;

Review Comment:
   I removed this exception handling by returning an exceptional future from 
`validateChannelState()`. This outer try-catch is no longer required.
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -359,19 +443,46 @@ private static String getLogEventTag(ServiceUnitStateData 
data) {
                 isTransferCommand(data) ? "Transfer:" + data.state() : 
data.state().toString();
     }
 
+    private AtomicLong getHandlerTotalCounter(ServiceUnitStateData data) {
+        return getHandlerCounter(data, true);
+    }
+
+    private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) {
+        return getHandlerCounter(data, false);
+    }
+
+    private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean 
total) {
+        var state = data.state() == null ? Free : data.state();
+        var counter = total
+                ? handlerCounters.get(state).getTotal() : 
handlerCounters.get(state).getFailure();
+        if (counter == null) {
+            throw new IllegalStateException("Unknown state:" + state);
+        }
+        return counter;
+    }
+
     private void log(Throwable e, String serviceUnit, ServiceUnitStateData 
data, ServiceUnitStateData next) {
         if (e == null) {
             if (log.isDebugEnabled() || isTransferCommand(data)) {
-                log.info("{} handled {} event for serviceUnit:{}, cur:{}, 
next:{}",
+                long handlerTotalCount = getHandlerTotalCounter(data).get();
+                long handlerFailureCount = 
getHandlerFailureCounter(data).get();
+                log.info("{} handled {} event for serviceUnit:{}, cur:{}, 
next:{}, "
+                                + "totalHandledRequests{}, 
totalFailedRequests:{}",

Review Comment:
   This is meant for info.  If this is a transfer command, we want to log it at 
the info level.
   
   In other PRs, I will clean the debug logs based on 
isLoadBalancerDebugModeEnabled.
   
   
   private boolean debug() {
           return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || 
log.isDebugEnabled();
       }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to