This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 70750751176 [improve][broker] PIP-192 Added operation counters in
ServiceUnitStateChannelImpl (#19410)
70750751176 is described below
commit 7075075117657135e67ef193947a1590d43309b8
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Feb 6 01:53:21 2023 -0800
[improve][broker] PIP-192 Added operation counters in
ServiceUnitStateChannelImpl (#19410)
---
.../channel/ServiceUnitStateChannelImpl.java | 169 +++++++++++++---
.../channel/ServiceUnitStateChannelTest.java | 214 +++++++++++++++++++--
2 files changed, 335 insertions(+), 48 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 37dfe6090bb..3e0931b2d10 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -27,6 +27,9 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Constructed;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Started;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Jittery;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
@@ -44,6 +47,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -63,7 +68,6 @@ import
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -108,6 +112,40 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private long totalCleanupCancelledCnt = 0;
private volatile ChannelState channelState;
+ enum EventType {
+ Assign,
+ Split,
+ Unload
+ }
+
+ @Getter
+ @AllArgsConstructor
+ static class Counters {
+ private AtomicLong total;
+ private AtomicLong failure;
+ }
+
+ // operation metrics
+ final Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = Map.of(
+ Owned, new AtomicLong(),
+ Assigned, new AtomicLong(),
+ Released, new AtomicLong(),
+ Splitting, new AtomicLong(),
+ Free, new AtomicLong()
+ );
+ final Map<EventType, Counters> eventCounters = Map.of(
+ Assign, new Counters(new AtomicLong(), new AtomicLong()),
+ Split, new Counters(new AtomicLong(), new AtomicLong()),
+ Unload, new Counters(new AtomicLong(), new AtomicLong())
+ );
+ final Map<ServiceUnitState, Counters> handlerCounters = Map.of(
+ Owned, new Counters(new AtomicLong(), new AtomicLong()),
+ Assigned, new Counters(new AtomicLong(), new AtomicLong()),
+ Released, new Counters(new AtomicLong(), new AtomicLong()),
+ Splitting, new Counters(new AtomicLong(), new AtomicLong()),
+ Free, new Counters(new AtomicLong(), new AtomicLong())
+ );
+
enum ChannelState {
Closed(0),
Constructed(1),
@@ -151,7 +189,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
public synchronized void start() throws PulsarServerException {
- validateChannelState(LeaderElectionServiceStarted, false);
+ if (!validateChannelState(LeaderElectionServiceStarted, false)) {
+ throw new IllegalStateException("Invalid channel state:" +
channelState.name());
+ }
+
try {
leaderElectionService.start();
this.channelState = LeaderElectionServiceStarted;
@@ -230,15 +271,24 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
- private void validateChannelState(ChannelState targetState, boolean
checkLowerIds) {
+ private boolean validateChannelState(ChannelState targetState, boolean
checkLowerIds) {
int order = checkLowerIds ? -1 : 1;
if (Integer.compare(channelState.id, targetState.id) * order > 0) {
- throw new IllegalStateException("Invalid channel state:" +
channelState.name());
+ return false;
}
+ return true;
+ }
+
+ private boolean debug() {
+ return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() ||
log.isDebugEnabled();
}
public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
- validateChannelState(LeaderElectionServiceStarted, true);
+ if (!validateChannelState(LeaderElectionServiceStarted, true)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Invalid channel state:" +
channelState.name()));
+ }
+
return leaderElectionService.readCurrentLeader().thenApply(leader -> {
//expecting http://broker-xyz:port
// TODO: discard this protocol prefix removal
@@ -278,27 +328,35 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
public CompletableFuture<Optional<String>> getOwnerAsync(String
serviceUnit) {
- validateChannelState(Started, true);
- ServiceUnitStateData data = tableview.get(serviceUnit);
- if (data == null) {
- return CompletableFuture.completedFuture(Optional.empty());
+ if (!validateChannelState(Started, true)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Invalid channel state:" +
channelState.name()));
}
- switch (data.state()) {
+
+ ServiceUnitStateData data = tableview.get(serviceUnit);
+ ServiceUnitState state = data == null ? Free : data.state();
+ ownerLookUpCounters.get(state).incrementAndGet();
+ switch (state) {
case Owned, Splitting -> {
return
CompletableFuture.completedFuture(Optional.of(data.broker()));
}
case Assigned, Released -> {
return
deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
}
+ case Free -> {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
default -> {
String errorMsg = String.format("Failed to process service
unit state data: %s when get owner.", data);
log.error(errorMsg);
- return FutureUtil.failedFuture(new
IllegalStateException(errorMsg));
+ return CompletableFuture.failedFuture(new
IllegalStateException(errorMsg));
}
}
}
public CompletableFuture<String> publishAssignEventAsync(String
serviceUnit, String broker) {
+ EventType eventType = Assign;
+ eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest =
deferGetOwnerRequest(serviceUnit);
pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
.whenComplete((__, ex) -> {
@@ -307,42 +365,65 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (!getOwnerRequest.isCompletedExceptionally()) {
getOwnerRequest.completeExceptionally(ex);
}
+
eventCounters.get(eventType).getFailure().incrementAndGet();
}
});
-
return getOwnerRequest;
}
public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
+ EventType eventType = Unload;
+ eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = unload.serviceUnit();
+ CompletableFuture<MessageId> future;
if (isTransferCommand(unload)) {
ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
unload.destBroker().get(), unload.sourceBroker());
- return pubAsync(serviceUnit, next).thenApply(__ -> null);
+ future = pubAsync(serviceUnit, next);
+ } else {
+ future = tombstoneAsync(serviceUnit);
}
- return tombstoneAsync(serviceUnit).thenApply(__ -> null);
+
+ return future.whenComplete((__, ex) -> {
+ if (ex != null) {
+ eventCounters.get(eventType).getFailure().incrementAndGet();
+ }
+ }).thenApply(__ -> null);
}
public CompletableFuture<Void> publishSplitEventAsync(Split split) {
+ EventType eventType = Split;
+ eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = split.serviceUnit();
ServiceUnitStateData next = new ServiceUnitStateData(Splitting,
split.sourceBroker());
- return pubAsync(serviceUnit, next).thenApply(__ -> null);
+ return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
+ if (ex != null) {
+ eventCounters.get(eventType).getFailure().incrementAndGet();
+ }
+ }).thenApply(__ -> null);
}
private void handle(String serviceUnit, ServiceUnitStateData data) {
+ long totalHandledRequests =
getHandlerTotalCounter(data).incrementAndGet();
if (log.isDebugEnabled()) {
- log.info("{} received a handle request for serviceUnit:{},
data:{}",
- lookupServiceAddress, serviceUnit, data);
+ log.info("{} received a handle request for serviceUnit:{},
data:{}. totalHandledRequests:{}",
+ lookupServiceAddress, serviceUnit, data,
totalHandledRequests);
}
ServiceUnitState state = data == null ? Free : data.state();
- switch (state) {
- case Owned -> handleOwnEvent(serviceUnit, data);
- case Assigned -> handleAssignEvent(serviceUnit, data);
- case Released -> handleReleaseEvent(serviceUnit, data);
- case Splitting -> handleSplitEvent(serviceUnit, data);
- case Free -> handleFreeEvent(serviceUnit);
- default -> throw new IllegalStateException("Failed to handle
channel data:" + data);
+ try {
+ switch (state) {
+ case Owned -> handleOwnEvent(serviceUnit, data);
+ case Assigned -> handleAssignEvent(serviceUnit, data);
+ case Released -> handleReleaseEvent(serviceUnit, data);
+ case Splitting -> handleSplitEvent(serviceUnit, data);
+ case Free -> handleFreeEvent(serviceUnit);
+ default -> throw new IllegalStateException("Failed to handle
channel data:" + data);
+ }
+ } catch (Throwable e){
+ log.error("Failed to handle the event. serviceUnit:{}, data:{},
handlerFailureCount:{}",
+ serviceUnit, data,
getHandlerFailureCounter(data).incrementAndGet(), e);
+ throw e;
}
}
@@ -362,19 +443,46 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
isTransferCommand(data) ? "Transfer:" + data.state() :
data.state().toString();
}
+ private AtomicLong getHandlerTotalCounter(ServiceUnitStateData data) {
+ return getHandlerCounter(data, true);
+ }
+
+ private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) {
+ return getHandlerCounter(data, false);
+ }
+
+ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean
total) {
+ var state = data == null ? Free : data.state();
+ var counter = total
+ ? handlerCounters.get(state).getTotal() :
handlerCounters.get(state).getFailure();
+ if (counter == null) {
+ throw new IllegalStateException("Unknown state:" + state);
+ }
+ return counter;
+ }
+
private void log(Throwable e, String serviceUnit, ServiceUnitStateData
data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
- log.info("{} handled {} event for serviceUnit:{}, cur:{},
next:{}",
+ long handlerTotalCount = getHandlerTotalCounter(data).get();
+ long handlerFailureCount =
getHandlerFailureCounter(data).get();
+ log.info("{} handled {} event for serviceUnit:{}, cur:{},
next:{}, "
+ + "totalHandledRequests{},
totalFailedRequests:{}",
lookupServiceAddress, getLogEventTag(data),
serviceUnit,
data == null ? "" : data,
- next == null ? "" : next);
+ next == null ? "" : next,
+ handlerTotalCount, handlerFailureCount
+ );
}
} else {
- log.error("{} failed to handle {} event for serviceUnit:{},
cur:{}, next:{}",
+ long handlerTotalCount = getHandlerTotalCounter(data).get();
+ long handlerFailureCount =
getHandlerFailureCounter(data).incrementAndGet();
+ log.error("{} failed to handle {} event for serviceUnit:{},
cur:{}, next:{}, "
+ + "totalHandledRequests{}, totalFailedRequests:{}",
lookupServiceAddress, getLogEventTag(data), serviceUnit,
data == null ? "" : data,
next == null ? "" : next,
+ handlerTotalCount, handlerFailureCount,
e);
}
}
@@ -387,7 +495,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (isTargetBroker(data.broker())) {
log(null, serviceUnit, data, null);
}
-
}
private void handleAssignEvent(String serviceUnit, ServiceUnitStateData
data) {
@@ -401,7 +508,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData
data) {
-
if (isTargetBroker(data.sourceBroker())) {
ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker(), data.sourceBroker());
// TODO: when close, pass message to clients to connect to the new
broker
@@ -432,7 +538,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private CompletableFuture<MessageId> pubAsync(String serviceUnit,
ServiceUnitStateData data) {
- validateChannelState(Started, true);
+ if (!validateChannelState(Started, true)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Invalid channel state:" +
channelState.name()));
+ }
CompletableFuture<MessageId> future = new CompletableFuture<>();
producer.newMessage()
.key(serviceUnit)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 1c0a4f37633..660999365c4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -18,7 +18,14 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
@@ -124,6 +131,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
protected void initTableViews() throws Exception {
cleanTableView(channel1, bundle);
cleanTableView(channel2, bundle);
+ cleanOpsCounters(channel1);
+ cleanOpsCounters(channel2);
}
@@ -219,38 +228,52 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
private int validateChannelStart(ServiceUnitStateChannelImpl channel)
- throws ExecutionException, InterruptedException, TimeoutException {
+ throws InterruptedException, TimeoutException {
int errorCnt = 0;
try {
channel.isChannelOwnerAsync().get(2, TimeUnit.SECONDS);
- } catch (IllegalStateException e) {
- errorCnt++;
+ } catch (ExecutionException e) {
+ if(e.getCause() instanceof IllegalStateException){
+ errorCnt++;
+ }
}
try {
channel.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
- } catch (IllegalStateException e) {
- errorCnt++;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException) {
+ errorCnt++;
+ }
}
try {
- channel.getOwnerAsync(bundle);
- } catch (IllegalStateException e) {
- errorCnt++;
+ channel.getOwnerAsync(bundle).get(2, TimeUnit.SECONDS).get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException) {
+ errorCnt++;
+ }
}
try {
- channel.publishAssignEventAsync(bundle, lookupServiceAddress1);
- } catch (IllegalStateException e) {
- errorCnt++;
+ channel.publishAssignEventAsync(bundle,
lookupServiceAddress1).get(2, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException) {
+ errorCnt++;
+ }
}
try {
channel.publishUnloadEventAsync(
- new Unload(lookupServiceAddress1, bundle,
Optional.of(lookupServiceAddress2)));
- } catch (IllegalStateException e) {
- errorCnt++;
+ new Unload(lookupServiceAddress1, bundle,
Optional.of(lookupServiceAddress2)))
+ .get(2, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException) {
+ errorCnt++;
+ }
}
try {
- channel.publishSplitEventAsync(new Split(bundle,
lookupServiceAddress1, Map.of()));
- } catch (IllegalStateException e) {
- errorCnt++;
+ channel.publishSplitEventAsync(new Split(bundle,
lookupServiceAddress1, Map.of()))
+ .get(2, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException) {
+ errorCnt++;
+ }
}
return errorCnt;
}
@@ -303,6 +326,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(ownerAddr1, ownerAddr2);
assertEquals(getOwnerRequests1.size(), 0);
assertEquals(getOwnerRequests2.size(), 0);
+
+ validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0);
+ validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0);
+ validateEventCounters(channel1, 1, 0, 0, 0, 0, 0);
+ validateEventCounters(channel2, 1, 0, 0, 0, 0, 0);
}
@Test(priority = 3)
@@ -346,7 +374,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
@Test(priority = 4)
public void unloadTest()
- throws ExecutionException, InterruptedException, TimeoutException {
+ throws ExecutionException, InterruptedException, TimeoutException,
IllegalAccessException {
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
@@ -374,6 +402,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
assertEquals(ownerAddr1, ownerAddr2);
assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
+
+ validateHandlerCounters(channel1, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0);
+ validateHandlerCounters(channel2, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0);
+ validateEventCounters(channel1, 1, 0, 0, 0, 1, 0);
+ validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
}
@Test(priority = 5)
@@ -464,6 +497,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel2, bundle, null);
// TODO: assert child bundle ownerships in the channels.
+ validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
+ validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
+ validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
+ validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
}
@Test(priority = 7)
@@ -749,6 +786,32 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
}
+ @Test(priority = 11)
+ public void ownerLookupCountTests() throws IllegalAccessException {
+
+ overrideTableView(channel1, bundle, null);
+ channel1.getOwnerAsync(bundle);
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigned,
"b1"));
+ channel1.getOwnerAsync(bundle);
+ channel1.getOwnerAsync(bundle);
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned,
"b1"));
+ channel1.getOwnerAsync(bundle);
+ channel1.getOwnerAsync(bundle);
+ channel1.getOwnerAsync(bundle);
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Released,
"b1"));
+ channel1.getOwnerAsync(bundle);
+ channel1.getOwnerAsync(bundle);
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Splitting, "b1"));
+ channel1.getOwnerAsync(bundle);
+
+ validateOwnerLookUpCounters(channel1, 2, 3, 2, 1, 1);
+
+ }
+
// TODO: add the channel recovery test when broker registry is added.
@@ -848,6 +911,48 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
cache.remove(serviceUnit);
}
+ private static void overrideTableView(ServiceUnitStateChannel channel,
String serviceUnit, ServiceUnitStateData val)
+ throws IllegalAccessException {
+ var tv = (TableViewImpl<ServiceUnitStateData>)
+ FieldUtils.readField(channel, "tableview", true);
+ var cache = (ConcurrentMap<String, ServiceUnitStateData>)
+ FieldUtils.readField(tv, "data", true);
+ if(val == null){
+ cache.remove(serviceUnit);
+ } else {
+ cache.put(serviceUnit, val);
+ }
+ }
+
+ private static void cleanOpsCounters(ServiceUnitStateChannel channel)
+ throws IllegalAccessException {
+ var handlerCounters =
+ (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
+ FieldUtils.readDeclaredField(channel,
"handlerCounters", true);
+
+ for(var val : handlerCounters.values()){
+ val.getFailure().set(0);
+ val.getTotal().set(0);
+ }
+
+ var eventCounters =
+ (Map<ServiceUnitStateChannelImpl.EventType,
ServiceUnitStateChannelImpl.Counters>)
+ FieldUtils.readDeclaredField(channel, "eventCounters",
true);
+
+ for(var val : eventCounters.values()){
+ val.getFailure().set(0);
+ val.getTotal().set(0);
+ }
+
+ var ownerLookUpCounters =
+ (Map<ServiceUnitStateChannelImpl.EventType, AtomicLong>)
+ FieldUtils.readDeclaredField(channel,
"ownerLookUpCounters", true);
+
+ for(var val : ownerLookUpCounters.values()){
+ val.set(0);
+ }
+ }
+
private static long getCleanupMetric(ServiceUnitStateChannel channel,
String metric)
throws IllegalAccessException {
Object var = FieldUtils.readDeclaredField(channel, metric, true);
@@ -857,4 +962,77 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
return (long) var;
}
}
+
+ private static void validateHandlerCounters(ServiceUnitStateChannel
channel,
+ long assignedT, long assignedF,
+ long ownedT, long ownedF,
+ long releasedT, long releasedF,
+ long splittingT, long
splittingF,
+ long freeT, long freeF)
+ throws IllegalAccessException {
+ var handlerCounters =
+ (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
+ FieldUtils.readDeclaredField(channel,
"handlerCounters", true);
+
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> { // wait until true
+ assertEquals(assignedT,
handlerCounters.get(Assigned).getTotal().get());
+ assertEquals(assignedF,
handlerCounters.get(Assigned).getFailure().get());
+ assertEquals(ownedT,
handlerCounters.get(Owned).getTotal().get());
+ assertEquals(ownedF,
handlerCounters.get(Owned).getFailure().get());
+ assertEquals(releasedT,
handlerCounters.get(Released).getTotal().get());
+ assertEquals(releasedF,
handlerCounters.get(Released).getFailure().get());
+ assertEquals(splittingT,
handlerCounters.get(Splitting).getTotal().get());
+ assertEquals(splittingF,
handlerCounters.get(Splitting).getFailure().get());
+ assertEquals(freeT,
handlerCounters.get(Free).getTotal().get());
+ assertEquals(freeF,
handlerCounters.get(Free).getFailure().get());
+ });
+ }
+
+ private static void validateEventCounters(ServiceUnitStateChannel channel,
+ long assignT, long assignF,
+ long splitT, long splitF,
+ long unloadT, long unloadF)
+ throws IllegalAccessException {
+ var eventCounters =
+ (Map<ServiceUnitStateChannelImpl.EventType,
ServiceUnitStateChannelImpl.Counters>)
+ FieldUtils.readDeclaredField(channel, "eventCounters",
true);
+
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> { // wait until true
+ assertEquals(assignT,
eventCounters.get(Assign).getTotal().get());
+ assertEquals(assignF,
eventCounters.get(Assign).getFailure().get());
+ assertEquals(splitT,
eventCounters.get(Split).getTotal().get());
+ assertEquals(splitF,
eventCounters.get(Split).getFailure().get());
+ assertEquals(unloadT,
eventCounters.get(Unload).getTotal().get());
+ assertEquals(unloadF,
eventCounters.get(Unload).getFailure().get());
+ });
+ }
+
+ private static void validateOwnerLookUpCounters(ServiceUnitStateChannel
channel,
+ long assigned,
+ long owned,
+ long released,
+ long splitting,
+ long free)
+ throws IllegalAccessException {
+ var ownerLookUpCounters =
+ (Map<ServiceUnitState, AtomicLong>)
+ FieldUtils.readDeclaredField(channel,
"ownerLookUpCounters", true);
+
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> { // wait until true
+ assertEquals(assigned,
ownerLookUpCounters.get(Assigned).get());
+ assertEquals(owned, ownerLookUpCounters.get(Owned).get());
+ assertEquals(released,
ownerLookUpCounters.get(Released).get());
+ assertEquals(splitting,
ownerLookUpCounters.get(Splitting).get());
+ assertEquals(free, ownerLookUpCounters.get(Free).get());
+ });
+ }
}