This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 0ee3e002009 [fix][broker] timeout when broker registry hangs and
monitor broker registry (ExtensibleLoadManagerImpl only) (#23382) (#23510)
0ee3e002009 is described below
commit 0ee3e002009b0a75559898eafcf89427b2e31c9a
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Oct 24 21:28:23 2024 -0700
[fix][broker] timeout when broker registry hangs and monitor broker
registry (ExtensibleLoadManagerImpl only) (#23382) (#23510)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 11 ++-
.../loadbalance/extensions/BrokerRegistry.java | 5 +
.../loadbalance/extensions/BrokerRegistryImpl.java | 69 +++++++++++---
.../extensions/ExtensibleLoadManagerImpl.java | 19 +++-
.../channel/ServiceUnitStateChannelImpl.java | 105 +++++++++++++++++----
.../loadbalance/extensions/BrokerRegistryTest.java | 33 +++++++
.../extensions/ExtensibleLoadManagerImplTest.java | 24 ++++-
.../channel/ServiceUnitStateChannelTest.java | 75 ++++++++++-----
.../org/apache/pulsar/client/admin/Brokers.java | 12 ++-
.../pulsar/client/admin/internal/BrokersImpl.java | 18 +++-
10 files changed, 310 insertions(+), 61 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 10b15f6175f..b20312226e7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -48,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
@@ -368,20 +369,26 @@ public class BrokersBase extends AdminResource {
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
+ @ApiResponse(code = 307, message = "Current broker is not the target
broker"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
- @QueryParam("topicVersion") TopicVersion
topicVersion) {
+ @QueryParam("topicVersion") TopicVersion
topicVersion,
+ @QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
.thenAccept(__ -> checkDeadlockedThreads())
+ .thenCompose(__ -> maybeRedirectToBroker(
+ StringUtils.isBlank(brokerId) ? pulsar().getBrokerId()
: brokerId))
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.",
clientAppId());
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
- LOG.error("[{}] Fail to run health check.", clientAppId(),
ex);
+ if (!isRedirectException(ex)) {
+ LOG.error("[{}] Fail to run health check.",
clientAppId(), ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index 79dba9c6334..d154edfbb32 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable {
*/
boolean isStarted();
+ /**
+ * Return whether the broker has been registered.
+ */
+ boolean isRegistered();
+
/**
* Register local broker to metadata store.
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index c3cb8413b01..ef46aa8938f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -52,6 +52,8 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
@Slf4j
public class BrokerRegistryImpl implements BrokerRegistry {
+ private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;
+
private final PulsarService pulsar;
private final ServiceConfiguration conf;
@@ -77,10 +79,11 @@ public class BrokerRegistryImpl implements BrokerRegistry {
@VisibleForTesting
final AtomicReference<State> state = new AtomicReference<>(State.Init);
- public BrokerRegistryImpl(PulsarService pulsar) {
+ @VisibleForTesting
+ BrokerRegistryImpl(PulsarService pulsar, MetadataCache<BrokerLookupData>
brokerLookupDataMetadataCache) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
- this.brokerLookupDataMetadataCache =
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
+ this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache;
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
@@ -98,6 +101,10 @@ public class BrokerRegistryImpl implements BrokerRegistry {
pulsar.getBrokerVersion());
}
+ public BrokerRegistryImpl(PulsarService pulsar) {
+ this(pulsar,
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class));
+ }
+
@Override
public synchronized void start() throws PulsarServerException {
if (!this.state.compareAndSet(State.Init, State.Started)) {
@@ -117,6 +124,12 @@ public class BrokerRegistryImpl implements BrokerRegistry {
return state == State.Started || state == State.Registered;
}
+ @Override
+ public boolean isRegistered() {
+ final var state = this.state.get();
+ return state == State.Registered;
+ }
+
@Override
public CompletableFuture<Void> registerAsync() {
final var state = this.state.get();
@@ -126,12 +139,35 @@ public class BrokerRegistryImpl implements BrokerRegistry
{
}
log.info("[{}] Started registering self to {} (state: {})",
getBrokerId(), brokerIdKeyPath, state);
return brokerLookupDataMetadataCache.put(brokerIdKeyPath,
brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
- .thenAccept(__ -> {
- this.state.set(State.Registered);
- log.info("[{}] Finished registering self", getBrokerId());
+
.orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS)
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ this.state.set(State.Registered);
+ log.info("[{}] Finished registering self",
getBrokerId());
+ } else {
+ log.error("[{}] Failed registering self",
getBrokerId(), ex);
+ }
});
}
+ private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void>
future) {
+ pulsar.getExecutor().schedule(() -> {
+ registerAsync().whenComplete((__, e) -> {
+ if (e != null) {
+ doRegisterAsyncWithRetries(retry + 1, future);
+ } else {
+ future.complete(null);
+ }
+ });
+ }, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50),
TimeUnit.MILLISECONDS);
+ }
+
+ private CompletableFuture<Void> registerAsyncWithRetries() {
+ var retryFuture = new CompletableFuture<Void>();
+ doRegisterAsyncWithRetries(0, retryFuture);
+ return retryFuture;
+ }
+
@Override
public synchronized void unregister() throws MetadataStoreException {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
@@ -218,17 +254,26 @@ public class BrokerRegistryImpl implements BrokerRegistry
{
// The registered node is an ephemeral node that could be deleted
when the metadata store client's session
// is expired. In this case, we should register again.
final var brokerId =
t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
+
+ CompletableFuture<Void> register;
if (t.getType() == NotificationType.Deleted &&
getBrokerId().equals(brokerId)) {
- registerAsync();
- }
- if (listeners.isEmpty()) {
- return;
+ this.state.set(State.Started);
+ register = registerAsyncWithRetries();
+ } else {
+ register = CompletableFuture.completedFuture(null);
}
- this.scheduler.submit(() -> {
- for (BiConsumer<String, NotificationType> listener :
listeners) {
- listener.accept(brokerId, t.getType());
+ // Make sure the listeners run only after any re-registration has completed.
+ register.thenAccept(__ -> {
+ if (listeners.isEmpty()) {
+ return;
}
+ this.scheduler.submit(() -> {
+ for (BiConsumer<String, NotificationType> listener :
listeners) {
+ listener.accept(brokerId, t.getType());
+ }
+ });
});
+
} catch (RejectedExecutionException e) {
// Executor is shutting down
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index bf98df2c5ac..a1f01ea00be 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -36,8 +36,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -1007,8 +1009,12 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
return;
}
+ // Monitor broker registry
+ // Periodically check the broker registry in case metadata store
fails.
+ validateBrokerRegistry();
+
// Monitor role
- // Periodically check the role in case ZK watcher fails.
+ // Periodically check the role in case metadata store fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
@@ -1093,4 +1099,15 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
private boolean disabled() {
return state.get() == State.DISABLED;
}
+
+ private void validateBrokerRegistry()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ var timeout =
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
+ var lookup =
brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout,
TimeUnit.SECONDS);
+ if (lookup.isEmpty()) {
+ log.warn("Found this broker:{} has not registered yet. Trying to
register it",
+ brokerRegistry.getBrokerId());
+ brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1f2715a00ac..58f2e6f30f4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -87,6 +87,7 @@ import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -98,6 +99,7 @@ import
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
@@ -121,6 +123,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS
= 10;
private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10
* 60 * 1000;
+ private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
+ private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
@@ -128,6 +132,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;
+
private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;
private TableView<ServiceUnitStateData> tableview;
@@ -267,7 +272,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
@Override
public void cleanOwnerships() {
disable();
- doCleanup(brokerId);
+ doCleanup(brokerId, true);
}
@Override
@@ -383,6 +388,12 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
.get().getLeaderElectionService();
}
+ @VisibleForTesting
+ protected PulsarAdmin getPulsarAdmin() throws PulsarServerException {
+ return pulsar.getAdminClient();
+ }
+
+ @Override
public synchronized void close() throws PulsarServerException {
channelState = Closed;
boolean debug = debug();
@@ -491,6 +502,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
+
+ // If this broker is not registered (possibly because it is struggling to
connect to the metadata store),
+ // we return the owner without checking whether it is active.
+ // This broker tries to serve lookups on a best-effort basis when the
metadata store connection is unstable.
+ if (!brokerRegistry.isRegistered()) {
+ return CompletableFuture.completedFuture(owner);
+ }
+
return dedupeGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
@@ -1259,19 +1278,25 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private void handleBrokerCreationEvent(String broker) {
- CompletableFuture<Void> future = cleanupJobs.remove(broker);
- if (future != null) {
- future.cancel(false);
- totalInactiveBrokerCleanupCancelledCnt++;
- log.info("Successfully cancelled the ownership cleanup for
broker:{}."
- + " Active cleanup job count:{}",
- broker, cleanupJobs.size());
- } else {
- if (debug()) {
- log.info("No needs to cancel the ownership cleanup for
broker:{}."
- + " There was no scheduled cleanup job. Active
cleanup job count:{}",
- broker, cleanupJobs.size());
- }
+
+ if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) {
+ healthCheckBrokerAsync(broker)
+ .thenAccept(__ -> {
+ CompletableFuture<Void> future =
cleanupJobs.remove(broker);
+ if (future != null) {
+ future.cancel(false);
+ totalInactiveBrokerCleanupCancelledCnt++;
+ log.info("Successfully cancelled the ownership
cleanup for broker:{}."
+ + " Active cleanup job count:{}",
+ broker, cleanupJobs.size());
+ } else {
+ if (debug()) {
+ log.info("No needs to cancel the ownership
cleanup for broker:{}."
+ + " There was no scheduled
cleanup job. Active cleanup job count:{}",
+ broker, cleanupJobs.size());
+ }
+ }
+ });
}
}
@@ -1318,7 +1343,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
var future = CompletableFuture
.runAsync(() -> {
try {
- doCleanup(broker);
+ doCleanup(broker, false);
} catch (Throwable e) {
log.error("Failed to run the cleanup
job for the broker {}, "
+
"totalCleanupErrorCnt:{}.",
@@ -1422,7 +1447,38 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
System.currentTimeMillis() - started);
}
- private synchronized void doCleanup(String broker) {
+ private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future);
+ return future;
+ }
+
+ private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int
retry, CompletableFuture<Void> future) {
+ try {
+ var admin = getPulsarAdmin();
+ admin.brokers().healthcheckAsync(TopicVersion.V2,
Optional.of(brokerId))
+ .whenComplete((__, e) -> {
+ if (e == null) {
+ log.info("Completed health-check broker :{}",
brokerId, e);
+ future.complete(null);
+ return;
+ }
+ if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
+ log.error("Failed health-check broker :{}",
brokerId, e);
+
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+ } else {
+ pulsar.getExecutor()
+ .schedule(() ->
doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
+
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
+ MILLISECONDS);
+ }
+ });
+ } catch (PulsarServerException e) {
+ future.completeExceptionally(e);
+ }
+ }
+
+ private synchronized void doCleanup(String broker, boolean gracefully) {
try {
if
(getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS,
TimeUnit.SECONDS)
.isEmpty()) {
@@ -1435,6 +1491,23 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return;
}
+ // If the cleanup is not graceful, first verify via a health check that the broker is really inactive.
+ if (!gracefully) {
+ try {
+ healthCheckBrokerAsync(broker).get(
+
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ log.warn("Found that the broker to clean is healthy. Skip the
broker:{}'s orphan bundle cleanup",
+ broker);
+ return;
+ } catch (Exception e) {
+ if (debug()) {
+ log.info("Failed to check broker:{} health", broker, e);
+ }
+ log.info("Checked the broker:{} health. Continue the orphan
bundle cleanup", broker);
+ }
+ }
+
+
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 28a2a18500f..941d0e4cbc3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.broker.loadbalance.extensions;
import static
org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -36,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -48,6 +52,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
@@ -396,6 +401,34 @@ public class BrokerRegistryTest {
assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
}
+ @Test
+ public void testRegisterAsyncTimeout() throws Exception {
+ var pulsar1 = createPulsarService();
+ pulsar1.start();
+ pulsar1.getConfiguration().setMetadataStoreOperationTimeoutSeconds(1);
+ var metadataCache = mock(MetadataCache.class);
+ var brokerRegistry = new BrokerRegistryImpl(pulsar1, metadataCache);
+
+ // happy case
+
doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(),
any(), any());
+ brokerRegistry.start();
+
+ // unhappy case (timeout)
+ doAnswer(invocationOnMock -> {
+ return CompletableFuture.supplyAsync(() -> null,
CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS));
+ }).when(metadataCache).put(any(), any(), any());
+ try {
+ brokerRegistry.registerAsync().join();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+
+ // happy case again
+
doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(),
any(), any());
+ brokerRegistry.registerAsync().join();
+ }
+
+
private static BrokerRegistryImpl.State getState(BrokerRegistryImpl
brokerRegistry) {
return brokerRegistry.state.get();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 7580c165a50..a136e22c178 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -132,6 +132,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -457,14 +458,17 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
assertEquals(unloadCount.get(), 0);
});
- ServiceUnitStateChannelImpl channel = new
ServiceUnitStateChannelImpl(pulsar1);
- channel.start();
+ @Cleanup
+ ServiceUnitStateChannelImpl channel3 = new
ServiceUnitStateChannelImpl(pulsar1);
+ channel3.start();
+ @Cleanup
+ ServiceUnitStateChannelImpl channel4 = new
ServiceUnitStateChannelImpl(pulsar2);
+ channel4.start();
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 2);
assertEquals(unloadCount.get(), 0);
});
- channel.close();
}
@DataProvider(name = "isPersistentTopicSubscriptionTypeTest")
@@ -1895,6 +1899,20 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
TimeUnit.SECONDS).get(2, TimeUnit.SECONDS);
}
+ @Test(timeOut = 30 * 1000)
+ public void testMonitorBrokerRegistry() throws MetadataStoreException {
+ primaryLoadManager.getBrokerRegistry().unregister();
+ assertFalse(primaryLoadManager.getBrokerRegistry().isRegistered());
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(30, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() -> { // wait until true
+ primaryLoadManager.monitor();
+
assertTrue(primaryLoadManager.getBrokerRegistry().isRegistered());
+ });
+ }
+
private static abstract class MockBrokerFilter implements BrokerFilter {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index af42649436b..a110746b229 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -90,6 +90,8 @@ import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -129,8 +131,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
private BrokerRegistryImpl registry;
+ private PulsarAdmin pulsarAdmin;
+
private ExtensibleLoadManagerImpl loadManager;
+ private Brokers brokers;
+
@BeforeClass
@Override
protected void setup() throws Exception {
@@ -146,7 +152,9 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace("public/default");
pulsar1 = pulsar;
- registry = new BrokerRegistryImpl(pulsar);
+ registry = spy(new BrokerRegistryImpl(pulsar1));
+ registry.start();
+ pulsarAdmin = spy(pulsar.getAdminClient());
loadManagerContext = mock(LoadManagerContext.class);
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
@@ -177,6 +185,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
childBundle31 = "public/default3/" + childBundle1Range;
childBundle32 = "public/default3/" + childBundle2Range;
+
+ brokers = mock(Brokers.class);
+ doReturn(CompletableFuture.failedFuture(new
RuntimeException("failed"))).when(brokers)
+ .healthcheckAsync(any(), any());
}
@BeforeMethod
@@ -689,17 +701,18 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
@Test(priority = 8)
public void handleBrokerCreationEventTest() throws IllegalAccessException {
var cleanupJobs = getCleanupJobs(channel1);
- String broker = "broker-1";
+ String broker = brokerId2;
var future = new CompletableFuture();
cleanupJobs.put(broker, future);
- channel1.handleBrokerRegistrationEvent(broker,
NotificationType.Created);
- assertEquals(0, cleanupJobs.size());
- assertTrue(future.isCancelled());
+ ((ServiceUnitStateChannelImpl)
channel1).handleBrokerRegistrationEvent(broker, NotificationType.Created);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, cleanupJobs.size());
+ assertTrue(future.isCancelled());
+ });
}
@Test(priority = 9)
- public void handleBrokerDeletionEventTest()
- throws IllegalAccessException, ExecutionException,
InterruptedException, TimeoutException {
+ public void handleBrokerDeletionEventTest() throws Exception {
var cleanupJobs1 = getCleanupJobs(channel1);
var cleanupJobs2 = getCleanupJobs(channel2);
@@ -752,6 +765,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
FieldUtils.writeDeclaredField(followerChannel,
"lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+
+ doReturn(brokers).when(pulsarAdmin).brokers();
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
leaderChannel.handleBrokerRegistrationEvent(brokerId2,
NotificationType.Deleted);
@@ -809,6 +824,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
3,
0,
0);
+ reset(pulsarAdmin);
// broker is back online
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Created);
@@ -833,6 +849,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// broker is offline again
+ doReturn(brokers).when(pulsarAdmin).brokers();
FieldUtils.writeDeclaredField(leaderChannel,
"maxCleanupDelayTimeInSecs", 3, true);
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
@@ -874,6 +891,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
4,
0,
1);
+ reset(pulsarAdmin);
// test unstable state
channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1,
Optional.of(broker)));
@@ -1540,9 +1558,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
FieldUtils.writeDeclaredField(followerChannel,
"lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+
+ doReturn(brokers).when(pulsarAdmin).brokers();
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+
waitUntilNewOwner(channel2, releasingBundle, brokerId2);
waitUntilNewOwner(channel2, childBundle11, brokerId2);
waitUntilNewOwner(channel2, childBundle12, brokerId2);
@@ -1558,7 +1579,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// clean-up
FieldUtils.writeDeclaredField(leaderChannel,
"maxCleanupDelayTimeInSecs", 3 * 60, true);
cleanTableViews();
-
+ reset(pulsarAdmin);
}
@Test(priority = 18)
@@ -1675,7 +1696,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// case 3: the bundle ownership is transferring, and the dst broker is
the channel owner
overrideTableViews(bundle,
new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1));
- assertTrue(!channel1.getOwnerAsync(bundle).isDone());
+ assertFalse(channel1.getOwnerAsync(bundle).isDone());
// case 4: the bundle ownership is found
overrideTableViews(bundle,
@@ -1684,18 +1705,15 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(owner, broker);
// case 5: the owner lookup gets delayed
- var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
- FieldUtils.writeDeclaredField(channel1,
- "brokerRegistry", spyRegistry , true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1000, true);
var delayedFuture = new CompletableFuture();
- doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker));
+ doReturn(delayedFuture).when(registry).lookupAsync(eq(broker));
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();;
+ Thread.currentThread().interrupt();
}
delayedFuture.complete(Optional.of(broker));
});
@@ -1708,7 +1726,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// case 6: the owner is inactive
doReturn(CompletableFuture.completedFuture(Optional.empty()))
- .when(spyRegistry).lookupAsync(eq(broker));
+ .when(registry).lookupAsync(eq(broker));
// verify getOwnerAsync times out
start = System.currentTimeMillis();
@@ -1716,19 +1734,32 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertTrue(ex.getCause() instanceof IllegalStateException);
assertTrue(System.currentTimeMillis() - start >= 1000);
+ try {
+ // verify getOwnerAsync returns immediately when not registered
+ registry.unregister();
+ start = System.currentTimeMillis();
+ assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
+ elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed < 1000);
+ } finally {
+ registry.registerAsync().join();
+ }
+
+
// case 7: the ownership cleanup(no new owner) by the leader channel
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(loadManager).selectAsync(any(), any(), any());
- var leaderChannel = channel1;
+ ServiceUnitStateChannelImpl leaderChannel =
(ServiceUnitStateChannelImpl) channel1;
String leader1 = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
assertEquals(leader1, leader2);
if (leader1.equals(brokerId2)) {
- leaderChannel = channel2;
+ leaderChannel = (ServiceUnitStateChannelImpl) channel2;
}
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ doReturn(brokers).when(pulsarAdmin).brokers();
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
// verify the ownership cleanup, and channel's getOwnerAsync returns
empty result without timeout
@@ -1740,7 +1771,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilState(channel2, bundle, Init);
assertTrue(System.currentTimeMillis() - start < 20_000);
-
+ reset(pulsarAdmin);
// case 8: simulate ownership cleanup(brokerId1 as the new owner) by
the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
@@ -1750,6 +1781,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
getCleanupJobs(leaderChannel).clear();
+ doReturn(brokers).when(pulsarAdmin).brokers();
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
// verify the ownership cleanup, and channel's getOwnerAsync returns
brokerId1 without timeout
@@ -1760,10 +1792,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// test clean-up
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
- FieldUtils.writeDeclaredField(channel1,
- "brokerRegistry", registry , true);
cleanTableViews();
-
+ reset(pulsarAdmin);
}
@Test(priority = 20)
@@ -2104,7 +2134,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
ServiceUnitStateChannelImpl createChannel(PulsarService pulsar)
- throws IllegalAccessException {
+ throws IllegalAccessException, PulsarServerException {
var tmpChannel = new ServiceUnitStateChannelImpl(pulsar);
FieldUtils.writeDeclaredField(tmpChannel,
"ownershipMonitorDelayTimeInSecs", 5, true);
var channel = spy(tmpChannel);
@@ -2112,6 +2142,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
doReturn(loadManagerContext).when(channel).getContext();
doReturn(registry).when(channel).getBrokerRegistry();
doReturn(loadManager).when(channel).getLoadManager();
+ doReturn(pulsarAdmin).when(channel).getPulsarAdmin();
var leaderElectionService = new LeaderElectionService(
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index dc0b7c9885a..eed73f38282 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -320,10 +321,19 @@ public interface Brokers {
*/
void healthcheck(TopicVersion topicVersion) throws PulsarAdminException;
+ /**
+ * Run a healthcheck on the target broker, or on the connected broker if no target is given.
+ * @param brokerId target broker id to check the health. If empty, it
checks the health on the connected broker.
+ *
+ * @throws PulsarAdminException if the healthcheck fails.
+ */
+ void healthcheck(TopicVersion topicVersion, Optional<String> brokerId)
throws PulsarAdminException;
+
/**
* Run a healthcheck on the broker asynchronously.
*/
- CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
+ CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion,
Optional<String> brokerId);
+
/**
* Trigger the current broker to graceful-shutdown asynchronously.
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index b82c3fd0f41..35b261b196e 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin.internal;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
@@ -168,26 +169,35 @@ public class BrokersImpl extends BaseResource implements
Brokers {
@Override
@Deprecated
public void healthcheck() throws PulsarAdminException {
- healthcheck(TopicVersion.V1);
+ healthcheck(TopicVersion.V1, Optional.empty());
}
@Override
@Deprecated
public CompletableFuture<Void> healthcheckAsync() {
- return healthcheckAsync(TopicVersion.V1);
+ return healthcheckAsync(TopicVersion.V1, Optional.empty());
}
+
@Override
public void healthcheck(TopicVersion topicVersion) throws
PulsarAdminException {
- sync(() -> healthcheckAsync(topicVersion));
+ sync(() -> healthcheckAsync(topicVersion, Optional.empty()));
}
@Override
- public CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion)
{
+ public void healthcheck(TopicVersion topicVersion, Optional<String>
brokerId) throws PulsarAdminException {
+ sync(() -> healthcheckAsync(topicVersion, brokerId));
+ }
+
+ @Override
+ public CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion,
Optional<String> brokerId) {
WebTarget path = adminBrokers.path("health");
if (topicVersion != null) {
path = path.queryParam("topicVersion", topicVersion);
}
+ if (brokerId.isPresent()) {
+ path = path.queryParam("brokerId", brokerId.get());
+ }
final CompletableFuture<Void> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<String>() {