Repository: samza
Updated Branches:
  refs/heads/master 7dbc4c95f -> 67b195363


SAMZA-1228 : StreamProcessor should stop JmxServer

This is not the solution posted in SAMZA-1228. For now, we are moving the 
JmxServer lifecycle to be within the container. Ideally, it should be within 
the StreamProcessor so that the job coordinator can also be associated with 
the same instance.

Author: Navina Ramesh <[email protected]>

Reviewers: Xinyu Liu <[email protected]>, Prateek Maheshwari 
<[email protected]>

Closes #162 from navina/SAMZA-1228


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/67b19536
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/67b19536
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/67b19536

Branch: refs/heads/master
Commit: 67b195363a921de537003faf72615b9349998996
Parents: 7dbc4c9
Author: Navina Ramesh <[email protected]>
Authored: Fri May 5 13:22:35 2017 -0700
Committer: nramesh <[email protected]>
Committed: Fri May 5 13:22:35 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/processor/StreamProcessor.java |  7 +--
 .../samza/runtime/LocalContainerRunner.java     | 63 +++++++++-----------
 .../apache/samza/container/SamzaContainer.scala |  9 ++-
 .../samza/job/local/ThreadJobFactory.scala      |  1 -
 .../samza/processor/TestStreamProcessor.java    |  4 +-
 .../samza/container/TestSamzaContainer.scala    | 18 ++----
 .../processor/StreamProcessorTestUtils.scala    |  3 +-
 7 files changed, 43 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 2ee76dc..5c6a474 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -33,7 +33,6 @@ import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
@@ -199,12 +198,11 @@ public class StreamProcessor {
 
   }
 
-  SamzaContainer createSamzaContainer(ContainerModel containerModel, int 
maxChangelogStreamPartitions, JmxServer jmxServer) {
+  SamzaContainer createSamzaContainer(ContainerModel containerModel, int 
maxChangelogStreamPartitions) {
     return SamzaContainer.apply(
         containerModel,
         config,
         maxChangelogStreamPartitions,
-        jmxServer,
         Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter),
         taskFactory);
   }
@@ -297,8 +295,7 @@ public class StreamProcessor {
 
           container = createSamzaContainer(
               jobModel.getContainers().get(processorId),
-              jobModel.maxChangeLogStreamPartitions,
-              new JmxServer());
+              jobModel.maxChangeLogStreamPartitions);
           container.setContainerListener(containerListener);
           LOGGER.info("Starting container " + container.toString());
           executorService = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder()

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 920cc3d..e02ee23 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -28,12 +28,11 @@ import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
 import org.apache.samza.container.SamzaContainerExceptionHandler;
+import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaToJavaUtils;
 import org.apache.samza.util.Util;
@@ -66,44 +65,36 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
-    JmxServer jmxServer = null;
-    try {
-      jmxServer = new JmxServer();
-      ContainerModel containerModel = 
jobModel.getContainers().get(containerId);
-      Object taskFactory = TaskFactoryUtil.createTaskFactory(config, 
streamApp, this);
+    ContainerModel containerModel = jobModel.getContainers().get(containerId);
+    Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, 
this);
 
-      SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerModel,
-          config,
-          jobModel.maxChangeLogStreamPartitions,
-          jmxServer,
-          Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
-          taskFactory);
-      container.setContainerListener(
-          new SamzaContainerListener() {
-            @Override
-            public void onContainerStart() {
-              log.info("Container Started");
-            }
+    SamzaContainer container = SamzaContainer$.MODULE$.apply(
+        containerModel,
+        config,
+        jobModel.maxChangeLogStreamPartitions,
+        Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
+        taskFactory);
+    container.setContainerListener(
+        new SamzaContainerListener() {
+          @Override
+          public void onContainerStart() {
+            log.info("Container Started");
+          }
 
-            @Override
-            public void onContainerStop(boolean invokedExternally) {
-              log.info("Container Stopped");
-            }
+          @Override
+          public void onContainerStop(boolean invokedExternally) {
+            log.info("Container Stopped");
+          }
 
-            @Override
-            public void onContainerFailed(Throwable t) {
-              log.info("Container Failed");
-              containerException = t;
-            }
-          });
+          @Override
+          public void onContainerFailed(Throwable t) {
+            log.info("Container Failed");
+            containerException = t;
+          }
+        });
+
+    container.run();
 
-      container.run();
-    } finally {
-      if (jmxServer != null) {
-        jmxServer.stop();
-      }
-    }
     if (containerException != null) {
       log.error("Container stopped with Exception. Exiting process now.", 
containerException);
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c7b2b7c..957cd2b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -110,7 +110,6 @@ object SamzaContainer extends Logging {
     containerModel: ContainerModel,
     config: Config,
     maxChangeLogStreamPartitions: Int,
-    jmxServer: JmxServer,
     customReporters: Map[String, MetricsReporter] = Map[String, 
MetricsReporter](),
     taskFactory: Object) = {
     val containerId = containerModel.getProcessorId()
@@ -601,7 +600,6 @@ object SamzaContainer extends Logging {
       metrics = samzaContainerMetrics,
       reporters = reporters,
       jvm = jvm,
-      jmxServer = jmxServer,
       diskSpaceMonitor = diskSpaceMonitor,
       hostStatisticsMonitor = memoryStatisticsMonitor,
       taskThreadPool = taskThreadPool)
@@ -615,7 +613,6 @@ class SamzaContainer(
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
-  jmxServer: JmxServer,
   diskSpaceMonitor: DiskSpaceMonitor = null,
   hostStatisticsMonitor: SystemStatisticsMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
@@ -627,6 +624,7 @@ class SamzaContainer(
 
   val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
   var shutdownHookThread: Thread = null
+  var jmxServer: JmxServer = null
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
   private var exceptionSeen: Throwable = null
@@ -645,6 +643,8 @@ class SamzaContainer(
 
       status = SamzaContainerStatus.STARTING
 
+      jmxServer = new JmxServer()
+
       startMetrics
       startOffsetManager
       startLocalityManager
@@ -673,10 +673,13 @@ class SamzaContainer(
         status = SamzaContainerStatus.FAILED
         exceptionSeen = e
     }
+
     try {
       info("Shutting down.")
       removeShutdownHook
 
+      jmxServer.stop
+
       shutdownConsumers
       shutdownTask
       shutdownStores

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index cb36863..b8522b9 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -70,7 +70,6 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         containerModel,
         config,
         jobModel.maxChangeLogStreamPartitions,
-        jmxServer,
         Map[String, MetricsReporter](),
         taskFactory)
       container.setContainerListener(containerListener)

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 4a654dc..7aadd28 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -26,7 +26,6 @@ import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
@@ -62,8 +61,7 @@ public class TestStreamProcessor {
     @Override
     SamzaContainer createSamzaContainer(
         ContainerModel containerModel,
-        int maxChangelogStreamPartitions,
-        JmxServer jmxServer) {
+        int maxChangelogStreamPartitions) {
       RunLoop mockRunLoop = mock(RunLoop.class);
       doAnswer(invocation ->
         {

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index bc4c47c..e03498c 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -191,8 +191,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
 
     val containerListener = new SamzaContainerListener {
       override def onContainerFailed(t: Throwable): Unit = {
@@ -272,8 +271,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       runLoop = mockRunLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
     val containerListener = new SamzaContainerListener {
       override def onContainerFailed(t: Throwable): Unit = {
         onContainerFailedCalled = true
@@ -354,8 +352,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
     val containerListener = new SamzaContainerListener {
       override def onContainerFailed(t: Throwable): Unit = {
         onContainerFailedCalled = true
@@ -437,8 +434,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       runLoop = mockRunLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
       val containerListener = new SamzaContainerListener {
         override def onContainerFailed(t: Throwable): Unit = {
           onContainerFailedCalled = true
@@ -514,8 +510,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       runLoop = mockRunLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
 
     val containerListener = new SamzaContainerListener {
         override def onContainerFailed(t: Throwable): Unit = {
@@ -582,8 +577,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       runLoop = null,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = containerMetrics,
-      jmxServer = null)
+      metrics = containerMetrics)
 
     container.startStores
     assertNotNull(containerMetrics.taskStoreRestorationMetrics)

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
 
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index f5a8ee5..f437bfc 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -57,8 +57,7 @@ object StreamProcessorTestUtils {
       runLoop = mockRunloop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
     if (containerListener != null) {
       container.setContainerListener(containerListener)
     }

Reply via email to