This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 50f29ed ISSUE #1540: Bookie/BookieServer components shutdown will
fail to end exit the BookieProcess
50f29ed is described below
commit 50f29ed9425095e2115b7c18518b8e9a81c204e9
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 23 11:02:36 2018 -0700
ISSUE #1540: Bookie/BookieServer components shutdown will fail to end exit
the BookieProcess
Descriptions of the changes in this PR:
### Motivation
Fixes the issue at #1540.
If Bookie/BookieServer components are shut down internally because of any
fatal error
(ExitCode - INVALID_CONF, SERVER_EXCEPTION, ZK_EXPIRED, ZK_REG_FAIL,
BOOKIE_EXCEPTION), then
the shutdown method logic runs and shuts down the components internal
to Bookie/BookieServer,
but it does not succeed in bringing down the bookie process.
This is because BookieServer.main / server.Main.doMain waits for
the startComponent
future to complete
http://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java#L227
.
The startComponent future will be marked complete only in the runtime
shutdown hook -
https://github.com/apache/bookkeeper/blob/master/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/ComponentStarter.java#L66.
But the problem is that nowhere in the Bookie/BookieProcess shutdown do we call
System.exit(), and hence
the runtime shutdown hook is not executed to mark the startComponent future
as complete. Hence
Main.doMain will wait forever on this future even though the Bookie/BookieServer
components are shut down
because of known fatal errors.
### Regression
Issue #508 introduced this regression. Before that change, the main thread
blocked using `BookieServer#join()`.
When the bookie died for any reason, the DeathWatchThread would kill the
bookie and bookie server, so the main thread would quit.
However, after #508 was introduced, the lifecycle management is disconnected
from the bookie and bookie server. So when they are dead,
lifecycle management is unaware of the situation and the main thread
doesn't quit.
### Changes
- Add `UncaughtExceptionHandler` to lifecycle components
- When a lifecycle component hits an error, it can use
`UncaughtExceptionHandler` to notify the lifecycle component stack to shut down the
whole stack
Master Issue: #1540
Author: Sijie Guo <[email protected]>
Reviewers: Andrey Yegorov <None>, Charan Reddy Guttapalem
<[email protected]>, Enrico Olivelli <[email protected]>
This closes #1543 from sijie/fix_lifcycle_components, closes #1540
---
.../component/AbstractLifecycleComponent.java | 7 +++
.../common/component/ComponentStarter.java | 15 +++++-
.../common/component/LifecycleComponent.java | 11 +++++
.../common/component/LifecycleComponentStack.java | 6 +++
.../common/component/TestComponentStarter.java | 32 ++++++++++++
.../component/TestLifecycleComponentStack.java | 57 ++++++++++++++++++++++
.../apache/bookkeeper/bookie/BookieException.java | 12 +++--
.../org/apache/bookkeeper/proto/BookieServer.java | 34 +++++++++++--
.../bookkeeper/server/service/BookieService.java | 6 +++
.../bookie/BookieInitializationTest.java | 25 ++++++++++
10 files changed, 195 insertions(+), 10 deletions(-)
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
index 38e73bf..015d54d 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.common.component;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import lombok.extern.slf4j.Slf4j;
@@ -35,6 +36,7 @@ public abstract class AbstractLifecycleComponent<ConfT
extends ComponentConfigur
protected final Lifecycle lifecycle = new Lifecycle();
private final Set<LifecycleListener> listeners = new
CopyOnWriteArraySet<>();
protected final StatsLogger statsLogger;
+ protected volatile UncaughtExceptionHandler uncaughtExceptionHandler;
protected AbstractLifecycleComponent(String componentName,
ConfT conf,
@@ -43,6 +45,11 @@ public abstract class AbstractLifecycleComponent<ConfT
extends ComponentConfigur
this.statsLogger = statsLogger;
}
+ @Override
+ public void setExceptionHandler(UncaughtExceptionHandler handler) {
+ this.uncaughtExceptionHandler = handler;
+ }
+
protected StatsLogger getStatsLogger() {
return statsLogger;
}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/ComponentStarter.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/ComponentStarter.java
index fa9e9b1..e952747 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/ComponentStarter.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/ComponentStarter.java
@@ -62,8 +62,19 @@ public class ComponentStarter {
*/
public static CompletableFuture<Void> startComponent(LifecycleComponent
component) {
CompletableFuture<Void> future = new CompletableFuture<>();
- Runtime.getRuntime().addShutdownHook(new Thread(
- new ComponentShutdownHook(component, future),
"component-shutdown-thread"));
+ final Thread shutdownHookThread = new Thread(
+ new ComponentShutdownHook(component, future),
+ "component-shutdown-thread"
+ );
+
+ // register a shutdown hook
+ Runtime.getRuntime().addShutdownHook(shutdownHookThread);
+
+ // register a component exception handler
+ component.setExceptionHandler((t, e) -> {
+ // start the shutdown hook when an uncaught exception happen in
the lifecycle component.
+ shutdownHookThread.start();
+ });
log.info("Starting component {}.", component.getName());
component.start();
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponent.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponent.java
index 12917fb..d382066 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponent.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponent.java
@@ -18,6 +18,8 @@
package org.apache.bookkeeper.common.component;
+import java.lang.Thread.UncaughtExceptionHandler;
+
/**
* A component based on lifecycle management.
*/
@@ -36,4 +38,13 @@ public interface LifecycleComponent extends AutoCloseable {
void stop();
void close();
+
+ /**
+ * Set the default handler invoked when a lifecycle component
+ * abruptly terminates due an uncaught exception.
+ *
+ * @param handler handler invoked when an uncaught exception happens
+ * in the lifecycle component.
+ */
+ void setExceptionHandler(UncaughtExceptionHandler handler);
}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponentStack.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponentStack.java
index 761cff1..d606910 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponentStack.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleComponentStack.java
@@ -24,6 +24,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
/**
@@ -121,4 +122,9 @@ public class LifecycleComponentStack implements
LifecycleComponent {
public void close() {
components.reverse().forEach(component -> component.close());
}
+
+ @Override
+ public void setExceptionHandler(UncaughtExceptionHandler handler) {
+ components.forEach(component ->
component.setExceptionHandler(handler));
+ }
}
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestComponentStarter.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestComponentStarter.java
index e99c301..ed97adc 100644
---
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestComponentStarter.java
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestComponentStarter.java
@@ -18,11 +18,17 @@
package org.apache.bookkeeper.common.component;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import
org.apache.bookkeeper.common.component.ComponentStarter.ComponentShutdownHook;
import org.junit.Test;
@@ -50,4 +56,30 @@ public class TestComponentStarter {
future.get();
}
+ @Test
+ public void testExceptionHandler() throws Exception {
+ // prepare a mock lifecycle component
+ LifecycleComponent component = mock(LifecycleComponent.class);
+ when(component.getName()).thenReturn("test-exception-handler");
+ AtomicReference<UncaughtExceptionHandler> exceptionHandlerRef = new
AtomicReference<>();
+ doAnswer(invocationOnMock -> {
+ UncaughtExceptionHandler handler = invocationOnMock.getArgument(0);
+ exceptionHandlerRef.set(handler);
+ return null;
+
}).when(component).setExceptionHandler(any(UncaughtExceptionHandler.class));
+
+ // start the future
+ CompletableFuture<Void> startFuture =
ComponentStarter.startComponent(component);
+ verify(component, times(1)).start();
+ verify(component,
times(1)).setExceptionHandler(eq(exceptionHandlerRef.get()));
+
+ // if an exception is signaled through exception handler,
+ // the startFuture will be completed and the component will be shutdown
+ exceptionHandlerRef.get().uncaughtException(
+ Thread.currentThread(), new Exception("test-exception-handler"));
+
+ startFuture.get();
+ verify(component, times(1)).close();
+ }
+
}
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestLifecycleComponentStack.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestLifecycleComponentStack.java
index 52e532f..de575ae 100644
---
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestLifecycleComponentStack.java
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/component/TestLifecycleComponentStack.java
@@ -19,10 +19,17 @@
package org.apache.bookkeeper.common.component;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
/**
@@ -113,4 +120,54 @@ public class TestLifecycleComponentStack {
verify(component2).close();
}
+ @Test
+ public void testSetExceptionHandler() {
+ LifecycleComponent component1 = mock(LifecycleComponent.class);
+ LifecycleComponent component2 = mock(LifecycleComponent.class);
+
+ LifecycleComponentStack stack = LifecycleComponentStack.newBuilder()
+ .withName("set-exception-handler-stack")
+ .addComponent(component1)
+ .addComponent(component2)
+ .build();
+
+ UncaughtExceptionHandler handler = mock(UncaughtExceptionHandler.class);
+
+ stack.setExceptionHandler(handler);
+ verify(component1, times(1)).setExceptionHandler(eq(handler));
+ verify(component2, times(1)).setExceptionHandler(eq(handler));
+ }
+
+ @Test
+ public void testExceptionHandlerShutdownLifecycleComponentStack() throws
Exception {
+ LifecycleComponent component1 = mock(LifecycleComponent.class);
+ LifecycleComponent component2 = mock(LifecycleComponent.class);
+ AtomicReference<UncaughtExceptionHandler> handlerRef1 = new
AtomicReference<>();
+ doAnswer(invocationOnMock -> {
+ handlerRef1.set(invocationOnMock.getArgument(0));
+ return null;
+
}).when(component1).setExceptionHandler(any(UncaughtExceptionHandler.class));
+
+ LifecycleComponentStack stack = LifecycleComponentStack.newBuilder()
+ .withName("exception-handler-shutdown-lifecycle-component-stack")
+ .addComponent(component1)
+ .addComponent(component2)
+ .build();
+
+ CompletableFuture<Void> startFuture =
ComponentStarter.startComponent(stack);
+ verify(component1, times(1)).start();
+ verify(component1, times(1)).setExceptionHandler(eq(handlerRef1.get()));
+ verify(component2, times(1)).start();
+ verify(component2, times(1)).setExceptionHandler(eq(handlerRef1.get()));
+
+ // if an exception is signaled through any component,
+ // the startFuture will be completed and all the components will be
shutdown
+ handlerRef1.get().uncaughtException(
+ Thread.currentThread(), new
Exception("exception-handler-shutdown-lifecycle-component-stack"));
+
+ startFuture.get();
+ verify(component1, times(1)).close();
+ verify(component2, times(1)).close();
+ }
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index b7428b2..3d84148 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -27,21 +27,26 @@ package org.apache.bookkeeper.bookie;
@SuppressWarnings("serial")
public abstract class BookieException extends Exception {
- private int code;
+ private final int code;
+
public BookieException(int code) {
+ super();
this.code = code;
}
public BookieException(int code, Throwable t) {
super(t);
+ this.code = code;
}
public BookieException(int code, String reason) {
super(reason);
+ this.code = code;
}
public BookieException(int code, String reason, Throwable t) {
super(reason, t);
+ this.code = code;
}
public static BookieException create(int code) {
@@ -85,10 +90,6 @@ public abstract class BookieException extends Exception {
int OperationRejectedException = -108;
}
- public void setCode(int code) {
- this.code = code;
- }
-
public int getCode() {
return this.code;
}
@@ -299,4 +300,5 @@ public abstract class BookieException extends Exception {
super(Code.UnknownBookieIdException, cause);
}
}
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 711c232..559873e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -26,6 +26,7 @@ import static
org.apache.bookkeeper.conf.AbstractConfiguration.PERMITTED_STARTUP
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.Arrays;
@@ -73,6 +74,9 @@ public class BookieServer {
// Expose Stats
private final StatsLogger statsLogger;
+ // Exception handler
+ private volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;
+
public BookieServer(ServerConfiguration conf) throws IOException,
KeeperException, InterruptedException, BookieException,
UnavailableException, CompatibilityException, SecurityException {
@@ -110,6 +114,18 @@ public class BookieServer {
this.nettyServer.setRequestProcessor(this.requestProcessor);
}
+ /**
+ * Currently the uncaught exception handler is used for DeathWatcher to
notify
+ * lifecycle management that a bookie is dead for some reasons.
+ *
+ * <p>in future, we can register this <tt>exceptionHandler</tt> to
critical threads
+ * so when those threads are dead, it will automatically trigger lifecycle
management
+ * to shutdown the process.
+ */
+ public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler)
{
+ this.uncaughtExceptionHandler = exceptionHandler;
+ }
+
protected Bookie newBookie(ServerConfiguration conf)
throws IOException, KeeperException, InterruptedException,
BookieException {
return conf.isForceReadOnlyBookie()
@@ -128,6 +144,9 @@ public class BookieServer {
running = true;
deathWatcher = new DeathWatcher(conf);
+ if (null != uncaughtExceptionHandler) {
+ deathWatcher.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+ }
deathWatcher.start();
// fixes test flappers at random places until ISSUE#1400 is resolved
@@ -236,6 +255,13 @@ public class BookieServer {
DeathWatcher(ServerConfiguration conf) {
super("BookieDeathWatcher-" + conf.getBookiePort());
watchInterval = conf.getDeathWatchInterval();
+ // set a default uncaught exception handler to shutdown the bookie
server
+ // when it notices the bookie is not running any more.
+ setUncaughtExceptionHandler((thread, cause) -> {
+ LOG.info("BookieDeathWatcher exited loop due to uncaught
exception from thread {}",
+ thread.getName(), cause);
+ shutdown();
+ });
}
@Override
@@ -248,11 +274,13 @@ public class BookieServer {
Thread.currentThread().interrupt();
}
if (!isBookieRunning()) {
- shutdown();
- break;
+ LOG.info("BookieDeathWatcher noticed the bookie is not
running any more, exiting the watch loop!");
+ // death watcher has noticed that bookie is not running
any more
+ // throw an exception to fail the death watcher thread and
it will
+ // trigger the uncaught exception handler to handle this
"bookie not running" situation.
+ throw new RuntimeException("Bookie is not running any
more");
}
}
- LOG.info("BookieDeathWatcher exited loop!");
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
index ff9e9dc..d6837f6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.server.service;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookieServer;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -42,6 +43,11 @@ public class BookieService extends ServerLifecycleComponent {
this.server = new BookieServer(conf.getServerConf(), statsLogger);
}
+ @Override
+ public void setExceptionHandler(UncaughtExceptionHandler handler) {
+ server.setExceptionHandler(handler);
+ }
+
public BookieServer getServer() {
return server;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 40f00ef..48ea817 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import
org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
@@ -53,6 +54,7 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -63,6 +65,8 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.server.service.BookieService;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -470,6 +474,27 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
}
}
+ @Test
+ public void testBookieServiceExceptionHandler() throws Exception {
+ File tmpDir = createTempDir("bookie", "exception-handler");
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ int port = PortManager.nextFreePort();
+ conf.setBookiePort(port)
+ .setJournalDirName(tmpDir.getPath())
+ .setLedgerDirNames(new String[] { tmpDir.getPath() })
+ .setMetadataServiceUri(metadataServiceUri);
+
+ BookieConfiguration bkConf = new BookieConfiguration(conf);
+ BookieService service = new BookieService(bkConf,
NullStatsLogger.INSTANCE);
+ CompletableFuture<Void> startFuture =
ComponentStarter.startComponent(service);
+
+ // shutdown the bookie service
+ service.getServer().getBookie().shutdown();
+
+ // the bookie service lifecycle component should be shutdown.
+ startFuture.get();
+ }
+
/**
* Verify bookie server starts up on ephemeral ports.
*/