Repository: incubator-samza
Updated Branches:
  refs/heads/master 0d544aed1 -> 42df2b44f
SAMZA-382; properly shutdown jmx server when an exception occurs in samza container Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/42df2b44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/42df2b44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/42df2b44 Branch: refs/heads/master Commit: 42df2b44f1552b213547079ca17b6eb0221577e2 Parents: 0d544ae Author: Chris Riccomini <[email protected]> Authored: Tue Aug 19 14:33:45 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Aug 19 14:33:45 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 34 ++++++++++++-------- .../samza/container/TestSamzaContainer.scala | 24 ++++++++++++-- 2 files changed, 41 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/42df2b44/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 0ab8a55..04edf50 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -82,21 +82,27 @@ object SamzaContainer extends Logging { } def main(args: Array[String]) { - val jmxServer = new JmxServer - val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME) - - /** - * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS' - * properties. 
Note: This is a temporary workaround to reduce the size of the config and hence size - * of the environment variable(s) exported while starting a Samza container (SAMZA-337) - */ - val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE") - val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed) - val config = JsonConfigSerializer.fromJson(configStr) - val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed)) - val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed)) + safeMain() + } + def safeMain(jmxServer: JmxServer = new JmxServer) { + // Break out the main method to make the JmxServer injectable so we can + // validate that we don't leak JMX non-daemon threads if we have an + // exception in the main method. try { + val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME) + + /** + * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS' + * properties. 
Note: This is a temporary workaround to reduce the size of the config and hence size + * of the environment variable(s) exported while starting a Samza container (SAMZA-337) + */ + val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE") + val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed) + val config = JsonConfigSerializer.fromJson(configStr) + val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed)) + val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed)) + SamzaContainer(containerName, sspTaskNames, taskNameToChangeLogPartitionMapping, config).run } finally { jmxServer.stop @@ -104,7 +110,7 @@ object SamzaContainer extends Logging { } def getTaskNameToSystemStreamPartition(SSPTaskNamesJSON: String) = { - // Covert into a standard Java map + // Convert into a standard Java map val sspTaskNamesAsJava: Map[TaskName, Set[SystemStreamPartition]] = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(SSPTaskNamesJSON) // From that map build the TaskNamesToSystemStreamPartitions http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/42df2b44/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 9fc6771..8a04a8a 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -42,8 +42,27 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import 
org.apache.samza.system.SystemStream import org.apache.samza.system.StreamMetadataCache import org.apache.samza.task.TaskInstanceCollector +import org.scalatest.junit.AssertionsForJUnit +import org.apache.samza.metrics.JmxServer + +class TestSamzaContainer extends AssertionsForJUnit { + @Test + def testJmxServerShutdownOnException { + var stopped = false + val jmxServer = new JmxServer { + override def stop { + super.stop + stopped = true + } + } + intercept[Exception] { + // Calling main will trigger an NPE since the container checks for an + // isCompressed environment variable, which isn't set. + SamzaContainer.safeMain(jmxServer) + } + assertTrue(stopped) + } -class TestSamzaContainer { @Test def testGetInputStreamMetadata { val inputStreams = Set( @@ -98,8 +117,7 @@ class TestSamzaContainer { val runLoop = new RunLoop( taskInstances = Map(taskName -> taskInstance), consumerMultiplexer = consumerMultiplexer, - metrics = new SamzaContainerMetrics - ) + metrics = new SamzaContainerMetrics) val container = new SamzaContainer( Map(taskName -> taskInstance), runLoop,
