Repository: samza Updated Branches: refs/heads/master 5534fba33 -> b9814daf0
SAMZA-1724: Guarantee exit from ApplicationRunnerMain during deploys Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]> Closes #538 from prateekm/process-exit Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b9814daf Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b9814daf Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b9814daf Branch: refs/heads/master Commit: b9814daf029d1e9017638936b75fff205696b973 Parents: 5534fba Author: Prateek Maheshwari <[email protected]> Authored: Wed May 30 10:37:23 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed May 30 10:37:23 2018 -0700 ---------------------------------------------------------------------- .../SamzaContainerExceptionHandler.java | 61 ----------------- .../samza/runtime/ApplicationRunnerMain.java | 9 +++ .../samza/runtime/LocalContainerRunner.java | 10 ++- .../util/SamzaUncaughtExceptionHandler.java | 69 ++++++++++++++++++++ .../TestSamzaContainerExceptionHandler.java | 39 ----------- .../TestSamzaUncaughtExceptionHandler.java | 40 ++++++++++++ 6 files changed, 126 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java deleted file mode 100644 index 229f5ef..0000000 --- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.container; - -import java.lang.Thread.UncaughtExceptionHandler; - -import org.apache.samza.util.Util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An UncaughtExceptionHandler for SamzaContainer that simply executes the configured {@link #runnable} - * when any thread throws an uncaught exception. - */ -public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerExceptionHandler.class); - private final Runnable runnable; - - public SamzaContainerExceptionHandler(Runnable runnable) { - this.runnable = runnable; - } - /** - * Method invoked when the given thread terminates due to the - * given uncaught exception. - * <p>Any exception thrown by this method will be ignored by the - * Java Virtual Machine. - * - * @param t the thread - * @param e the exception - */ - @Override - public void uncaughtException(Thread t, Throwable e) { - String msg = String.format("Uncaught exception in thread %s.", t.getName()); - LOGGER.error(msg, e); - System.err.println(msg); - e.printStackTrace(System.err); - try { - Util.logThreadDump("Thread dump from uncaught exception handler."); - runnable.run(); - } catch (Throwable throwable) { - // Ignore to avoid further exception propagation - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java index 84427ea..f9f7467 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java @@ -23,6 +23,7 @@ import joptsimple.OptionSet; import joptsimple.OptionSpec; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.util.SamzaUncaughtExceptionHandler; import org.apache.samza.job.JobRunner$; import org.apache.samza.util.CommandLine; import org.apache.samza.util.Util; @@ -52,6 +53,12 @@ public class ApplicationRunnerMain { } public static void main(String[] args) throws Exception { + Thread.setDefaultUncaughtExceptionHandler( + new SamzaUncaughtExceptionHandler(() -> { + System.out.println("Exiting process now."); + System.exit(1); + })); + ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine(); OptionSet options = cmdLine.parser().parse(args); Config orgConfig = cmdLine.loadConfig(options); @@ -78,6 +85,8 @@ public class ApplicationRunnerMain { } else { JobRunner$.MODULE$.main(args); } + + System.exit(0); } } http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 7751241..66176d7 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -31,7 +31,7 @@ import org.apache.samza.container.ContainerHeartbeatClient; import org.apache.samza.container.ContainerHeartbeatMonitor; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainer$; -import org.apache.samza.container.SamzaContainerExceptionHandler; +import org.apache.samza.util.SamzaUncaughtExceptionHandler; import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.JobModel; @@ -78,6 +78,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner { config, ScalaJavaUtil.toScalaMap(new HashMap<>()), taskFactory); + container.setContainerListener( new SamzaContainerListener() { @Override @@ -96,9 +97,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner { containerRunnerException = t; } }); + startContainerHeartbeatMonitor(); container.run(); stopContainerHeartbeatMonitor(); + if (containerRunnerException != null) { log.error("Container stopped with Exception. Exiting process now.", containerRunnerException); System.exit(1); @@ -127,10 +130,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner { public static void main(String[] args) throws Exception { Thread.setDefaultUncaughtExceptionHandler( - new SamzaContainerExceptionHandler(() -> { + new SamzaUncaughtExceptionHandler(() -> { log.info("Exiting process now."); System.exit(1); })); + String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()); log.info(String.format("Got container ID: %s", containerId)); System.out.println(String.format("Container ID: %s", containerId)); @@ -153,6 +157,8 @@ public class LocalContainerRunner extends AbstractApplicationRunner { StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId); localContainerRunner.run(streamApp); + + System.exit(0); } private void startContainerHeartbeatMonitor() { http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java new file mode 100644 index 0000000..d94b2d9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An UncaughtExceptionHandler that logs the uncaught exception, logs a thread dump, and then + * executes the provided {@code runnable}. + * <p> + * Example usage: Exit process if any thread throws an uncaught exception: + * <pre> + * Thread.setDefaultUncaughtExceptionHandler( + * new SamzaUncaughtExceptionHandler(() -> { + * System.exit(1); + * }) + * ); + * </pre> + */ +public class SamzaUncaughtExceptionHandler implements UncaughtExceptionHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SamzaUncaughtExceptionHandler.class); + private final Runnable runnable; + + public SamzaUncaughtExceptionHandler(Runnable runnable) { + this.runnable = runnable; + } + /** + * Method invoked when the given thread terminates due to the + * given uncaught exception. + * <p>Any exception thrown by this method will be ignored by the + * Java Virtual Machine. + * + * @param t the thread + * @param e the exception + */ + @Override + public void uncaughtException(Thread t, Throwable e) { + String msg = String.format("Uncaught exception in thread %s.", t.getName()); + LOGGER.error(msg, e); + System.err.println(msg); + e.printStackTrace(System.err); + try { + Util.logThreadDump("Thread dump from uncaught exception handler."); + runnable.run(); + } catch (Throwable throwable) { + // Ignore to avoid further exception propagation + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java deleted file mode 100644 index 387bbd4..0000000 --- a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.container; - -import org.apache.samza.SamzaException; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertTrue; - -public class TestSamzaContainerExceptionHandler { - - @Test - public void testExceptionHandler() { - final AtomicBoolean exitCalled = new AtomicBoolean(false); - Thread.UncaughtExceptionHandler exceptionHandler = - new SamzaContainerExceptionHandler(() -> exitCalled.getAndSet(true)); - exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException()); - assertTrue(exitCalled.get()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java new file mode 100644 index 0000000..d15f7da --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import org.apache.samza.SamzaException; +import org.apache.samza.util.SamzaUncaughtExceptionHandler; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertTrue; + +public class TestSamzaUncaughtExceptionHandler { + + @Test + public void testExceptionHandler() { + final AtomicBoolean exitCalled = new AtomicBoolean(false); + Thread.UncaughtExceptionHandler exceptionHandler = + new SamzaUncaughtExceptionHandler(() -> exitCalled.getAndSet(true)); + exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException()); + assertTrue(exitCalled.get()); + } +}
