Repository: samza
Updated Branches:
  refs/heads/master 8ce1bd556 -> e310c5cd2


SAMZA-1769: Remote app runner status

Calls to 'status' or 'kill' do not require execution plan calculation.

Author: Boris S <[email protected]>
Author: Boris Shkolnik <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #597 from sborya/RemoteAppRunnerStatus and squashes the following commits:

7e1feea0 [Boris S] retry
1c0b3e4f [Boris S] checkstyle
e8d8d517 [Boris S] skip graph planner for app status command
88f85595 [Boris S] Merge branch 'master' of https://github.com/apache/samza
0edf343b [Boris S] Merge branch 'master' of https://github.com/apache/samza
67e611ee [Boris S] Merge branch 'master' of https://github.com/apache/samza
dd39d089 [Boris S] Merge branch 'master' of https://github.com/apache/samza
1ad58d43 [Boris S] Merge branch 'master' of https://github.com/apache/samza
06b1ac36 [Boris Shkolnik] Merge branch 'master' of https://github.com/sborya/samza
5e6f5fb5 [Boris Shkolnik] Merge branch 'master' of https://github.com/apache/samza
010fa168 [Boris S] Merge branch 'master' of https://github.com/apache/samza
bbffb79b [Boris S] Merge branch 'master' of https://github.com/apache/samza
d4620d66 [Boris S] Merge branch 'master' of https://github.com/apache/samza
410ce78b [Boris S] Merge branch 'master' of https://github.com/apache/samza
a31a7aa2 [Boris Shkolnik] reduce debugging from info to debug in KafkaCheckpointManager.java


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e310c5cd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e310c5cd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e310c5cd

Branch: refs/heads/master
Commit: e310c5cd2c561e3f83ec97935a5343bd2866e681
Parents: 8ce1bd5
Author: Boris Shkolnik <[email protected]>
Authored: Fri Aug 3 11:21:43 2018 -0700
Committer: Boris S <[email protected]>
Committed: Fri Aug 3 11:21:43 2018 -0700

----------------------------------------------------------------------
 .../samza/runtime/RemoteApplicationRunner.java  | 69 ++++---------------
 .../runtime/TestRemoteApplicationRunner.java    | 70 ++++++++++++++++++++
 2 files changed, 82 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e310c5cd/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 99fdc51..0ecb35e 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -96,74 +96,29 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   @Override
   public void kill(StreamApplication app) {
-    StreamManager streamManager = null;
-    try {
-      streamManager = buildAndStartStreamManager();
-      ExecutionPlan plan = getExecutionPlan(app, streamManager);
 
-      plan.getJobConfigs().forEach(jobConfig -> {
-          LOG.info("Killing job {}", jobConfig.getName());
-          JobRunner runner = new JobRunner(jobConfig);
-          runner.kill();
-        });
+    // since currently we only support single actual remote job, we can get its status without
+    // building the execution plan.
+    try {
+      JobConfig jc = new JobConfig(config);
+      LOG.info("Killing job {}", jc.getName());
+      JobRunner runner = new JobRunner(jc);
+      runner.kill();
     } catch (Throwable t) {
       throw new SamzaException("Failed to kill application", t);
-    } finally {
-      if (streamManager != null) {
-        streamManager.stop();
-      }
     }
   }
 
   @Override
   public ApplicationStatus status(StreamApplication app) {
-    StreamManager streamManager = null;
-    try {
-      boolean hasNewJobs = false;
-      boolean hasRunningJobs = false;
-      ApplicationStatus unsuccessfulFinishStatus = null;
-
-      streamManager = buildAndStartStreamManager();
-      ExecutionPlan plan = getExecutionPlan(app, streamManager);
-      for (JobConfig jobConfig : plan.getJobConfigs()) {
-        ApplicationStatus status = getApplicationStatus(jobConfig);
-
-        switch (status.getStatusCode()) {
-          case New:
-            hasNewJobs = true;
-            break;
-          case Running:
-            hasRunningJobs = true;
-            break;
-          case UnsuccessfulFinish:
-            unsuccessfulFinishStatus = status;
-            break;
-          case SuccessfulFinish:
-            break;
-          default:
-            // Do nothing
-        }
-      }
 
-      if (hasNewJobs) {
-        // There are jobs not started, report as New
-        return New;
-      } else if (hasRunningJobs) {
-        // All jobs are started, some are running
-        return Running;
-      } else if (unsuccessfulFinishStatus != null) {
-        // All jobs are finished, some are not successful
-        return unsuccessfulFinishStatus;
-      } else {
-        // All jobs are finished successfully
-        return SuccessfulFinish;
-      }
+    // since currently we only support single actual remote job, we can get its status without
+    // building the execution plan
+    try {
+      JobConfig jc = new JobConfig(config);
+      return getApplicationStatus(jc);
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
-    } finally {
-      if (streamManager != null) {
-        streamManager.stop();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e310c5cd/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
index 2ef2b33..2734d56 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -20,9 +20,15 @@
 package org.apache.samza.runtime;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.job.StreamJob;
+import org.apache.samza.job.StreamJobFactory;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -50,4 +56,68 @@ public class TestRemoteApplicationRunner {
     boolean finished = runner.waitForFinish(Duration.ofMillis(1000));
     assertFalse("Application finished before the timeout.", finished);
   }
+
+  @Test
+  public void testGetStatus() {
+    Map m = new HashMap<String, String>();
+    m.put(JobConfig.JOB_NAME(), "jobName");
+    m.put(JobConfig.STREAM_JOB_FACTORY_CLASS(), MockStreamJobFactory.class.getName());
+
+    m.put(JobConfig.JOB_ID(), "newJob");
+    RemoteApplicationRunner runner = new RemoteApplicationRunner(new MapConfig());
+    Assert.assertEquals(ApplicationStatus.New, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
+
+    m.put(JobConfig.JOB_ID(), "runningJob");
+    runner = new RemoteApplicationRunner(new JobConfig(new MapConfig(m)));
+    Assert.assertEquals(ApplicationStatus.Running, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
+  }
+
+  static public class MockStreamJobFactory implements StreamJobFactory {
+
+    public MockStreamJobFactory() {
+    }
+
+    @Override
+    public StreamJob getJob(final Config config) {
+
+      StreamJob streamJob = new StreamJob() {
+        JobConfig c = (JobConfig) config;
+
+        @Override
+        public StreamJob submit() {
+          return null;
+        }
+
+        @Override
+        public StreamJob kill() {
+          return null;
+        }
+
+        @Override
+        public ApplicationStatus waitForFinish(long timeoutMs) {
+          return null;
+        }
+
+        @Override
+        public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) {
+          return null;
+        }
+
+        @Override
+        public ApplicationStatus getStatus() {
+          String jobId = c.getJobId().get();
+          switch (jobId) {
+            case "newJob":
+              return ApplicationStatus.New;
+            case "runningJob":
+              return ApplicationStatus.Running;
+            default:
+              return ApplicationStatus.UnsuccessfulFinish;
+          }
+        }
+      };
+
+      return streamJob;
+    }
+  }
 }

Reply via email to