This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new fe1d9efe6f3 [improve][broker] defer the ownership checks if the owner
is inactive (ExtensibleLoadManager) (#21855)
fe1d9efe6f3 is described below
commit fe1d9efe6f323f153093151032f1d78ef575b655
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Jan 4 16:57:46 2024 -0800
[improve][broker] defer the ownership checks if the owner is inactive
(ExtensibleLoadManager) (#21855)
---
.../extensions/ExtensibleLoadManagerWrapper.java | 2 -
.../channel/ServiceUnitStateChannelImpl.java | 86 ++++++++++++++--------
.../channel/ServiceUnitStateChannelTest.java | 75 +++++++++++++++++++
.../loadbalance/ExtensibleLoadManagerTest.java | 56 +++++++++++---
4 files changed, 177 insertions(+), 42 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 18e949537de..cd1561cb70e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -118,13 +118,11 @@ public class ExtensibleLoadManagerWrapper implements
LoadManager {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will
automatically write.
- throw new UnsupportedOperationException();
}
@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will
automatically write.
- throw new UnsupportedOperationException();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd7e032a24a..ad6bac1feeb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -498,6 +498,27 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return isOwner(serviceUnit, lookupServiceAddress);
}
+ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
+ String serviceUnit,
+ ServiceUnitState state,
+ Optional<String> owner) {
+ CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
+ ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData
-> lookupData.flatMap(__ -> owner))
+ : CompletableFuture.completedFuture(Optional.empty());
+
+ return activeOwner
+ .thenCompose(broker -> broker
+ .map(__ -> activeOwner)
+ .orElseGet(() ->
deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
+ .whenComplete((__, e) -> {
+ if (e != null) {
+ log.error("Failed to get active owner broker.
serviceUnit:{}, state:{}, owner:{}",
+ serviceUnit, state, owner, e);
+
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
+ }
+ });
+ }
+
public CompletableFuture<Optional<String>> getOwnerAsync(String
serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
@@ -509,18 +530,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
- return
CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.of(data.dstBroker()));
}
case Splitting -> {
- return
CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
- return deferGetOwnerRequest(serviceUnit).whenComplete((__, e)
-> {
- if (e != null) {
-
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
- }
- }).thenApply(
- broker -> broker == null ? Optional.empty() :
Optional.of(broker));
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
@@ -781,9 +797,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
- stateChangeListeners.notify(serviceUnit, data, null);
+
if (isTargetBroker(data.sourceBroker())) {
- log(null, serviceUnit, data, null);
+ stateChangeListeners.notifyOnCompletion(
+ data.force() ? closeServiceUnit(serviceUnit)
+ : CompletableFuture.completedFuture(0),
serviceUnit, data)
+ .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+ } else {
+ stateChangeListeners.notify(serviceUnit, data, null);
}
}
@@ -1168,38 +1189,43 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private ServiceUnitStateData
getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
- String
selectedBroker,
+
Optional<String> selectedBroker,
String
inactiveBroker) {
+
+
+ if (selectedBroker.isEmpty()) {
+ return new ServiceUnitStateData(Free, null, inactiveBroker,
+ true, getNextVersionId(orphanData));
+ }
+
if (orphanData.state() == Splitting) {
- return new ServiceUnitStateData(Splitting, orphanData.dstBroker(),
selectedBroker,
+ return new ServiceUnitStateData(Splitting, orphanData.dstBroker(),
selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
- return new ServiceUnitStateData(Owned, selectedBroker,
inactiveBroker,
+ return new ServiceUnitStateData(Owned, selectedBroker.get(),
inactiveBroker,
true, getNextVersionId(orphanData));
}
}
private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit,
inactiveBroker);
- if (selectedBroker.isPresent()) {
- var override = getOverrideInactiveBrokerStateData(
- orphanData, selectedBroker.get(), inactiveBroker);
- log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
- serviceUnit, orphanData, override);
- publishOverrideEventAsync(serviceUnit, orphanData, override)
- .exceptionally(e -> {
- log.error(
- "Failed to override the ownership
serviceUnit:{} orphanData:{}. "
- + "Failed to publish override event.
totalCleanupErrorCnt:{}",
- serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
- return null;
- });
- } else {
- log.error("Failed to override the ownership serviceUnit:{}
orphanData:{}. Empty selected broker. "
+ if (selectedBroker.isEmpty()) {
+ log.warn("Empty selected broker for ownership serviceUnit:{}
orphanData:{}."
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
}
+ var override = getOverrideInactiveBrokerStateData(orphanData,
selectedBroker, inactiveBroker);
+ log.info("Overriding ownership serviceUnit:{} from orphanData:{} to
overrideData:{}",
+ serviceUnit, orphanData, override);
+ publishOverrideEventAsync(serviceUnit, orphanData, override)
+ .exceptionally(e -> {
+ log.error(
+ "Failed to override the ownership serviceUnit:{}
orphanData:{}. "
+ + "Failed to publish override event.
totalCleanupErrorCnt:{}",
+ serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
+ return null;
+ });
}
private void waitForCleanups(String broker, boolean excludeSystemTopics,
int maxWaitTimeInMillis) {
@@ -1301,7 +1327,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
broker,
cleanupTime,
orphanServiceUnitCleanupCnt,
- totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+ totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}
@@ -1490,7 +1516,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
inactiveBrokers, inactiveBrokers.size(),
orphanServiceUnitCleanupCnt,
serviceUnitTombstoneCleanupCnt,
- totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+ totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index acf87ec7500..1cffc3c626e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.expectThrows;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -1557,6 +1558,80 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
cleanTableViews();
}
+ @Test(priority = 19)
+ public void testActiveGetOwner() throws Exception {
+
+
+ // set the broker as the bundle owner
+ String broker = lookupServiceAddress2;
+ String bundle = "public/owned/0xfffffff0_0xffffffff";
+ overrideTableViews(bundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+ var owner = channel1.getOwnerAsync(bundle).get(5,
TimeUnit.SECONDS).get();
+ assertEquals(owner, broker);
+
+ // simulate the owner being inactive
+ var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
+ doReturn(CompletableFuture.completedFuture(Optional.empty()))
+ .when(spyRegistry).lookupAsync(eq(broker));
+ FieldUtils.writeDeclaredField(channel1,
+ "brokerRegistry", spyRegistry , true);
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 1000, true);
+
+
+ // verify getOwnerAsync times out because the owner is inactive now.
+ long start = System.currentTimeMillis();
+ var ex = expectThrows(ExecutionException.class, () ->
channel1.getOwnerAsync(bundle).get());
+ assertTrue(ex.getCause() instanceof TimeoutException);
+ assertTrue(System.currentTimeMillis() - start >= 1000);
+
+ // simulate ownership cleanup(no selected owner) by the leader channel
+ doReturn(CompletableFuture.completedFuture(Optional.empty()))
+ .when(loadManager).selectAsync(any(), any());
+ var leaderChannel = channel1;
+ String leader1 = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ String leader2 = channel2.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ assertEquals(leader1, leader2);
+ if (leader1.equals(lookupServiceAddress2)) {
+ leaderChannel = channel2;
+ }
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+
+ // verify the ownership cleanup, and channel's getOwnerAsync returns
empty result without timeout
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 20 * 1000, true);
+ start = System.currentTimeMillis();
+ assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
+ assertTrue(System.currentTimeMillis() - start < 20_000);
+
+ // simulate ownership cleanup(lookupServiceAddress1 selected owner) by
the leader channel
+ overrideTableViews(bundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+ .when(loadManager).selectAsync(any(), any());
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ getCleanupJobs(leaderChannel).clear();
+ leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+
+ // verify the ownership cleanup, and channel's getOwnerAsync returns
lookupServiceAddress1 without timeout
+ start = System.currentTimeMillis();
+ assertEquals(lookupServiceAddress1,
channel1.getOwnerAsync(bundle).get().get());
+ assertTrue(System.currentTimeMillis() - start < 20_000);
+
+ // test clean-up
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ FieldUtils.writeDeclaredField(channel1,
+ "brokerRegistry", registry , true);
+ cleanTableViews();
+
+ }
private static ConcurrentHashMap<String,
CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 23abf50bdb0..e262b27fe23 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -129,6 +129,26 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
brokerContainer.start();
}
});
+ String topicName = "persistent://" + DEFAULT_NAMESPACE +
"/startBrokerCheck";
+ Awaitility.await().atMost(120,
TimeUnit.SECONDS).ignoreExceptions().until(
+ () -> {
+ for (BrokerContainer brokerContainer :
pulsarCluster.getBrokers()) {
+ try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(
+
brokerContainer.getHttpServiceUrl()).build()) {
+ if
(admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
+ return false;
+ }
+ try {
+
admin.topics().createPartitionedTopic(topicName, 10);
+ } catch
(PulsarAdminException.ConflictException e) {
+ // expected
+ }
+
admin.lookups().lookupPartitionedTopic(topicName);
+ }
+ }
+ return true;
+ }
+ );
}
}
@@ -245,7 +265,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
}
- @Test(timeOut = 40 * 1000)
+ @Test(timeOut = 120 * 1000)
public void testStopBroker() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE +
"/test-stop-broker-topic";
@@ -261,9 +281,11 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- String broker1 = admin.lookups().lookupTopic(topicName);
+ Awaitility.waitAtMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String broker1 = admin.lookups().lookupTopic(topicName);
+ assertNotEquals(broker1, broker);
+ });
- assertNotEquals(broker1, broker);
}
@Test(timeOut = 40 * 1000)
@@ -311,7 +333,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
- Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(10,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers =
admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
@@ -347,6 +369,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
String broker = admin.lookups().lookupTopic(topic);
+ assertEquals(extractBrokerIndex(broker), 0);
for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
@@ -355,11 +378,17 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- assertEquals(extractBrokerIndex(broker), 0);
-
- broker = admin.lookups().lookupTopic(topic);
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+ () -> {
+ List<String> activeBrokers =
admin.brokers().getActiveBrokers();
+ assertEquals(activeBrokers.size(), 2);
+ }
+ );
- assertEquals(extractBrokerIndex(broker), 1);
+ Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String ownerBroker = admin.lookups().lookupTopic(topic);
+ assertEquals(extractBrokerIndex(ownerBroker), 1);
+ });
for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
@@ -367,13 +396,20 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
container.stop();
}
}
+
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+ () -> {
+ List<String> activeBrokers =
admin.brokers().getActiveBrokers();
+ assertEquals(activeBrokers.size(), 1);
+ }
+ );
+
try {
admin.lookups().lookupTopic(topic);
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
- assertThat(ex.getMessage()).containsAnyOf("Failed to look up a
broker",
- "Failed to select the new owner broker for bundle");
+ assertThat(ex.getMessage()).contains("Failed to select the new
owner broker for bundle");
}
}