gaoran10 commented on code in PR #19410:
URL: https://github.com/apache/pulsar/pull/19410#discussion_r1095416605


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -279,67 +318,112 @@ private boolean isChannelOwner() {
     public CompletableFuture<String> getOwnerAsync(String serviceUnit) {
         validateChannelState(Started, true);
         ServiceUnitStateData data = tableview.get(serviceUnit);
-        if (data == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        switch (data.state()) {
+        ServiceUnitState state = data == null ? Free : data.state();
+        ownerLookUpCounters.get(state).incrementAndGet();
+        switch (state) {
             case Owned, Splitting -> {
                 return CompletableFuture.completedFuture(data.broker());
             }
             case Assigned, Released -> {
                 return deferGetOwnerRequest(serviceUnit);
             }
+            case Free -> {
+                return CompletableFuture.completedFuture(null);
+            }
             default -> {
-                return null;
+                throw new IllegalStateException("Invalid service unit state:" 
+ data.state());
             }
         }
     }
 
     public CompletableFuture<String> publishAssignEventAsync(String 
serviceUnit, String broker) {
-        CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
-        pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
-                .whenComplete((__, ex) -> {
-                    if (ex != null) {
-                        getOwnerRequests.remove(serviceUnit, getOwnerRequest);
-                        if (!getOwnerRequest.isCompletedExceptionally()) {
-                            getOwnerRequest.completeExceptionally(ex);
+        EventType eventType = Assign;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
+        try {
+            CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
+            pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
+                    .whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            getOwnerRequests.remove(serviceUnit, 
getOwnerRequest);
+                            if (!getOwnerRequest.isCompletedExceptionally()) {
+                                getOwnerRequest.completeExceptionally(ex);
+                            }
+                            
eventCounters.get(eventType).getFailure().incrementAndGet();
                         }
-                    }
-                });
-
-        return getOwnerRequest;
+                    });
+            return getOwnerRequest;
+        } catch (Throwable e) {
+            log.error("Failed to publish assign event. serviceUnit:{}, 
broker:{}, assignPublishFailureCount:{}",
+                    serviceUnit, broker, 
eventCounters.get(eventType).getFailure().incrementAndGet(), e);
+            throw e;

Review Comment:
   It seems that we need to return to an exceptional future. There are some 
similar methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to