This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new ffa6871a3be [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
ffa6871a3be is described below
commit ffa6871a3be31a5a50e874eec0e150080b42a3a8
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Dec 30 23:05:29 2023 -0800
[improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
---
.../extensions/ExtensibleLoadManagerWrapper.java | 2 -
.../channel/ServiceUnitStateChannelImpl.java | 86 ++++++++++++++--------
.../channel/ServiceUnitStateChannelTest.java | 75 +++++++++++++++++++
.../loadbalance/ExtensibleLoadManagerTest.java | 39 +++++++---
4 files changed, 161 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 18e949537de..cd1561cb70e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -118,13 +118,11 @@ public class ExtensibleLoadManagerWrapper implements
LoadManager {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will
automatically write.
- throw new UnsupportedOperationException();
}
@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will
automatically write.
- throw new UnsupportedOperationException();
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 713d98b7250..bd571284346 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -487,6 +487,27 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return isOwner(serviceUnit, lookupServiceAddress);
}
+ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
+ String serviceUnit,
+ ServiceUnitState state,
+ Optional<String> owner) {
+ CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
+ ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData
-> lookupData.flatMap(__ -> owner))
+ : CompletableFuture.completedFuture(Optional.empty());
+
+ return activeOwner
+ .thenCompose(broker -> broker
+ .map(__ -> activeOwner)
+ .orElseGet(() ->
deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
+ .whenComplete((__, e) -> {
+ if (e != null) {
+ log.error("Failed to get active owner broker.
serviceUnit:{}, state:{}, owner:{}",
+ serviceUnit, state, owner, e);
+
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
+ }
+ });
+ }
+
public CompletableFuture<Optional<String>> getOwnerAsync(String
serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
@@ -498,18 +519,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
- return
CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.of(data.dstBroker()));
}
case Splitting -> {
- return
CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
- return deferGetOwnerRequest(serviceUnit).whenComplete((__, e)
-> {
- if (e != null) {
-
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
- }
- }).thenApply(
- broker -> broker == null ? Optional.empty() :
Optional.of(broker));
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
@@ -812,9 +828,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
- stateChangeListeners.notify(serviceUnit, data, null);
+
if (isTargetBroker(data.sourceBroker())) {
- log(null, serviceUnit, data, null);
+ stateChangeListeners.notifyOnCompletion(
+ data.force() ? closeServiceUnit(serviceUnit, true)
+ : CompletableFuture.completedFuture(0),
serviceUnit, data)
+ .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+ } else {
+ stateChangeListeners.notify(serviceUnit, data, null);
}
}
@@ -1202,38 +1223,43 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private ServiceUnitStateData
getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
- String
selectedBroker,
+
Optional<String> selectedBroker,
String
inactiveBroker) {
+
+
+ if (selectedBroker.isEmpty()) {
+ return new ServiceUnitStateData(Free, null, inactiveBroker,
+ true, getNextVersionId(orphanData));
+ }
+
if (orphanData.state() == Splitting) {
- return new ServiceUnitStateData(Splitting, orphanData.dstBroker(),
selectedBroker,
+ return new ServiceUnitStateData(Splitting, orphanData.dstBroker(),
selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
- return new ServiceUnitStateData(Owned, selectedBroker,
inactiveBroker,
+ return new ServiceUnitStateData(Owned, selectedBroker.get(),
inactiveBroker,
true, getNextVersionId(orphanData));
}
}
private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit,
inactiveBroker);
- if (selectedBroker.isPresent()) {
- var override = getOverrideInactiveBrokerStateData(
- orphanData, selectedBroker.get(), inactiveBroker);
- log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
- serviceUnit, orphanData, override);
- publishOverrideEventAsync(serviceUnit, orphanData, override)
- .exceptionally(e -> {
- log.error(
- "Failed to override the ownership
serviceUnit:{} orphanData:{}. "
- + "Failed to publish override event.
totalCleanupErrorCnt:{}",
- serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
- return null;
- });
- } else {
- log.error("Failed to override the ownership serviceUnit:{}
orphanData:{}. Empty selected broker. "
+ if (selectedBroker.isEmpty()) {
+ log.warn("Empty selected broker for ownership serviceUnit:{}
orphanData:{}."
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
}
+ var override = getOverrideInactiveBrokerStateData(orphanData,
selectedBroker, inactiveBroker);
+ log.info("Overriding ownership serviceUnit:{} from orphanData:{} to
overrideData:{}",
+ serviceUnit, orphanData, override);
+ publishOverrideEventAsync(serviceUnit, orphanData, override)
+ .exceptionally(e -> {
+ log.error(
+ "Failed to override the ownership serviceUnit:{}
orphanData:{}. "
+ + "Failed to publish override event.
totalCleanupErrorCnt:{}",
+ serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
+ return null;
+ });
}
private void waitForCleanups(String broker, boolean excludeSystemTopics,
int maxWaitTimeInMillis) {
@@ -1335,7 +1361,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
broker,
cleanupTime,
orphanServiceUnitCleanupCnt,
- totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+ totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}
@@ -1524,7 +1550,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
inactiveBrokers, inactiveBrokers.size(),
orphanServiceUnitCleanupCnt,
serviceUnitTombstoneCleanupCnt,
- totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+ totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index f99a167ff48..7bd12d66704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.expectThrows;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -1560,6 +1561,80 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
cleanTableViews();
}
+ @Test(priority = 19)
+ public void testActiveGetOwner() throws Exception {
+
+
+ // set the bundle owner is the broker
+ String broker = lookupServiceAddress2;
+ String bundle = "public/owned/0xfffffff0_0xffffffff";
+ overrideTableViews(bundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+ var owner = channel1.getOwnerAsync(bundle).get(5,
TimeUnit.SECONDS).get();
+ assertEquals(owner, broker);
+
+ // simulate the owner is inactive
+ var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
+ doReturn(CompletableFuture.completedFuture(Optional.empty()))
+ .when(spyRegistry).lookupAsync(eq(broker));
+ FieldUtils.writeDeclaredField(channel1,
+ "brokerRegistry", spyRegistry , true);
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 1000, true);
+
+
+ // verify getOwnerAsync times out because the owner is inactive now.
+ long start = System.currentTimeMillis();
+ var ex = expectThrows(ExecutionException.class, () ->
channel1.getOwnerAsync(bundle).get());
+ assertTrue(ex.getCause() instanceof TimeoutException);
+ assertTrue(System.currentTimeMillis() - start >= 1000);
+
+ // simulate ownership cleanup(no selected owner) by the leader channel
+ doReturn(CompletableFuture.completedFuture(Optional.empty()))
+ .when(loadManager).selectAsync(any(), any());
+ var leaderChannel = channel1;
+ String leader1 = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ String leader2 = channel2.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ assertEquals(leader1, leader2);
+ if (leader1.equals(lookupServiceAddress2)) {
+ leaderChannel = channel2;
+ }
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+
+ // verify the ownership cleanup, and channel's getOwnerAsync returns
empty result without timeout
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 20 * 1000, true);
+ start = System.currentTimeMillis();
+ assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
+ assertTrue(System.currentTimeMillis() - start < 20_000);
+
+ // simulate ownership cleanup(lookupServiceAddress1 selected owner) by
the leader channel
+ overrideTableViews(bundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+ .when(loadManager).selectAsync(any(), any());
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ getCleanupJobs(leaderChannel).clear();
+ leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+
+ // verify the ownership cleanup, and channel's getOwnerAsync returns
lookupServiceAddress1 without timeout
+ start = System.currentTimeMillis();
+ assertEquals(lookupServiceAddress1,
channel1.getOwnerAsync(bundle).get().get());
+ assertTrue(System.currentTimeMillis() - start < 20_000);
+
+ // test clean-up
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ FieldUtils.writeDeclaredField(channel1,
+ "brokerRegistry", registry , true);
+ cleanTableViews();
+
+ }
private static ConcurrentHashMap<String,
CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 64a05503e19..954c1aa3773 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -125,6 +125,26 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
brokerContainer.start();
}
});
+ String topicName = "persistent://" + DEFAULT_NAMESPACE +
"/startBrokerCheck";
+ Awaitility.await().atMost(120,
TimeUnit.SECONDS).ignoreExceptions().until(
+ () -> {
+ for (BrokerContainer brokerContainer :
pulsarCluster.getBrokers()) {
+ try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(
+
brokerContainer.getHttpServiceUrl()).build()) {
+ if
(admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
+ return false;
+ }
+ try {
+
admin.topics().createPartitionedTopic(topicName, 10);
+ } catch
(PulsarAdminException.ConflictException e) {
+ // expected
+ }
+
admin.lookups().lookupPartitionedTopic(topicName);
+ }
+ }
+ return true;
+ }
+ );
}
}
@@ -243,7 +263,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
}
- @Test(timeOut = 40 * 1000)
+ @Test(timeOut = 120 * 1000)
public void testStopBroker() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE +
"/test-stop-broker-topic";
@@ -259,9 +279,11 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- String broker1 = admin.lookups().lookupTopic(topicName);
+ Awaitility.waitAtMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String broker1 = admin.lookups().lookupTopic(topicName);
+ assertNotEquals(broker1, broker);
+ });
- assertNotEquals(broker1, broker);
}
@Test(timeOut = 80 * 1000)
@@ -309,7 +331,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
- Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(10,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers =
admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
@@ -350,14 +372,14 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers =
admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 2);
}
);
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String ownerBroker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});
@@ -369,7 +391,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers =
admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 1);
@@ -380,8 +402,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
- assertThat(ex.getMessage()).containsAnyOf("Failed to look up a
broker",
- "Failed to select the new owner broker for bundle");
+ assertThat(ex.getMessage()).contains("Failed to select the new
owner broker for bundle");
}
}