sijie closed pull request #1726: Introduce lifecycle components for managing
components in AutoRecovery
URL: https://github.com/apache/bookkeeper/pull/1726
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 90dcf0a5c4..89e696357f 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -131,6 +131,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.stats</groupId>
+ <artifactId>prometheus-metrics-provider</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.http</groupId>
+ <artifactId>vertx-http-server</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index d63e19eac3..cd4aee2ce0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -27,19 +27,26 @@
import java.io.File;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.net.MalformedURLException;
+import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
import org.apache.bookkeeper.bookie.ExitCode;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.component.ComponentStarter;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.HttpServerLoader;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
+import org.apache.bookkeeper.server.service.AutoRecoveryService;
+import org.apache.bookkeeper.server.service.HttpService;
+import org.apache.bookkeeper.server.service.StatsProviderService;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.cli.BasicParser;
@@ -70,6 +77,9 @@
private volatile boolean shuttingDown = false;
private volatile boolean running = false;
+ // Exception handler
+ private volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;
+
public AutoRecoveryMain(ServerConfiguration conf) throws IOException,
InterruptedException, KeeperException, UnavailableException,
CompatibilityException {
@@ -102,6 +112,9 @@ public AutoRecoveryMain(ServerConfiguration conf,
StatsLogger statsLogger)
public void start() throws UnavailableException {
auditorElector.start();
replicationWorker.start();
+ if (null != uncaughtExceptionHandler) {
+ deathWatcher.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+ }
deathWatcher.start();
running = true;
}
@@ -129,13 +142,6 @@ private void shutdown(int exitCode) {
shuttingDown = true;
running = false;
this.exitCode = exitCode;
- try {
- deathWatcher.interrupt();
- deathWatcher.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted shutting down auto recovery", e);
- }
try {
auditorElector.shutdown();
@@ -158,6 +164,18 @@ private int getExitCode() {
return exitCode;
}
+ /**
+ * Currently the uncaught exception handler is used for DeathWatcher to
notify
+ * lifecycle management that a bookie is dead for some reasons.
+ *
+ * <p>in future, we can register this <tt>exceptionHandler</tt> to
critical threads
+ * so when those threads are dead, it will automatically trigger lifecycle
management
+ * to shutdown the process.
+ */
+ public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler)
{
+ this.uncaughtExceptionHandler = exceptionHandler;
+ }
+
@VisibleForTesting
public Auditor getAuditor() {
return auditorElector.getAuditor();
@@ -171,7 +189,7 @@ public boolean isAutoRecoveryRunning() {
/*
* DeathWatcher for AutoRecovery daemons.
*/
- private static class AutoRecoveryDeathWatcher extends BookieCriticalThread
{
+ private class AutoRecoveryDeathWatcher extends BookieCriticalThread {
private int watchInterval;
private AutoRecoveryMain autoRecoveryMain;
@@ -180,6 +198,13 @@ public AutoRecoveryDeathWatcher(AutoRecoveryMain
autoRecoveryMain) {
+ autoRecoveryMain.conf.getBookiePort());
this.autoRecoveryMain = autoRecoveryMain;
watchInterval = autoRecoveryMain.conf.getDeathWatchInterval();
+ // set a default uncaught exception handler to shutdown the
AutoRecovery
+ // when it notices the AutoRecovery is not running any more.
+ setUncaughtExceptionHandler((thread, cause) -> {
+ LOG.info("AutoRecoveryDeathWatcher exited loop due to uncaught
exception from thread {}",
+ thread.getName(), cause);
+ shutdown();
+ });
}
@Override
@@ -189,13 +214,20 @@ public void run() {
Thread.sleep(watchInterval);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- break;
}
// If any one service not running, then shutdown peer.
- if (!autoRecoveryMain.auditorElector.isRunning()
- || !autoRecoveryMain.replicationWorker.isRunning()) {
- autoRecoveryMain.shutdown(ExitCode.SERVER_EXCEPTION);
- break;
+ if (!autoRecoveryMain.auditorElector.isRunning() ||
!autoRecoveryMain.replicationWorker.isRunning()) {
+ LOG.info(
+ "AutoRecoveryDeathWatcher noticed the AutoRecovery
is not running any more,"
+ + "exiting the watch loop!");
+ /*
+ * death watcher has noticed that AutoRecovery is not
+ * running any more throw an exception to fail the death
+ * watcher thread and it will trigger the uncaught
exception
+ * handler to handle this "AutoRecovery not running"
+ * situation.
+ */
+ throw new RuntimeException("AutoRecovery is not running
any more");
}
}
}
@@ -266,45 +298,73 @@ private static ServerConfiguration parseArgs(String[]
args)
}
public static void main(String[] args) {
- ServerConfiguration conf = null;
+ int retCode = doMain(args);
+ Runtime.getRuntime().exit(retCode);
+ }
+
+ static int doMain(String[] args) {
+
+ ServerConfiguration conf;
+
+ // 0. parse command line
try {
conf = parseArgs(args);
} catch (IllegalArgumentException iae) {
- LOG.error("Error parsing command line arguments : ", iae);
- System.err.println(iae.getMessage());
- printUsage();
- System.exit(ExitCode.INVALID_CONF);
+ return ExitCode.INVALID_CONF;
}
+ // 1. building the component stack:
+ LifecycleComponent server;
try {
- final AutoRecoveryMain autoRecoveryMain = new
AutoRecoveryMain(conf);
- autoRecoveryMain.start();
- HttpServerLoader.loadHttpServer(conf);
- final HttpServer httpServer = HttpServerLoader.get();
- if (conf.isHttpServerEnabled() && httpServer != null) {
- BKHttpServiceProvider serviceProvider = new
BKHttpServiceProvider.Builder()
- .setAutoRecovery(autoRecoveryMain)
- .setServerConfiguration(conf)
- .build();
- httpServer.initialize(serviceProvider);
- httpServer.startServer(conf.getHttpServerPort());
- }
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- autoRecoveryMain.shutdown();
- if (httpServer != null && httpServer.isRunning()) {
- httpServer.stopServer();
- }
- LOG.info("Shutdown AutoRecoveryMain successfully");
- }
- });
- LOG.info("Register shutdown hook successfully");
- autoRecoveryMain.join();
- System.exit(autoRecoveryMain.getExitCode());
+ server = buildAutoRecoveryServer(new BookieConfiguration(conf));
} catch (Exception e) {
- LOG.error("Exception running AutoRecoveryMain : ", e);
- System.exit(ExitCode.SERVER_EXCEPTION);
+ LOG.error("Failed to build AutoRecovery Server", e);
+ return ExitCode.SERVER_EXCEPTION;
+ }
+
+ // 2. start the server
+ try {
+ ComponentStarter.startComponent(server).get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ // the server is interrupted
+ LOG.info("AutoRecovery server is interrupted. Exiting ...");
+ } catch (ExecutionException ee) {
+ LOG.error("Error in bookie shutdown", ee.getCause());
+ return ExitCode.SERVER_EXCEPTION;
}
+ return ExitCode.OK;
+ }
+
+ public static LifecycleComponentStack
buildAutoRecoveryServer(BookieConfiguration conf) throws Exception {
+ LifecycleComponentStack.Builder serverBuilder =
LifecycleComponentStack.newBuilder()
+ .withName("autorecovery-server");
+
+ // 1. build stats provider
+ StatsProviderService statsProviderService = new
StatsProviderService(conf);
+ StatsLogger rootStatsLogger =
statsProviderService.getStatsProvider().getStatsLogger("");
+
+ serverBuilder.addComponent(statsProviderService);
+ LOG.info("Load lifecycle component : {}",
StatsProviderService.class.getName());
+
+ // 2. build AutoRecovery server
+ AutoRecoveryService autoRecoveryService = new
AutoRecoveryService(conf, rootStatsLogger);
+
+ serverBuilder.addComponent(autoRecoveryService);
+ LOG.info("Load lifecycle component : {}",
AutoRecoveryService.class.getName());
+
+ // 4. build http service
+ if (conf.getServerConf().isHttpServerEnabled()) {
+ BKHttpServiceProvider provider = new
BKHttpServiceProvider.Builder()
+
.setAutoRecovery(autoRecoveryService.getAutoRecoveryServer())
+ .setServerConfiguration(conf.getServerConf())
+
.setStatsProvider(statsProviderService.getStatsProvider()).build();
+ HttpService httpService = new HttpService(provider, conf,
rootStatsLogger);
+
+ serverBuilder.addComponent(httpService);
+ LOG.info("Load lifecycle component : {}",
HttpService.class.getName());
+ }
+
+ return serverBuilder.build();
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index ae92955a32..b991d31b89 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -284,7 +284,7 @@ private static ServerConfiguration
parseCommandLine(String[] args)
* @param conf bookie server configuration
* @return lifecycle stack
*/
- static LifecycleComponentStack buildBookieServer(BookieConfiguration conf)
throws Exception {
+ public static LifecycleComponentStack
buildBookieServer(BookieConfiguration conf) throws Exception {
LifecycleComponentStack.Builder serverBuilder =
LifecycleComponentStack.newBuilder().withName("bookie-server");
// 1. build stats provider
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
index b2b8f07d0c..f8389df69d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.server.service;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
@@ -41,6 +43,15 @@ public AutoRecoveryService(BookieConfiguration conf,
StatsLogger statsLogger) th
statsLogger);
}
+ @Override
+ public void setExceptionHandler(UncaughtExceptionHandler handler) {
+ main.setExceptionHandler(handler);
+ }
+
+ public AutoRecoveryMain getAutoRecoveryServer() {
+ return main;
+ }
+
@Override
protected void doStart() {
try {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 48ea817f5e..6500e0b544 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -33,16 +33,23 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.BindException;
import java.net.InetAddress;
+import java.net.URL;
+import java.net.URLConnection;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -55,20 +62,29 @@
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.component.ComponentStarter;
+import org.apache.bookkeeper.common.component.Lifecycle;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.http.HttpRouter;
+import org.apache.bookkeeper.http.HttpServerLoader;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.replication.ReplicationStats;
+import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.server.service.AutoRecoveryService;
import org.apache.bookkeeper.server.service.BookieService;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.tls.SecurityException;
@@ -91,6 +107,8 @@
private static final Logger LOG = LoggerFactory
.getLogger(BookieInitializationTest.class);
+ private static ObjectMapper om = new ObjectMapper();
+
@Rule
public final TestName runtime = new TestName();
ZKMetadataBookieDriver driver;
@@ -495,6 +513,22 @@ public void testBookieServiceExceptionHandler() throws
Exception {
startFuture.get();
}
+ @Test
+ public void testAutoRecoveryServiceExceptionHandler() throws Exception {
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setMetadataServiceUri(metadataServiceUri);
+
+ BookieConfiguration bkConf = new BookieConfiguration(conf);
+ AutoRecoveryService service = new AutoRecoveryService(bkConf,
NullStatsLogger.INSTANCE);
+ CompletableFuture<Void> startFuture =
ComponentStarter.startComponent(service);
+
+ // shutdown the AutoRecovery service
+ service.getAutoRecoveryServer().shutdown();
+
+ // the AutoRecovery lifecycle component should be shutdown.
+ startFuture.get();
+ }
+
/**
* Verify bookie server starts up on ephemeral ports.
*/
@@ -1072,4 +1106,100 @@ private void corruptFile(File file) throws IOException {
}
}
+ @Test
+ public void
testIOVertexHTTPServerEndpointForBookieWithPrometheusProvider() throws
Exception {
+ File tmpDir = createTempDir("bookie", "test");
+
+ final ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration()
+ .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(new
String[] { tmpDir.getPath() })
+
.setBookiePort(PortManager.nextFreePort()).setMetadataServiceUri(metadataServiceUri)
+ .setListeningInterface(null);
+
+ /*
+ * enable io.vertx http server
+ */
+ int nextFreePort = PortManager.nextFreePort();
+ conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+ conf.setHttpServerEnabled(true);
+ conf.setProperty(HttpServerLoader.HTTP_SERVER_CLASS,
"org.apache.bookkeeper.http.vertx.VertxHttpServer");
+ conf.setHttpServerPort(nextFreePort);
+
+ // 1. building the component stack:
+ LifecycleComponent server = Main.buildBookieServer(new
BookieConfiguration(conf));
+ // 2. start the server
+ CompletableFuture<Void> stackComponentFuture =
ComponentStarter.startComponent(server);
+ while (server.lifecycleState() != Lifecycle.State.STARTED) {
+ Thread.sleep(100);
+ }
+
+ // Now, hit the rest endpoint for metrics
+ URL url = new URL("http://localhost:" + nextFreePort +
HttpRouter.METRICS);
+ URLConnection urlc = url.openConnection();
+ BufferedReader in = new BufferedReader(new
InputStreamReader(urlc.getInputStream()));
+ String inputLine;
+ StringBuilder metricsStringBuilder = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ metricsStringBuilder.append(inputLine);
+ }
+ in.close();
+ String metrics = metricsStringBuilder.toString();
+ // do primitive checks if metrics string contains some stats
+ assertTrue("Metrics should contain basic counters",
metrics.contains(BookKeeperServerStats.BOOKIE_ADD_ENTRY));
+
+ // Now, hit the rest endpoint for configs
+ url = new URL("http://localhost:" + nextFreePort +
HttpRouter.SERVER_CONFIG);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> configMap = om.readValue(url, Map.class);
+ if (configMap.isEmpty() || !configMap.containsKey("bookiePort")) {
+ Assert.fail("Failed to map configurations to valid JSON entries.");
+ }
+ stackComponentFuture.cancel(true);
+ }
+
+ @Test
+ public void testIOVertexHTTPServerEndpointForARWithPrometheusProvider()
throws Exception {
+ final ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration()
+
.setMetadataServiceUri(metadataServiceUri).setListeningInterface(null);
+
+ /*
+ * enable io.vertx http server
+ */
+ int nextFreePort = PortManager.nextFreePort();
+ conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+ conf.setHttpServerEnabled(true);
+ conf.setProperty(HttpServerLoader.HTTP_SERVER_CLASS,
"org.apache.bookkeeper.http.vertx.VertxHttpServer");
+ conf.setHttpServerPort(nextFreePort);
+
+ // 1. building the component stack:
+ LifecycleComponent server =
AutoRecoveryMain.buildAutoRecoveryServer(new BookieConfiguration(conf));
+ // 2. start the server
+ CompletableFuture<Void> stackComponentFuture =
ComponentStarter.startComponent(server);
+ while (server.lifecycleState() != Lifecycle.State.STARTED) {
+ Thread.sleep(100);
+ }
+
+ // Now, hit the rest endpoint for metrics
+ URL url = new URL("http://localhost:" + nextFreePort +
HttpRouter.METRICS);
+ URLConnection urlc = url.openConnection();
+ BufferedReader in = new BufferedReader(new
InputStreamReader(urlc.getInputStream()));
+ String inputLine;
+ StringBuilder metricsStringBuilder = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ metricsStringBuilder.append(inputLine);
+ }
+ in.close();
+ String metrics = metricsStringBuilder.toString();
+ // do primitive checks if metrics string contains some stats
+ assertTrue("Metrics should contain basic counters",
+
metrics.contains(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS));
+
+ // Now, hit the rest endpoint for configs
+ url = new URL("http://localhost:" + nextFreePort +
HttpRouter.SERVER_CONFIG);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> configMap = om.readValue(url, Map.class);
+ if (configMap.isEmpty() ||
!configMap.containsKey("metadataServiceUri")) {
+ Assert.fail("Failed to map configurations to valid JSON entries.");
+ }
+ stackComponentFuture.cancel(true);
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services