Repository: samza Updated Branches: refs/heads/master 8ce1bd556 -> e310c5cd2
SAMZA-1769: Remote app runner status. A call for 'status' or 'kill' does not require execution plan calculation. Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #597 from sborya/RemoteAppRunnerStatus and squashes the following commits: 7e1feea0 [Boris S] retry 1c0b3e4f [Boris S] checkstyle e8d8d517 [Boris S] skipp graph planner for app status command 88f85595 [Boris S] Merge branch 'master' of https://github.com/apache/samza 0edf343b [Boris S] Merge branch 'master' of https://github.com/apache/samza 67e611ee [Boris S] Merge branch 'master' of https://github.com/apache/samza dd39d089 [Boris S] Merge branch 'master' of https://github.com/apache/samza 1ad58d43 [Boris S] Merge branch 'master' of https://github.com/apache/samza 06b1ac36 [Boris Shkolnik] Merge branch 'master' of https://github.com/sborya/samza 5e6f5fb5 [Boris Shkolnik] Merge branch 'master' of https://github.com/apache/samza 010fa168 [Boris S] Merge branch 'master' of https://github.com/apache/samza bbffb79b [Boris S] Merge branch 'master' of https://github.com/apache/samza d4620d66 [Boris S] Merge branch 'master' of https://github.com/apache/samza 410ce78b [Boris S] Merge branch 'master' of https://github.com/apache/samza a31a7aa2 [Boris Shkolnik] reduce debugging from info to debug in KafkaCheckpointManager.java Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e310c5cd Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e310c5cd Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e310c5cd Branch: refs/heads/master Commit: e310c5cd2c561e3f83ec97935a5343bd2866e681 Parents: 8ce1bd5 Author: Boris Shkolnik <[email protected]> Authored: Fri Aug 3 11:21:43 2018 -0700 Committer: Boris S <[email protected]> Committed: Fri Aug 3 11:21:43 2018 -0700 ---------------------------------------------------------------------- 
.../samza/runtime/RemoteApplicationRunner.java | 69 ++++--------------- .../runtime/TestRemoteApplicationRunner.java | 70 ++++++++++++++++++++ 2 files changed, 82 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e310c5cd/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 99fdc51..0ecb35e 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -96,74 +96,29 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { @Override public void kill(StreamApplication app) { - StreamManager streamManager = null; - try { - streamManager = buildAndStartStreamManager(); - ExecutionPlan plan = getExecutionPlan(app, streamManager); - plan.getJobConfigs().forEach(jobConfig -> { - LOG.info("Killing job {}", jobConfig.getName()); - JobRunner runner = new JobRunner(jobConfig); - runner.kill(); - }); + // since currently we only support single actual remote job, we can get its status without + // building the execution plan. 
+ try { + JobConfig jc = new JobConfig(config); + LOG.info("Killing job {}", jc.getName()); + JobRunner runner = new JobRunner(jc); + runner.kill(); } catch (Throwable t) { throw new SamzaException("Failed to kill application", t); - } finally { - if (streamManager != null) { - streamManager.stop(); - } } } @Override public ApplicationStatus status(StreamApplication app) { - StreamManager streamManager = null; - try { - boolean hasNewJobs = false; - boolean hasRunningJobs = false; - ApplicationStatus unsuccessfulFinishStatus = null; - - streamManager = buildAndStartStreamManager(); - ExecutionPlan plan = getExecutionPlan(app, streamManager); - for (JobConfig jobConfig : plan.getJobConfigs()) { - ApplicationStatus status = getApplicationStatus(jobConfig); - - switch (status.getStatusCode()) { - case New: - hasNewJobs = true; - break; - case Running: - hasRunningJobs = true; - break; - case UnsuccessfulFinish: - unsuccessfulFinishStatus = status; - break; - case SuccessfulFinish: - break; - default: - // Do nothing - } - } - if (hasNewJobs) { - // There are jobs not started, report as New - return New; - } else if (hasRunningJobs) { - // All jobs are started, some are running - return Running; - } else if (unsuccessfulFinishStatus != null) { - // All jobs are finished, some are not successful - return unsuccessfulFinishStatus; - } else { - // All jobs are finished successfully - return SuccessfulFinish; - } + // since currently we only support single actual remote job, we can get its status without + // building the execution plan + try { + JobConfig jc = new JobConfig(config); + return getApplicationStatus(jc); } catch (Throwable t) { throw new SamzaException("Failed to get status for application", t); - } finally { - if (streamManager != null) { - streamManager.stop(); - } } } http://git-wip-us.apache.org/repos/asf/samza/blob/e310c5cd/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java index 2ef2b33..2734d56 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java @@ -20,9 +20,15 @@ package org.apache.samza.runtime; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.job.StreamJob; +import org.apache.samza.job.StreamJobFactory; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.*; @@ -50,4 +56,68 @@ public class TestRemoteApplicationRunner { boolean finished = runner.waitForFinish(Duration.ofMillis(1000)); assertFalse("Application finished before the timeout.", finished); } + + @Test + public void testGetStatus() { + Map m = new HashMap<String, String>(); + m.put(JobConfig.JOB_NAME(), "jobName"); + m.put(JobConfig.STREAM_JOB_FACTORY_CLASS(), MockStreamJobFactory.class.getName()); + + m.put(JobConfig.JOB_ID(), "newJob"); + RemoteApplicationRunner runner = new RemoteApplicationRunner(new MapConfig()); + Assert.assertEquals(ApplicationStatus.New, runner.getApplicationStatus(new JobConfig(new MapConfig(m)))); + + m.put(JobConfig.JOB_ID(), "runningJob"); + runner = new RemoteApplicationRunner(new JobConfig(new MapConfig(m))); + Assert.assertEquals(ApplicationStatus.Running, runner.getApplicationStatus(new JobConfig(new MapConfig(m)))); + } + + static public class MockStreamJobFactory implements StreamJobFactory { + + public MockStreamJobFactory() { + } + + @Override + public StreamJob getJob(final Config config) { + + StreamJob 
streamJob = new StreamJob() { + JobConfig c = (JobConfig) config; + + @Override + public StreamJob submit() { + return null; + } + + @Override + public StreamJob kill() { + return null; + } + + @Override + public ApplicationStatus waitForFinish(long timeoutMs) { + return null; + } + + @Override + public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) { + return null; + } + + @Override + public ApplicationStatus getStatus() { + String jobId = c.getJobId().get(); + switch (jobId) { + case "newJob": + return ApplicationStatus.New; + case "runningJob": + return ApplicationStatus.Running; + default: + return ApplicationStatus.UnsuccessfulFinish; + } + } + }; + + return streamJob; + } + } }
