This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 70750751176 [improve][broker] PIP-192 Added operation counters in 
ServiceUnitStateChannelImpl (#19410)
70750751176 is described below

commit 7075075117657135e67ef193947a1590d43309b8
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Feb 6 01:53:21 2023 -0800

    [improve][broker] PIP-192 Added operation counters in 
ServiceUnitStateChannelImpl (#19410)
---
 .../channel/ServiceUnitStateChannelImpl.java       | 169 +++++++++++++---
 .../channel/ServiceUnitStateChannelTest.java       | 214 +++++++++++++++++++--
 2 files changed, 335 insertions(+), 48 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 37dfe6090bb..3e0931b2d10 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -27,6 +27,9 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Constructed;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Started;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Jittery;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
@@ -44,6 +47,8 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -63,7 +68,6 @@ import 
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -108,6 +112,40 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private long totalCleanupCancelledCnt = 0;
     private volatile ChannelState channelState;
 
+    enum EventType {
+        Assign,
+        Split,
+        Unload
+    }
+
+    @Getter
+    @AllArgsConstructor
+    static class Counters {
+        private AtomicLong total;
+        private AtomicLong failure;
+    }
+
+    // operation metrics
+    final Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = Map.of(
+            Owned, new AtomicLong(),
+            Assigned, new AtomicLong(),
+            Released, new AtomicLong(),
+            Splitting, new AtomicLong(),
+            Free, new AtomicLong()
+    );
+    final Map<EventType, Counters> eventCounters = Map.of(
+            Assign, new Counters(new AtomicLong(), new AtomicLong()),
+            Split, new Counters(new AtomicLong(), new AtomicLong()),
+            Unload, new Counters(new AtomicLong(), new AtomicLong())
+    );
+    final Map<ServiceUnitState, Counters> handlerCounters = Map.of(
+            Owned, new Counters(new AtomicLong(), new AtomicLong()),
+            Assigned, new Counters(new AtomicLong(), new AtomicLong()),
+            Released, new Counters(new AtomicLong(), new AtomicLong()),
+            Splitting, new Counters(new AtomicLong(), new AtomicLong()),
+            Free, new Counters(new AtomicLong(), new AtomicLong())
+    );
+
     enum ChannelState {
         Closed(0),
         Constructed(1),
@@ -151,7 +189,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     public synchronized void start() throws PulsarServerException {
-        validateChannelState(LeaderElectionServiceStarted, false);
+        if (!validateChannelState(LeaderElectionServiceStarted, false)) {
+            throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
+        }
+
         try {
             leaderElectionService.start();
             this.channelState = LeaderElectionServiceStarted;
@@ -230,15 +271,24 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
-    private void validateChannelState(ChannelState targetState, boolean 
checkLowerIds) {
+    private boolean validateChannelState(ChannelState targetState, boolean 
checkLowerIds) {
         int order = checkLowerIds ? -1 : 1;
         if (Integer.compare(channelState.id, targetState.id) * order > 0) {
-            throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
+            return false;
         }
+        return true;
+    }
+
+    private boolean debug() {
+        return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || 
log.isDebugEnabled();
     }
 
     public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
-        validateChannelState(LeaderElectionServiceStarted, true);
+        if (!validateChannelState(LeaderElectionServiceStarted, true)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
+        }
+
         return leaderElectionService.readCurrentLeader().thenApply(leader -> {
                     //expecting http://broker-xyz:port
                     // TODO: discard this protocol prefix removal
@@ -278,27 +328,35 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     public CompletableFuture<Optional<String>> getOwnerAsync(String 
serviceUnit) {
-        validateChannelState(Started, true);
-        ServiceUnitStateData data = tableview.get(serviceUnit);
-        if (data == null) {
-            return CompletableFuture.completedFuture(Optional.empty());
+        if (!validateChannelState(Started, true)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
         }
-        switch (data.state()) {
+
+        ServiceUnitStateData data = tableview.get(serviceUnit);
+        ServiceUnitState state = data == null ? Free : data.state();
+        ownerLookUpCounters.get(state).incrementAndGet();
+        switch (state) {
             case Owned, Splitting -> {
                 return 
CompletableFuture.completedFuture(Optional.of(data.broker()));
             }
             case Assigned, Released -> {
                 return 
deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
             }
+            case Free -> {
+                return CompletableFuture.completedFuture(Optional.empty());
+            }
             default -> {
                 String errorMsg = String.format("Failed to process service 
unit state data: %s when get owner.", data);
                 log.error(errorMsg);
-                return FutureUtil.failedFuture(new 
IllegalStateException(errorMsg));
+                return CompletableFuture.failedFuture(new 
IllegalStateException(errorMsg));
             }
         }
     }
 
     public CompletableFuture<String> publishAssignEventAsync(String 
serviceUnit, String broker) {
+        EventType eventType = Assign;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
         CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
         pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
                 .whenComplete((__, ex) -> {
@@ -307,42 +365,65 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                         if (!getOwnerRequest.isCompletedExceptionally()) {
                             getOwnerRequest.completeExceptionally(ex);
                         }
+                        
eventCounters.get(eventType).getFailure().incrementAndGet();
                     }
                 });
-
         return getOwnerRequest;
     }
 
     public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
+        EventType eventType = Unload;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
         String serviceUnit = unload.serviceUnit();
+        CompletableFuture<MessageId> future;
         if (isTransferCommand(unload)) {
             ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
                     unload.destBroker().get(), unload.sourceBroker());
-            return pubAsync(serviceUnit, next).thenApply(__ -> null);
+            future = pubAsync(serviceUnit, next);
+        } else {
+            future = tombstoneAsync(serviceUnit);
         }
-        return tombstoneAsync(serviceUnit).thenApply(__ -> null);
+
+        return future.whenComplete((__, ex) -> {
+            if (ex != null) {
+                eventCounters.get(eventType).getFailure().incrementAndGet();
+            }
+        }).thenApply(__ -> null);
     }
 
     public CompletableFuture<Void> publishSplitEventAsync(Split split) {
+        EventType eventType = Split;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
         String serviceUnit = split.serviceUnit();
         ServiceUnitStateData next = new ServiceUnitStateData(Splitting, 
split.sourceBroker());
-        return pubAsync(serviceUnit, next).thenApply(__ -> null);
+        return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
+            if (ex != null) {
+                eventCounters.get(eventType).getFailure().incrementAndGet();
+            }
+        }).thenApply(__ -> null);
     }
 
     private void handle(String serviceUnit, ServiceUnitStateData data) {
+        long totalHandledRequests = 
getHandlerTotalCounter(data).incrementAndGet();
         if (log.isDebugEnabled()) {
-            log.info("{} received a handle request for serviceUnit:{}, 
data:{}",
-                    lookupServiceAddress, serviceUnit, data);
+            log.info("{} received a handle request for serviceUnit:{}, 
data:{}. totalHandledRequests:{}",
+                    lookupServiceAddress, serviceUnit, data, 
totalHandledRequests);
         }
 
         ServiceUnitState state = data == null ? Free : data.state();
-        switch (state) {
-            case Owned -> handleOwnEvent(serviceUnit, data);
-            case Assigned -> handleAssignEvent(serviceUnit, data);
-            case Released -> handleReleaseEvent(serviceUnit, data);
-            case Splitting -> handleSplitEvent(serviceUnit, data);
-            case Free -> handleFreeEvent(serviceUnit);
-            default -> throw new IllegalStateException("Failed to handle 
channel data:" + data);
+        try {
+            switch (state) {
+                case Owned -> handleOwnEvent(serviceUnit, data);
+                case Assigned -> handleAssignEvent(serviceUnit, data);
+                case Released -> handleReleaseEvent(serviceUnit, data);
+                case Splitting -> handleSplitEvent(serviceUnit, data);
+                case Free -> handleFreeEvent(serviceUnit);
+                default -> throw new IllegalStateException("Failed to handle 
channel data:" + data);
+            }
+        } catch (Throwable e){
+            log.error("Failed to handle the event. serviceUnit:{}, data:{}, 
handlerFailureCount:{}",
+                    serviceUnit, data, 
getHandlerFailureCounter(data).incrementAndGet(), e);
+            throw e;
         }
     }
 
@@ -362,19 +443,46 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 isTransferCommand(data) ? "Transfer:" + data.state() : 
data.state().toString();
     }
 
+    private AtomicLong getHandlerTotalCounter(ServiceUnitStateData data) {
+        return getHandlerCounter(data, true);
+    }
+
+    private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) {
+        return getHandlerCounter(data, false);
+    }
+
+    private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean 
total) {
+        var state = data == null ? Free : data.state();
+        var counter = total
+                ? handlerCounters.get(state).getTotal() : 
handlerCounters.get(state).getFailure();
+        if (counter == null) {
+            throw new IllegalStateException("Unknown state:" + state);
+        }
+        return counter;
+    }
+
     private void log(Throwable e, String serviceUnit, ServiceUnitStateData 
data, ServiceUnitStateData next) {
         if (e == null) {
             if (log.isDebugEnabled() || isTransferCommand(data)) {
-                log.info("{} handled {} event for serviceUnit:{}, cur:{}, 
next:{}",
+                long handlerTotalCount = getHandlerTotalCounter(data).get();
+                long handlerFailureCount = 
getHandlerFailureCounter(data).get();
+                log.info("{} handled {} event for serviceUnit:{}, cur:{}, 
next:{}, "
+                                + "totalHandledRequests{}, 
totalFailedRequests:{}",
                         lookupServiceAddress, getLogEventTag(data), 
serviceUnit,
                         data == null ? "" : data,
-                        next == null ? "" : next);
+                        next == null ? "" : next,
+                        handlerTotalCount, handlerFailureCount
+                );
             }
         } else {
-            log.error("{} failed to handle {} event for serviceUnit:{}, 
cur:{}, next:{}",
+            long handlerTotalCount = getHandlerTotalCounter(data).get();
+            long handlerFailureCount = 
getHandlerFailureCounter(data).incrementAndGet();
+            log.error("{} failed to handle {} event for serviceUnit:{}, 
cur:{}, next:{}, "
+                            + "totalHandledRequests{}, totalFailedRequests:{}",
                     lookupServiceAddress, getLogEventTag(data), serviceUnit,
                     data == null ? "" : data,
                     next == null ? "" : next,
+                    handlerTotalCount, handlerFailureCount,
                     e);
         }
     }
@@ -387,7 +495,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (isTargetBroker(data.broker())) {
             log(null, serviceUnit, data, null);
         }
-
     }
 
     private void handleAssignEvent(String serviceUnit, ServiceUnitStateData 
data) {
@@ -401,7 +508,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData 
data) {
-
         if (isTargetBroker(data.sourceBroker())) {
             ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.sourceBroker());
             // TODO: when close, pass message to clients to connect to the new 
broker
@@ -432,7 +538,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private CompletableFuture<MessageId> pubAsync(String serviceUnit, 
ServiceUnitStateData data) {
-        validateChannelState(Started, true);
+        if (!validateChannelState(Started, true)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
+        }
         CompletableFuture<MessageId> future = new CompletableFuture<>();
         producer.newMessage()
                 .key(serviceUnit)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 1c0a4f37633..660999365c4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -18,7 +18,14 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.channel;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
@@ -124,6 +131,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     protected void initTableViews() throws Exception {
         cleanTableView(channel1, bundle);
         cleanTableView(channel2, bundle);
+        cleanOpsCounters(channel1);
+        cleanOpsCounters(channel2);
     }
 
 
@@ -219,38 +228,52 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private int validateChannelStart(ServiceUnitStateChannelImpl channel)
-            throws ExecutionException, InterruptedException, TimeoutException {
+            throws InterruptedException, TimeoutException {
         int errorCnt = 0;
         try {
             channel.isChannelOwnerAsync().get(2, TimeUnit.SECONDS);
-        } catch (IllegalStateException e) {
-            errorCnt++;
+        } catch (ExecutionException e) {
+            if(e.getCause() instanceof IllegalStateException){
+                errorCnt++;
+            }
         }
         try {
             channel.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
-        } catch (IllegalStateException e) {
-            errorCnt++;
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalStateException) {
+                errorCnt++;
+            }
         }
         try {
-            channel.getOwnerAsync(bundle);
-        } catch (IllegalStateException e) {
-            errorCnt++;
+            channel.getOwnerAsync(bundle).get(2, TimeUnit.SECONDS).get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalStateException) {
+                errorCnt++;
+            }
         }
         try {
-            channel.publishAssignEventAsync(bundle, lookupServiceAddress1);
-        } catch (IllegalStateException e) {
-            errorCnt++;
+            channel.publishAssignEventAsync(bundle, 
lookupServiceAddress1).get(2, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalStateException) {
+                errorCnt++;
+            }
         }
         try {
             channel.publishUnloadEventAsync(
-                    new Unload(lookupServiceAddress1, bundle, 
Optional.of(lookupServiceAddress2)));
-        } catch (IllegalStateException e) {
-            errorCnt++;
+                    new Unload(lookupServiceAddress1, bundle, 
Optional.of(lookupServiceAddress2)))
+                    .get(2, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalStateException) {
+                errorCnt++;
+            }
         }
         try {
-            channel.publishSplitEventAsync(new Split(bundle, 
lookupServiceAddress1, Map.of()));
-        } catch (IllegalStateException e) {
-            errorCnt++;
+            channel.publishSplitEventAsync(new Split(bundle, 
lookupServiceAddress1, Map.of()))
+                    .get(2, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalStateException) {
+                errorCnt++;
+            }
         }
         return errorCnt;
     }
@@ -303,6 +326,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(ownerAddr1, ownerAddr2);
         assertEquals(getOwnerRequests1.size(), 0);
         assertEquals(getOwnerRequests2.size(), 0);
+
+        validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0);
+        validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0);
+        validateEventCounters(channel1, 1, 0, 0, 0, 0, 0);
+        validateEventCounters(channel2, 1, 0, 0, 0, 0, 0);
     }
 
     @Test(priority = 3)
@@ -346,7 +374,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(priority = 4)
     public void unloadTest()
-            throws ExecutionException, InterruptedException, TimeoutException {
+            throws ExecutionException, InterruptedException, TimeoutException, 
IllegalAccessException {
 
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
@@ -374,6 +402,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
         assertEquals(ownerAddr1, ownerAddr2);
         assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
+
+        validateHandlerCounters(channel1, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0);
+        validateHandlerCounters(channel2, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0);
+        validateEventCounters(channel1, 1, 0, 0, 0, 1, 0);
+        validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
     }
 
     @Test(priority = 5)
@@ -464,6 +497,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel2, bundle, null);
 
         // TODO: assert child bundle ownerships in the channels.
+        validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
+        validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
+        validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
+        validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
     }
 
     @Test(priority = 7)
@@ -749,6 +786,32 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
     }
 
+    @Test(priority = 11)
+    public void ownerLookupCountTests() throws IllegalAccessException {
+
+        overrideTableView(channel1, bundle, null);
+        channel1.getOwnerAsync(bundle);
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigned, 
"b1"));
+        channel1.getOwnerAsync(bundle);
+        channel1.getOwnerAsync(bundle);
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, 
"b1"));
+        channel1.getOwnerAsync(bundle);
+        channel1.getOwnerAsync(bundle);
+        channel1.getOwnerAsync(bundle);
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Released, 
"b1"));
+        channel1.getOwnerAsync(bundle);
+        channel1.getOwnerAsync(bundle);
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Splitting, "b1"));
+        channel1.getOwnerAsync(bundle);
+
+        validateOwnerLookUpCounters(channel1, 2, 3, 2, 1, 1);
+
+    }
+
 
     // TODO: add the channel recovery test when broker registry is added.
 
@@ -848,6 +911,48 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         cache.remove(serviceUnit);
     }
 
+    private static void overrideTableView(ServiceUnitStateChannel channel, 
String serviceUnit, ServiceUnitStateData val)
+            throws IllegalAccessException {
+        var tv = (TableViewImpl<ServiceUnitStateData>)
+                FieldUtils.readField(channel, "tableview", true);
+        var cache = (ConcurrentMap<String, ServiceUnitStateData>)
+                FieldUtils.readField(tv, "data", true);
+        if(val == null){
+            cache.remove(serviceUnit);
+        } else {
+            cache.put(serviceUnit, val);
+        }
+    }
+
+    private static void cleanOpsCounters(ServiceUnitStateChannel channel)
+            throws IllegalAccessException {
+        var handlerCounters =
+                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
+                        FieldUtils.readDeclaredField(channel, 
"handlerCounters", true);
+
+        for(var val : handlerCounters.values()){
+            val.getFailure().set(0);
+            val.getTotal().set(0);
+        }
+
+        var eventCounters =
+                (Map<ServiceUnitStateChannelImpl.EventType, 
ServiceUnitStateChannelImpl.Counters>)
+                        FieldUtils.readDeclaredField(channel, "eventCounters", 
true);
+
+        for(var val : eventCounters.values()){
+            val.getFailure().set(0);
+            val.getTotal().set(0);
+        }
+
+        var ownerLookUpCounters =
+                (Map<ServiceUnitStateChannelImpl.EventType, AtomicLong>)
+                        FieldUtils.readDeclaredField(channel, 
"ownerLookUpCounters", true);
+
+        for(var val : ownerLookUpCounters.values()){
+            val.set(0);
+        }
+    }
+
     private static long getCleanupMetric(ServiceUnitStateChannel channel, 
String metric)
             throws IllegalAccessException {
         Object var = FieldUtils.readDeclaredField(channel, metric, true);
@@ -857,4 +962,77 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
             return (long) var;
         }
     }
+
+    private static void validateHandlerCounters(ServiceUnitStateChannel 
channel,
+                                                long assignedT, long assignedF,
+                                                long ownedT, long ownedF,
+                                                long releasedT, long releasedF,
+                                                long splittingT, long 
splittingF,
+                                                long freeT, long freeF)
+            throws IllegalAccessException {
+        var handlerCounters =
+                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
+                        FieldUtils.readDeclaredField(channel, 
"handlerCounters", true);
+
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> { // wait until true
+                    assertEquals(assignedT, 
handlerCounters.get(Assigned).getTotal().get());
+                    assertEquals(assignedF, 
handlerCounters.get(Assigned).getFailure().get());
+                    assertEquals(ownedT, 
handlerCounters.get(Owned).getTotal().get());
+                    assertEquals(ownedF, 
handlerCounters.get(Owned).getFailure().get());
+                    assertEquals(releasedT, 
handlerCounters.get(Released).getTotal().get());
+                    assertEquals(releasedF, 
handlerCounters.get(Released).getFailure().get());
+                    assertEquals(splittingT, 
handlerCounters.get(Splitting).getTotal().get());
+                    assertEquals(splittingF, 
handlerCounters.get(Splitting).getFailure().get());
+                    assertEquals(freeT, 
handlerCounters.get(Free).getTotal().get());
+                    assertEquals(freeF, 
handlerCounters.get(Free).getFailure().get());
+                });
+    }
+
+    private static void validateEventCounters(ServiceUnitStateChannel channel,
+                                              long assignT, long assignF,
+                                              long splitT, long splitF,
+                                              long unloadT, long unloadF)
+            throws IllegalAccessException {
+        var eventCounters =
+                (Map<ServiceUnitStateChannelImpl.EventType, 
ServiceUnitStateChannelImpl.Counters>)
+                        FieldUtils.readDeclaredField(channel, "eventCounters", 
true);
+
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> { // wait until true
+                    assertEquals(assignT, 
eventCounters.get(Assign).getTotal().get());
+                    assertEquals(assignF, 
eventCounters.get(Assign).getFailure().get());
+                    assertEquals(splitT, 
eventCounters.get(Split).getTotal().get());
+                    assertEquals(splitF, 
eventCounters.get(Split).getFailure().get());
+                    assertEquals(unloadT, 
eventCounters.get(Unload).getTotal().get());
+                    assertEquals(unloadF, 
eventCounters.get(Unload).getFailure().get());
+                });
+    }
+
+    private static void validateOwnerLookUpCounters(ServiceUnitStateChannel 
channel,
+                                                    long assigned,
+                                                    long owned,
+                                                    long released,
+                                                    long splitting,
+                                                    long free)
+            throws IllegalAccessException {
+        var ownerLookUpCounters =
+                (Map<ServiceUnitState, AtomicLong>)
+                        FieldUtils.readDeclaredField(channel, 
"ownerLookUpCounters", true);
+
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> { // wait until true
+                    assertEquals(assigned, 
ownerLookUpCounters.get(Assigned).get());
+                    assertEquals(owned, ownerLookUpCounters.get(Owned).get());
+                    assertEquals(released, 
ownerLookUpCounters.get(Released).get());
+                    assertEquals(splitting, 
ownerLookUpCounters.get(Splitting).get());
+                    assertEquals(free, ownerLookUpCounters.get(Free).get());
+                });
+    }
 }

Reply via email to