gaoran10 commented on code in PR #19410:
URL: https://github.com/apache/pulsar/pull/19410#discussion_r1095329141


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -279,67 +318,112 @@ private boolean isChannelOwner() {
     public CompletableFuture<String> getOwnerAsync(String serviceUnit) {
         validateChannelState(Started, true);
         ServiceUnitStateData data = tableview.get(serviceUnit);
-        if (data == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        switch (data.state()) {
+        ServiceUnitState state = data == null ? Free : data.state();
+        ownerLookUpCounters.get(state).incrementAndGet();
+        switch (state) {
             case Owned, Splitting -> {
                 return CompletableFuture.completedFuture(data.broker());
             }
             case Assigned, Released -> {
                 return deferGetOwnerRequest(serviceUnit);
             }
+            case Free -> {
+                return CompletableFuture.completedFuture(null);
+            }
             default -> {
-                return null;
+                throw new IllegalStateException("Invalid service unit state:" 
+ data.state());
             }
         }
     }
 
     public CompletableFuture<String> publishAssignEventAsync(String 
serviceUnit, String broker) {
-        CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
-        pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
-                .whenComplete((__, ex) -> {
-                    if (ex != null) {
-                        getOwnerRequests.remove(serviceUnit, getOwnerRequest);
-                        if (!getOwnerRequest.isCompletedExceptionally()) {
-                            getOwnerRequest.completeExceptionally(ex);
+        EventType eventType = Assign;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
+        try {
+            CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
+            pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
+                    .whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            getOwnerRequests.remove(serviceUnit, 
getOwnerRequest);
+                            if (!getOwnerRequest.isCompletedExceptionally()) {
+                                getOwnerRequest.completeExceptionally(ex);
+                            }
+                            
eventCounters.get(eventType).getFailure().incrementAndGet();
                         }
-                    }
-                });
-
-        return getOwnerRequest;
+                    });
+            return getOwnerRequest;
+        } catch (Throwable e) {
+            log.error("Failed to publish assign event. serviceUnit:{}, 
broker:{}, assignPublishFailureCount:{}",
+                    serviceUnit, broker, 
eventCounters.get(eventType).getFailure().incrementAndGet(), e);
+            throw e;
+        }
     }
 
     public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
-        String serviceUnit = unload.serviceUnit();
-        if (isTransferCommand(unload)) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
-                    unload.destBroker().get(), unload.sourceBroker());
-            return pubAsync(serviceUnit, next).thenApply(__ -> null);
+        EventType eventType = Unload;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
+        try {
+            String serviceUnit = unload.serviceUnit();
+            CompletableFuture<MessageId> future;
+            if (isTransferCommand(unload)) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
+                        unload.destBroker().get(), unload.sourceBroker());
+                future = pubAsync(serviceUnit, next);
+            } else {
+                future = tombstoneAsync(serviceUnit);
+            }
+
+            return future.whenComplete((__, ex) -> {
+                if (ex != null) {
+                    
eventCounters.get(eventType).getFailure().incrementAndGet();
+                }
+            }).thenApply(__ -> null);
+        } catch (Throwable e) {
+            log.error("Failed to publish unload event. unload:{}. 
unloadPublishFailureCount:{}",
+                    unload, 
eventCounters.get(eventType).getFailure().incrementAndGet(), e);
+            throw e;
         }
-        return tombstoneAsync(serviceUnit).thenApply(__ -> null);
     }
 
     public CompletableFuture<Void> publishSplitEventAsync(Split split) {
-        String serviceUnit = split.serviceUnit();
-        ServiceUnitStateData next = new ServiceUnitStateData(Splitting, 
split.sourceBroker());
-        return pubAsync(serviceUnit, next).thenApply(__ -> null);
+        EventType eventType = Split;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
+        try {
+            String serviceUnit = split.serviceUnit();
+            ServiceUnitStateData next = new ServiceUnitStateData(Splitting, 
split.sourceBroker());
+            return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
+                if (ex != null) {
+                    
eventCounters.get(eventType).getFailure().incrementAndGet();
+                }
+            }).thenApply(__ -> null);
+        } catch (Throwable e) {
+            log.error("Failed to publish split event. split:{}, 
splitPublishFailureCount:{}",
+                    split, 
eventCounters.get(eventType).getFailure().incrementAndGet(), e);
+            throw e;
+        }
     }
 
     private void handle(String serviceUnit, ServiceUnitStateData data) {
+        long totalHandledRequests = 
getHandlerTotalCounter(data).incrementAndGet();

Review Comment:
   Do we need to count all events on all brokers, or should we record event
   metrics only on the target broker?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to