This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53df683b0f7 [fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977)
53df683b0f7 is described below
commit 53df683b0f78f5f7c12f87e6fbb4d73637ca5bd5
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jun 26 17:54:19 2024 +0300
[fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977)
---
.../org/apache/pulsar/broker/PulsarService.java | 16 ++-
.../extensions/ExtensibleLoadManagerImpl.java | 131 ++++++++++++---------
.../pulsar/broker/namespace/NamespaceService.java | 4 +-
.../broker/service/PulsarChannelInitializer.java | 7 +-
.../apache/pulsar/broker/service/ServerCnx.java | 4 +
.../org/apache/pulsar/broker/web/WebService.java | 50 ++++++++
6 files changed, 156 insertions(+), 56 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4fa773dace9..0d8bc571c57 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -291,6 +291,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider transactionExecutorProvider;
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
private String brokerId;
+ private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
public enum State {
Init, Started, Closing, Closed
@@ -999,6 +1000,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.metricsGenerator = new MetricsGenerator(this);
+ // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
+ readyForIncomingRequestsFuture.complete(null);
+
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
// so that the protocol handlers can access broker service properly.
@@ -1047,12 +1051,22 @@ public class PulsarService implements AutoCloseable, ShutdownService {
state = State.Started;
} catch (Exception e) {
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
- throw new PulsarServerException(e);
+ PulsarServerException startException = new PulsarServerException(e);
+ readyForIncomingRequestsFuture.completeExceptionally(startException);
+ throw startException;
} finally {
mutex.unlock();
}
}
+ public void runWhenReadyForIncomingRequests(Runnable runnable) {
+ readyForIncomingRequestsFuture.thenRun(runnable);
+ }
+
+ public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
+ readyForIncomingRequestsFuture.get();
+ }
+
protected BrokerInterceptor newBrokerInterceptor() throws IOException {
return BrokerInterceptors.load(config);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 92dcf8001ad..4a7ba90aad9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -36,7 +36,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -167,10 +166,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
private TopBundleLoadDataReporter topBundleLoadDataReporter;
- private ScheduledFuture brokerLoadDataReportTask;
- private ScheduledFuture topBundlesLoadDataReportTask;
+ private volatile ScheduledFuture brokerLoadDataReportTask;
+ private volatile ScheduledFuture topBundlesLoadDataReportTask;
- private ScheduledFuture monitorTask;
+ private volatile ScheduledFuture monitorTask;
private SplitScheduler splitScheduler;
private UnloadManager unloadManager;
@@ -199,7 +198,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
private final ConcurrentHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = new ConcurrentHashMap<>();
- private final CountDownLatch initWaiter = new CountDownLatch(1);
+ private final CompletableFuture<Void> initWaiter = new
CompletableFuture<>();
/**
* Get all the bundles that are owned by this broker.
@@ -376,12 +375,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
pulsar.getCoordinationService(), pulsar.getBrokerId(),
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
state -> {
- pulsar.getLoadManagerExecutor().execute(() -> {
- if (state == LeaderElectionState.Leading) {
- playLeader();
- } else {
- playFollower();
- }
+ pulsar.runWhenReadyForIncomingRequests(() -> {
+ pulsar.getLoadManagerExecutor().execute(() -> {
+ if (state == LeaderElectionState.Leading) {
+ playLeader();
+ } else {
+ playFollower();
+ }
+ });
});
});
this.serviceUnitStateChannel = new
ServiceUnitStateChannelImpl(pulsar);
@@ -391,7 +392,13 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
- this.serviceUnitStateChannel.start();
+ pulsar.runWhenReadyForIncomingRequests(() -> {
+ try {
+ this.serviceUnitStateChannel.start();
+ } catch (Exception e) {
+ failStarting(e);
+ }
+ });
this.antiAffinityGroupPolicyHelper =
new AntiAffinityGroupPolicyHelper(pulsar,
serviceUnitStateChannel);
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
@@ -423,54 +430,72 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
new TopBundleLoadDataReporter(pulsar,
brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
- var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
- this.brokerLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- brokerLoadDataReporter.reportAsync(false);
- // TODO: update broker load metrics using
getLocalData
- } catch (Throwable e) {
- log.error("Failed to run the broker load
manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.topBundlesLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- // TODO: consider excluding the bundles
that are in the process of split.
-
topBundleLoadDataReporter.reportAsync(false);
- } catch (Throwable e) {
- log.error("Failed to run the top bundles
load manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.monitorTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- monitor();
- },
- MONITOR_INTERVAL_IN_MILLIS,
- MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager,
context,
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager,
splitCounter, splitMetrics, context);
- this.splitScheduler.start();
- this.initWaiter.countDown();
- this.started = true;
- log.info("Started load manager.");
+
+ pulsar.runWhenReadyForIncomingRequests(() -> {
+ try {
+ var interval =
conf.getLoadBalancerReportUpdateMinIntervalMillis();
+
+ this.brokerLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+
brokerLoadDataReporter.reportAsync(false);
+ // TODO: update broker load
metrics using getLocalData
+ } catch (Throwable e) {
+ log.error("Failed to run the
broker load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.topBundlesLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ // TODO: consider excluding the
bundles that are in the process of split.
+
topBundleLoadDataReporter.reportAsync(false);
+ } catch (Throwable e) {
+ log.error("Failed to run the top
bundles load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.monitorTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ monitor();
+ },
+ MONITOR_INTERVAL_IN_MILLIS,
+ MONITOR_INTERVAL_IN_MILLIS,
TimeUnit.MILLISECONDS);
+
+ this.splitScheduler.start();
+ this.initWaiter.complete(null);
+ this.started = true;
+ log.info("Started load manager.");
+ } catch (Exception ex) {
+ failStarting(ex);
+ }
+ });
} catch (Exception ex) {
- log.error("Failed to start the extensible load balance and close
broker registry {}.",
- this.brokerRegistry, ex);
- if (this.brokerRegistry != null) {
+ failStarting(ex);
+ }
+ }
+
+ private void failStarting(Exception ex) {
+ log.error("Failed to start the extensible load balance and close
broker registry {}.",
+ this.brokerRegistry, ex);
+ if (this.brokerRegistry != null) {
+ try {
brokerRegistry.close();
+ } catch (PulsarServerException e) {
+ // ignore
}
}
+ initWaiter.completeExceptionally(ex);
}
@Override
@@ -816,7 +841,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- initWaiter.await();
+ initWaiter.get();
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
@@ -866,7 +891,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- initWaiter.await();
+ initWaiter.get();
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
@@ -936,7 +961,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
@VisibleForTesting
protected void monitor() {
try {
- initWaiter.await();
+ initWaiter.get();
// Monitor role
// Periodically check the role in case ZK watcher fails.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index df6a141ddcf..dfd03dfbc6e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1335,7 +1335,9 @@ public class NamespaceService implements AutoCloseable {
bundleOwnershipListeners.add(listener);
}
}
- getOwnedServiceUnits().forEach(bundle ->
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+ pulsar.runWhenReadyForIncomingRequests(() -> {
+ getOwnedServiceUnits().forEach(bundle ->
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+ });
}
public void
addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 5308b3c981e..e276ea24fed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -104,6 +105,9 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel ch) throws Exception {
+ // disable auto read explicitly so that requests aren't served until
auto read is enabled
+ // ServerCnx must enable auto read in channelActive after
PulsarService is ready to accept incoming requests
+ ch.config().setAutoRead(false);
ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024, true));
if (this.enableTls) {
if (this.tlsEnabledWithKeyStore) {
@@ -128,7 +132,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
// ServerCnx ends up reading higher number of messages and broker can
not throttle the messages by disabling
// auto-read.
ch.pipeline().addLast("flowController", new FlowControlHandler());
- ServerCnx cnx = newServerCnx(pulsar, listenerName);
+ // using "ChannelHandler" type to workaround an IntelliJ bug that
shows a false positive error
+ ChannelHandler cnx = newServerCnx(pulsar, listenerName);
ch.pipeline().addLast("handler", cnx);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b184f794949..4933aee974d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -369,6 +369,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor,
this);
this.service.getPulsarStats().recordConnectionCreate();
cnxsPerThread.get().add(this);
+ service.getPulsar().runWhenReadyForIncomingRequests(() -> {
+ // enable auto read after PulsarService is ready to accept
incoming requests
+ ctx.channel().config().setAutoRead(true);
+ });
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index bf484d4f41f..c969f40ad43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -20,12 +20,21 @@ package org.apache.pulsar.broker.web;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -232,6 +241,7 @@ public class WebService implements AutoCloseable {
private final FilterHolder authenticationFilterHolder;
FilterInitializer(PulsarService pulsarService) {
ServiceConfiguration config = pulsarService.getConfiguration();
+
if (config.getMaxConcurrentHttpRequests() > 0) {
FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
filterHolder.setInitParameter("maxRequests",
String.valueOf(config.getMaxConcurrentHttpRequests()));
@@ -243,6 +253,10 @@ public class WebService implements AutoCloseable {
new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
}
+ // wait until the PulsarService is ready to serve incoming requests
+ filterHolders.add(
+ new FilterHolder(new
WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(pulsarService)));
+
boolean brokerInterceptorEnabled =
pulsarService.getBrokerInterceptor() != null;
if (brokerInterceptorEnabled) {
ExceptionHandler handler = new ExceptionHandler();
@@ -284,6 +298,42 @@ public class WebService implements AutoCloseable {
}
}
+ // Filter that waits until the PulsarService is ready to serve
incoming requests
+ private static class
WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter implements Filter {
+ private final PulsarService pulsarService;
+
+ public
WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(PulsarService
pulsarService) {
+ this.pulsarService = pulsarService;
+ }
+
+ @Override
+ public void init(FilterConfig filterConfig) throws
ServletException {
+
+ }
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse
response, FilterChain chain)
+ throws IOException, ServletException {
+ try {
+ // Wait until the PulsarService is ready to serve incoming
requests
+ pulsarService.waitUntilReadyForIncomingRequests();
+ } catch (ExecutionException e) {
+ ((HttpServletResponse)
response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+ "PulsarService failed to start.");
+ return;
+ } catch (InterruptedException e) {
+ ((HttpServletResponse)
response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+ "PulsarService is not ready.");
+ return;
+ }
+ chain.doFilter(request, response);
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
}
public void addServlet(String path, ServletHolder servletHolder, boolean
requiresAuthentication,