This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6d8b15db185 [fix][broker][branch-3.0] Fail fast if the extensible load
manager failed to start (#23297) (#23302)
6d8b15db185 is described below
commit 6d8b15db1851e81a3d467e6684d1c2a8534cf690
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Sep 14 08:47:56 2024 +0300
[fix][broker][branch-3.0] Fail fast if the extensible load manager failed
to start (#23297) (#23302)
Co-authored-by: Yunze Xu <[email protected]>
---
.../pulsar/broker/PulsarServerException.java | 17 +++
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../extensions/ExtensibleLoadManagerImpl.java | 122 +++++++--------------
.../extensions/LoadManagerFailFastTest.java | 120 ++++++++++++++++++++
4 files changed, 179 insertions(+), 82 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
index 2235b9a7128..d7c0d0adb3a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker;
import java.io.IOException;
+import java.util.concurrent.CompletionException;
public class PulsarServerException extends IOException {
private static final long serialVersionUID = 1;
@@ -44,4 +45,38 @@ public class PulsarServerException extends IOException {
super(t);
}
}
+
+ /**
+ * Converts a throwable into a {@link PulsarServerException}.
+ *
+ * <p>A {@link CompletionException} that has a cause is unwrapped (recursively) so that the original failure is
+ * reported rather than the wrapper. A {@code PulsarServerException} is returned unchanged; any other throwable,
+ * including a {@code CompletionException} without a cause, becomes the cause of a new
+ * {@code PulsarServerException}.
+ *
+ * @param throwable the failure to convert
+ * @return {@code throwable} itself, the exception obtained from its unwrapped cause, or a new wrapping exception
+ */
+ public static PulsarServerException from(Throwable throwable) {
+ // Only unwrap when there is a cause; otherwise the result would wrap null and the CompletionException's
+ // own message and stack trace would be lost.
+ if (throwable instanceof CompletionException && throwable.getCause() != null) {
+ return from(throwable.getCause());
+ }
+ if (throwable instanceof PulsarServerException pulsarServerException) {
+ return pulsarServerException;
+ }
+ return new PulsarServerException(throwable);
+ }
+
+ /**
+ * Wraps a {@code PulsarServerException} in an unchecked {@link CompletionException} so that it can be thrown
+ * where checked exceptions are not allowed. {@link #from(Throwable)} unwraps it again.
+ *
+ * @param e the exception to wrap
+ * @return a {@code CompletionException} whose cause is {@code e}
+ */
+ public static CompletionException toUncheckedException(PulsarServerException e) {
+ return new CompletionException(e);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 827f2d4cde4..a71cf5cecc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -949,7 +949,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
state = State.Started;
} catch (Exception e) {
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
- PulsarServerException startException = new
PulsarServerException(e);
+ // PulsarServerException.from() unwraps a CompletionException (such as the one the extensible load manager
+ // throws when it fails to start), so callers see the underlying failure instead of the wrapper.
+ final PulsarServerException startException = PulsarServerException.from(e);
readyForIncomingRequestsFuture.completeExceptionally(startException);
throw startException;
} finally {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index e0fd738a408..af0ea0ea224 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -80,7 +80,6 @@ import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerSche
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
-import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
@@ -97,10 +96,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
@@ -123,10 +119,6 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
- public static final int STARTUP_TIMEOUT_SECONDS = 30;
-
- public static final int MAX_RETRY = 5;
-
private static final String ELECTION_ROOT =
"/loadbalance/extension/leader";
public static final Set<String> INTERNAL_TOPICS =
@@ -204,7 +196,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
private final ConcurrentHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = new ConcurrentHashMap<>();
- private final CompletableFuture<Void> initWaiter = new
CompletableFuture<>();
+ // Completed with true once startup succeeds, or with false by failStarting(), so that the background tasks
+ // waiting on it know whether to proceed or to return.
+ private final CompletableFuture<Boolean> initWaiter = new CompletableFuture<>();
/**
* Get all the bundles that are owned by this broker.
@@ -331,7 +323,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return;
}
try {
- this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ // Created through an overridable factory method so that tests can substitute the registry.
+ this.brokerRegistry = createBrokerRegistry(pulsar);
this.leaderElectionService = new LeaderElectionService(
pulsar.getCoordinationService(), pulsar.getBrokerId(),
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
@@ -346,53 +338,15 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
});
});
});
- this.serviceUnitStateChannel =
ServiceUnitStateChannelImpl.newInstance(pulsar);
+ // Created through an overridable factory method so that tests can substitute the channel.
+ this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter);
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
- pulsar.runWhenReadyForIncomingRequests(() -> {
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
- .create();
- int retry = 0;
- while (!Thread.currentThread().isInterrupted()) {
- try {
- brokerRegistry.register();
- this.serviceUnitStateChannel.start();
- break;
- } catch (Exception e) {
- log.warn("The broker:{} failed to start service unit
state channel. Retrying {} th ...",
- pulsar.getBrokerId(), ++retry, e);
- try {
- Thread.sleep(backoff.next());
- } catch (InterruptedException ex) {
- log.warn("Interrupted while sleeping.");
- // preserve thread's interrupt status
- Thread.currentThread().interrupt();
- try {
- pulsar.close();
- } catch (PulsarServerException exc) {
- log.error("Failed to close pulsar service.",
exc);
- }
- return;
- }
- failStarting(e);
- if (retry >= MAX_RETRY) {
- log.error("Failed to start the service unit state
channel after retry {} th. "
- + "Closing pulsar service.", retry, e);
- try {
- pulsar.close();
- } catch (PulsarServerException ex) {
- log.error("Failed to close pulsar service.",
ex);
- }
- }
- }
- }
- });
+
this.antiAffinityGroupPolicyHelper =
new AntiAffinityGroupPolicyHelper(pulsar,
serviceUnitStateChannel);
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
@@ -401,15 +354,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
SimpleResourceAllocationPolicies policies = new
SimpleResourceAllocationPolicies(pulsar);
this.isolationPoliciesHelper = new
IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new
BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
-
- try {
- this.brokerLoadDataStore = LoadDataStoreFactory
- .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC,
BrokerLoadData.class);
- this.topBundlesLoadDataStore = LoadDataStoreFactory
- .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC,
TopBundlesLoadData.class);
- } catch (LoadDataStoreException e) {
- throw new PulsarServerException(e);
- }
+ // A LoadDataStoreException from either call reaches the enclosing try's catch (Throwable), which calls failStarting().
+ this.brokerLoadDataStore = LoadDataStoreFactory
+ .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+ this.topBundlesLoadDataStore = LoadDataStoreFactory
+ .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
this.context = LoadManagerContextImpl.builder()
.configuration(conf)
@@ -433,6 +381,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
pulsar.runWhenReadyForIncomingRequests(() -> {
try {
+ // A failure here is passed to failStarting() by the catch block of this callback.
+ this.serviceUnitStateChannel.start();
var interval =
conf.getLoadBalancerReportUpdateMinIntervalMillis();
this.brokerLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
@@ -467,38 +416,52 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
MONITOR_INTERVAL_IN_MILLIS,
TimeUnit.MILLISECONDS);
this.splitScheduler.start();
- this.initWaiter.complete(null);
+ this.initWaiter.complete(true);
this.started = true;
log.info("Started load manager.");
- } catch (Exception ex) {
- failStarting(ex);
+ } catch (Throwable e) {
+ failStarting(e);
}
});
- } catch (Exception ex) {
+ } catch (Throwable ex) {
failStarting(ex);
}
}
- private void failStarting(Exception ex) {
- log.error("Failed to start the extensible load balance and close
broker registry {}.",
- this.brokerRegistry, ex);
+ /**
+ * Releases what startup acquired so far and rethrows the startup failure.
+ *
+ * <p>The broker registry and the service unit state channel are closed if they were created; a
+ * {@code PulsarServerException} from the registry or any exception from the channel is logged instead of
+ * propagated. {@code initWaiter} is then completed with {@code false} so that the tasks waiting on it return,
+ * and {@code throwable} is rethrown as an unchecked {@code CompletionException} wrapping a
+ * {@link PulsarServerException}.
+ *
+ * @param throwable the failure that aborted startup
+ */
+ private void failStarting(Throwable throwable) {
if (this.brokerRegistry != null) {
try {
- brokerRegistry.unregister();
- } catch (MetadataStoreException e) {
- // ignore
+ brokerRegistry.close();
+ } catch (PulsarServerException e) {
+ // If close failed, this broker might still exist in the metadata store. Then it could be found by other
+ // brokers as an available broker. Hence, print a warning log for it.
+ log.warn("Failed to close the broker registry: {}", e.getMessage(), e);
}
}
if (this.serviceUnitStateChannel != null) {
try {
serviceUnitStateChannel.close();
- } catch (IOException e) {
- // ignore
+ } catch (Exception e) {
+ // Best effort: the channel may not have been started yet, and the startup failure is rethrown below.
+ log.warn("Failed to close the service unit state channel: {}", e.getMessage(), e);
}
}
- initWaiter.completeExceptionally(ex);
+ initWaiter.complete(false); // exit the background thread gracefully
+ throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable));
}
+
@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
@@ -843,7 +787,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- initWaiter.get();
+ // initWaiter completes with false if the load manager failed to start; stop in that case.
+ if (!initWaiter.get()) {
+ return;
+ }
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
@@ -893,7 +839,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- initWaiter.get();
+ // initWaiter completes with false if the load manager failed to start; stop in that case.
+ if (!initWaiter.get()) {
+ return;
+ }
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
@@ -957,7 +905,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
@VisibleForTesting
protected void monitor() {
try {
- initWaiter.get();
+ // initWaiter completes with false if the load manager failed to start; skip monitoring in that case.
+ if (!initWaiter.get()) {
+ return;
+ }
// Monitor role
// Periodically check the role in case ZK watcher fails.
@@ -1012,4 +962,29 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
log.warn("Failed to wait for closing internal topics", e);
}
}
+
+ /**
+ * Creates the {@link BrokerRegistry} used by this load manager. Subclasses (e.g. tests) can override this
+ * method to supply a different implementation.
+ *
+ * @param pulsar the {@code PulsarService} the registry is created for
+ * @return the registry to use
+ */
+ @VisibleForTesting
+ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
+ return new BrokerRegistryImpl(pulsar);
+ }
+
+ /**
+ * Creates the {@link ServiceUnitStateChannel} used by this load manager. Subclasses (e.g. tests) can override
+ * this method to supply a different implementation.
+ *
+ * @param pulsar the {@code PulsarService} the channel is created for
+ * @return the channel to use
+ */
+ @VisibleForTesting
+ protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
+ // Keep using the factory method that the startup code called directly before this hook was extracted.
+ return ServiceUnitStateChannelImpl.newInstance(pulsar);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
new file mode 100644
index 00000000000..a400bf733e5
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.common.util.PortManager;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that {@link PulsarService#start()} fails fast, without leaving this broker registered in the metadata
+ * store, when a component of {@link ExtensibleLoadManagerImpl} cannot be started.
+ */
+public class LoadManagerFailFastTest {
+
+ private static final String cluster = "test";
+ private final int zkPort = PortManager.nextLockedFreePort();
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
+ private final ServiceConfiguration config = new ServiceConfiguration();
+
+ @BeforeClass
+ protected void setup() throws Exception {
+ bk.start();
+ config.setClusterName(cluster);
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(0));
+ config.setMetadataStoreUrl("zk:localhost:" + zkPort);
+ }
+
+ @AfterClass
+ protected void cleanup() throws Exception {
+ bk.stop();
+ }
+
+ @Test(timeOut = 30000)
+ public void testBrokerRegistryFailure() throws Exception {
+ @Cleanup final var pulsar = newPulsarService(BrokerRegistryLoadManager.class);
+ assertStartFails(pulsar, "Cannot start BrokerRegistry");
+ Assert.assertTrue(noBrokerRegistered(pulsar));
+ }
+
+ @Test(timeOut = 30000)
+ public void testServiceUnitStateChannelFailure() throws Exception {
+ @Cleanup final var pulsar = newPulsarService(ChannelLoadManager.class);
+ assertStartFails(pulsar, "Cannot start ServiceUnitStateChannel");
+ Awaitility.await().untilAsserted(() -> Assert.assertTrue(noBrokerRegistered(pulsar)));
+ }
+
+ /**
+ * Points the shared configuration at {@code loadManagerClass} and creates a broker from it.
+ */
+ private PulsarService newPulsarService(Class<? extends ExtensibleLoadManagerImpl> loadManagerClass) {
+ config.setLoadManagerClassName(loadManagerClass.getName());
+ return new PulsarService(config);
+ }
+
+ /**
+ * Asserts that {@code pulsar.start()} throws a {@link PulsarServerException} whose message is
+ * {@code expectedMessage} and whose cause is {@code null}.
+ */
+ private static void assertStartFails(PulsarService pulsar, String expectedMessage) throws Exception {
+ try {
+ pulsar.start();
+ Assert.fail();
+ } catch (PulsarServerException e) {
+ Assert.assertNull(e.getCause());
+ Assert.assertEquals(e.getMessage(), expectedMessage);
+ }
+ }
+
+ /**
+ * Returns whether no child node exists under {@code LoadManager.LOADBALANCE_BROKERS_ROOT}.
+ */
+ private static boolean noBrokerRegistered(PulsarService pulsar) throws Exception {
+ return pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty();
+ }
+
+ /**
+ * Load manager whose broker registry throws when started.
+ */
+ private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl {
+
+ @Override
+ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
+ final var registry = Mockito.mock(BrokerRegistryImpl.class);
+ try {
+ Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(registry).start();
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
+ }
+ return registry;
+ }
+ }
+
+ /**
+ * Load manager whose service unit state channel throws when started.
+ */
+ private static class ChannelLoadManager extends ExtensibleLoadManagerImpl {
+
+ @Override
+ protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
+ final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class);
+ Mockito.doAnswer(invocation -> null).when(channel).listen(Mockito.any());
+ try {
+ Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel).start();
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
+ }
+ return channel;
+ }
+ }
+}