Repository: samza Updated Branches: refs/heads/master 7dbc4c95f -> 67b195363
SAMZA-1228 : StreamProcessor should stop JmxServer This is not the solution posted in SAMZA-1228. For now, we are moving the JmxServer lifecycle to be within the container. Ideally, it should be within the StreamProcessor so that the job coordinator can also be associated with the same instance. Author: Navina Ramesh <[email protected]> Reviewers: Xinyu Liu <[email protected]>, Prateek Maheshwari <[email protected]> Closes #162 from navina/SAMZA-1228 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/67b19536 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/67b19536 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/67b19536 Branch: refs/heads/master Commit: 67b195363a921de537003faf72615b9349998996 Parents: 7dbc4c9 Author: Navina Ramesh <[email protected]> Authored: Fri May 5 13:22:35 2017 -0700 Committer: nramesh <[email protected]> Committed: Fri May 5 13:22:35 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/processor/StreamProcessor.java | 7 +-- .../samza/runtime/LocalContainerRunner.java | 63 +++++++++----------- .../apache/samza/container/SamzaContainer.scala | 9 ++- .../samza/job/local/ThreadJobFactory.scala | 1 - .../samza/processor/TestStreamProcessor.java | 4 +- .../samza/container/TestSamzaContainer.scala | 18 ++---- .../processor/StreamProcessorTestUtils.scala | 3 +- 7 files changed, 43 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 2ee76dc..5c6a474 100644 --- 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -33,7 +33,6 @@ import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; -import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; @@ -199,12 +198,11 @@ public class StreamProcessor { } - SamzaContainer createSamzaContainer(ContainerModel containerModel, int maxChangelogStreamPartitions, JmxServer jmxServer) { + SamzaContainer createSamzaContainer(ContainerModel containerModel, int maxChangelogStreamPartitions) { return SamzaContainer.apply( containerModel, config, maxChangelogStreamPartitions, - jmxServer, Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter), taskFactory); } @@ -297,8 +295,7 @@ public class StreamProcessor { container = createSamzaContainer( jobModel.getContainers().get(processorId), - jobModel.maxChangeLogStreamPartitions, - new JmxServer()); + jobModel.maxChangeLogStreamPartitions); container.setContainerListener(containerListener); LOGGER.info("Starting container " + container.toString()); executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 920cc3d..e02ee23 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -28,12 +28,11 @@ import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainer$; import org.apache.samza.container.SamzaContainerExceptionHandler; +import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; -import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsReporter; -import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.task.TaskFactoryUtil; import org.apache.samza.util.ScalaToJavaUtils; import org.apache.samza.util.Util; @@ -66,44 +65,36 @@ public class LocalContainerRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication streamApp) { - JmxServer jmxServer = null; - try { - jmxServer = new JmxServer(); - ContainerModel containerModel = jobModel.getContainers().get(containerId); - Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); + ContainerModel containerModel = jobModel.getContainers().get(containerId); + Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); - SamzaContainer container = SamzaContainer$.MODULE$.apply( - containerModel, - config, - jobModel.maxChangeLogStreamPartitions, - jmxServer, - Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()), - taskFactory); - container.setContainerListener( - new SamzaContainerListener() { - @Override - public void onContainerStart() { - log.info("Container Started"); - } + SamzaContainer container = SamzaContainer$.MODULE$.apply( + containerModel, + config, + jobModel.maxChangeLogStreamPartitions, + Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()), + taskFactory); + container.setContainerListener( + new SamzaContainerListener() { + 
@Override + public void onContainerStart() { + log.info("Container Started"); + } - @Override - public void onContainerStop(boolean invokedExternally) { - log.info("Container Stopped"); - } + @Override + public void onContainerStop(boolean invokedExternally) { + log.info("Container Stopped"); + } - @Override - public void onContainerFailed(Throwable t) { - log.info("Container Failed"); - containerException = t; - } - }); + @Override + public void onContainerFailed(Throwable t) { + log.info("Container Failed"); + containerException = t; + } + }); + + container.run(); - container.run(); - } finally { - if (jmxServer != null) { - jmxServer.stop(); - } - } if (containerException != null) { log.error("Container stopped with Exception. Exiting process now.", containerException); System.exit(1); http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index c7b2b7c..957cd2b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -110,7 +110,6 @@ object SamzaContainer extends Logging { containerModel: ContainerModel, config: Config, maxChangeLogStreamPartitions: Int, - jmxServer: JmxServer, customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](), taskFactory: Object) = { val containerId = containerModel.getProcessorId() @@ -601,7 +600,6 @@ object SamzaContainer extends Logging { metrics = samzaContainerMetrics, reporters = reporters, jvm = jvm, - jmxServer = jmxServer, diskSpaceMonitor = diskSpaceMonitor, hostStatisticsMonitor = memoryStatisticsMonitor, taskThreadPool = taskThreadPool) @@ -615,7 +613,6 @@ class SamzaContainer( 
consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers, metrics: SamzaContainerMetrics, - jmxServer: JmxServer, diskSpaceMonitor: DiskSpaceMonitor = null, hostStatisticsMonitor: SystemStatisticsMonitor = null, offsetManager: OffsetManager = new OffsetManager, @@ -627,6 +624,7 @@ class SamzaContainer( val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L) var shutdownHookThread: Thread = null + var jmxServer: JmxServer = null @volatile private var status = SamzaContainerStatus.NOT_STARTED private var exceptionSeen: Throwable = null @@ -645,6 +643,8 @@ class SamzaContainer( status = SamzaContainerStatus.STARTING + jmxServer = new JmxServer() + startMetrics startOffsetManager startLocalityManager @@ -673,10 +673,13 @@ class SamzaContainer( status = SamzaContainerStatus.FAILED exceptionSeen = e } + try { info("Shutting down.") removeShutdownHook + jmxServer.stop + shutdownConsumers shutdownTask shutdownStores http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index cb36863..b8522b9 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -70,7 +70,6 @@ class ThreadJobFactory extends StreamJobFactory with Logging { containerModel, config, jobModel.maxChangeLogStreamPartitions, - jmxServer, Map[String, MetricsReporter](), taskFactory) container.setContainerListener(containerListener) http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 4a654dc..7aadd28 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -26,7 +26,6 @@ import org.apache.samza.container.SamzaContainer; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; -import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.task.StreamTask; import org.apache.samza.task.StreamTaskFactory; @@ -62,8 +61,7 @@ public class TestStreamProcessor { @Override SamzaContainer createSamzaContainer( ContainerModel containerModel, - int maxChangelogStreamPartitions, - JmxServer jmxServer) { + int maxChangelogStreamPartitions) { RunLoop mockRunLoop = mock(RunLoop.class); doAnswer(invocation -> { http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index bc4c47c..e03498c 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -191,8 +191,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { runLoop = runLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics, - jmxServer = null) + metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { override def 
onContainerFailed(t: Throwable): Unit = { @@ -272,8 +271,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { runLoop = mockRunLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics, - jmxServer = null) + metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { override def onContainerFailed(t: Throwable): Unit = { onContainerFailedCalled = true @@ -354,8 +352,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { runLoop = runLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics, - jmxServer = null) + metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { override def onContainerFailed(t: Throwable): Unit = { onContainerFailedCalled = true @@ -437,8 +434,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { runLoop = mockRunLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics, - jmxServer = null) + metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { override def onContainerFailed(t: Throwable): Unit = { onContainerFailedCalled = true @@ -514,8 +510,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { runLoop = mockRunLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics, - jmxServer = null) + metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { override def onContainerFailed(t: Throwable): Unit = { @@ -582,8 +577,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { runLoop = null, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = containerMetrics, - jmxServer = 
null) + metrics = containerMetrics) container.startStores assertNotNull(containerMetrics.taskStoreRestorationMetrics) http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index f5a8ee5..f437bfc 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -57,8 +57,7 @@ object StreamProcessorTestUtils { runLoop = mockRunloop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, - metrics = new SamzaContainerMetrics, - jmxServer = null) + metrics = new SamzaContainerMetrics) if (containerListener != null) { container.setContainerListener(containerListener) }
