Repository: samza
Updated Branches:
  refs/heads/master 5e32a1bb0 -> c7ac26377


SAMZA-507; shutdown container when threads fail with uncaught exceptions


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c7ac2637
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c7ac2637
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c7ac2637

Branch: refs/heads/master
Commit: c7ac26377debacbb94f9c5aac951827895c136a2
Parents: 5e32a1b
Author: Chris Riccomini <[email protected]>
Authored: Fri Feb 13 13:12:14 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Fri Feb 13 13:12:14 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 10 ++++--
 .../SamzaContainerExceptionHandler.scala        | 34 ++++++++++++++++++
 .../apache/samza/container/TaskInstance.scala   |  4 ++-
 .../samza/container/TestSamzaContainer.scala    | 26 +++++++++++++-
 .../TestSamzaContainerExceptionHandler.scala    | 36 ++++++++++++++++++++
 5 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 8a6d865..ac6e24f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -59,13 +59,19 @@ import java.net.URL
 import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
+import java.lang.Thread.UncaughtExceptionHandler
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
-    safeMain(() => new JmxServer)
+    safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => 
System.exit(1)))
   }
 
-  def safeMain(newJmxServer: () => JmxServer) {
+  def safeMain(
+    newJmxServer: () => JmxServer,
+    exceptionHandler: UncaughtExceptionHandler = null) {
+    if (exceptionHandler != null) {
+      Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+    }
     putMDC("containerName", "samza-container-" + 
System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
     // Break out the main method to make the JmxServer injectable so we can
     // validate that we don't leak JMX non-daemon threads if we have an

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
new file mode 100644
index 0000000..bbb094c
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.util.Logging
+
+/**
+ * An UncaughtExceptionHandler that simply shuts down when any thread throws
+ * an uncaught exception.
+ */
+class SamzaContainerExceptionHandler(exit: () => Unit) extends 
UncaughtExceptionHandler with Logging {
+  def uncaughtException(t: Thread, e: Throwable) {
+    error("Uncaught exception in thread (name=%s). Exiting process 
now.".format(t.getName), e)
+    exit()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 327299b..a583ff9 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -151,7 +151,9 @@ class TaskInstance(
 
     metrics.commits.inc
 
-    storageManager.flush
+    if (storageManager != null) {
+      storageManager.flush
+    }
 
     trace("Flushing producers for taskName: %s" format taskName)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index acded7d..19ceeaa 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.container
 
 import scala.collection.JavaConversions._
-
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
@@ -52,6 +51,7 @@ import 
org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
+import java.lang.Thread.UncaughtExceptionHandler
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
@@ -144,4 +144,28 @@ class TestSamzaContainer extends AssertionsForJUnit {
     }
     assertTrue(task.wasShutdown)
   }
+
+  @Test
+  def testUncaughtExceptionHandler {
+    var caughtException = false
+    val exceptionHandler = new UncaughtExceptionHandler {
+      def uncaughtException(t: Thread, e: Throwable) {
+        caughtException = true
+      }
+    }
+    try {
+      SamzaContainer.safeMain(() => null, exceptionHandler)
+    } catch {
+      case _: Exception =>
+      // Expect some random exception from SamzaContainer because we haven't 
+      // set any environment variables for container ID, etc.
+    }
+    assertFalse(caughtException)
+    val t = new Thread(new Runnable {
+      def run = throw new RuntimeException("Uncaught exception in another 
thread. Catch this.")
+    })
+    t.start
+    t.join
+    assertTrue(caughtException)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
new file mode 100644
index 0000000..b1d100c
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.junit.Test
+import org.junit.Assert._
+import org.junit.Before
+import org.apache.samza.SamzaException
+import org.junit.After
+
+class TestSamzaContainerExceptionHandler {
+  @Test
+  def testShutdownProcess {
+    var exitCalled = false
+    val exceptionHandler = new SamzaContainerExceptionHandler(() => exitCalled 
= true)
+    exceptionHandler.uncaughtException(Thread.currentThread, new 
SamzaException)
+    assertTrue(exitCalled)
+  }
+}

Reply via email to