Repository: samza Updated Branches: refs/heads/master fa49e7228 -> 40154b4f5
SAMZA-1653: Support waitForFinish in remote application runner and add waitForFinish Added the following APIs to ApplicationRunner `void waitForFinish()` `boolean waitForFinish(Duration timeout)` Implemented the wait for finish methods in remote application runner. Note currently, there is disparity in the APIs in terms of associating runners with stream application. Ideally, we want to decide on the cardinal relation between them and change the APIs accordingly. The goal of the PR is limited to introduce API (waitForFinish) parity between runners in the current setup. xinyuiscool Author: Bharath Kumarasubramanian <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #503 from bharathkk/samza-1653 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40154b4f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40154b4f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40154b4f Branch: refs/heads/master Commit: 40154b4f589f48eeedf2685d706d29ae89af83f1 Parents: fa49e72 Author: Bharath Kumarasubramanian <[email protected]> Authored: Mon May 7 11:11:01 2018 -0700 Committer: xiliu <[email protected]> Committed: Mon May 7 11:11:01 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/runtime/ApplicationRunner.java | 19 ++++++ .../samza/runtime/LocalApplicationRunner.java | 40 ++++++++++- .../samza/runtime/RemoteApplicationRunner.java | 71 ++++++++++++++++++-- .../runtime/TestLocalApplicationRunner.java | 20 ++++++ .../runtime/TestRemoteApplicationRunner.java | 53 +++++++++++++++ .../sql/runner/SamzaSqlApplicationRunner.java | 2 +- 6 files changed, 195 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java index 440dd33..8339429 100644 --- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java +++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java @@ -18,6 +18,7 @@ */ package org.apache.samza.runtime; +import java.time.Duration; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; @@ -107,6 +108,24 @@ public abstract class ApplicationRunner { public abstract ApplicationStatus status(StreamApplication streamApp); /** + * Waits until the application finishes. + */ + public void waitForFinish() { + throw new UnsupportedOperationException(getClass().getName() + " does not support waitForFinish."); + } + + /** + * Waits for {@code timeout} duration for the application to finish. + * + * @param timeout time to wait for the application to finish + * @return true - application finished before timeout + * false - otherwise + */ + public boolean waitForFinish(Duration timeout) { + throw new UnsupportedOperationException(getClass().getName() + " does not support timed waitForFinish."); + } + + /** * Constructs a {@link StreamSpec} from the configuration for the specified streamId. * * The stream configurations are read from the following properties in the config: http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 8f481cd..1284060 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -19,6 +19,8 @@ package org.apache.samza.runtime; +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Set; @@ -194,15 +196,42 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { } /** - * Block until the application finishes + * Waits until the application finishes. */ + @Override public void waitForFinish() { + waitForFinish(Duration.ofMillis(0)); + } + + /** + * Waits for {@code timeout} duration for the application to finish. + * If timeout < 1, blocks the caller indefinitely. + * + * @param timeout time to wait for the application to finish + * @return true - application finished before timeout + * false - otherwise + */ + @Override + public boolean waitForFinish(Duration timeout) { + long timeoutInMs = timeout.toMillis(); + boolean finished = true; + try { - shutdownLatch.await(); + if (timeoutInMs < 1) { + shutdownLatch.await(); + } else { + finished = shutdownLatch.await(timeoutInMs, TimeUnit.MILLISECONDS); + + if (!finished) { + LOG.warn("Timed out waiting for application to finish."); + } + } } catch (Exception e) { - LOG.error("Wait is interrupted by exception", e); + LOG.error("Error waiting for application to finish", e); throw new SamzaException(e); } + + return finished; } /** @@ -280,4 +309,9 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { Set<StreamProcessor> getProcessors() { return processors; } + + @VisibleForTesting + CountDownLatch getShutdownLatch() { + return shutdownLatch; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index ea218d0..202fa76 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -19,6 +19,7 @@ package org.apache.samza.runtime; +import java.time.Duration; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; @@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory; import java.util.UUID; +import static org.apache.samza.job.ApplicationStatus.*; + /** * This class implements the {@link ApplicationRunner} that runs the applications in a remote cluster @@ -41,6 +44,7 @@ import java.util.UUID; public class RemoteApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class); + private static final long DEFAULT_SLEEP_DURATION_MS = 2000; public RemoteApplicationRunner(Config config) { super(config); @@ -110,9 +114,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { ExecutionPlan plan = getExecutionPlan(app); for (JobConfig jobConfig : plan.getJobConfigs()) { - JobRunner runner = new JobRunner(jobConfig); - ApplicationStatus status = runner.status(); - LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); + ApplicationStatus status = getApplicationStatus(jobConfig); switch (status.getStatusCode()) { case New: @@ -133,22 +135,79 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { if (hasNewJobs) { // There are jobs not started, report as New - return ApplicationStatus.New; + return New; } else if (hasRunningJobs) { // All jobs are started, some are running - return ApplicationStatus.Running; + return Running; } else if (unsuccessfulFinishStatus != null) { // All jobs are finished, some are not successful return unsuccessfulFinishStatus; } else { // All jobs are finished successfully - return ApplicationStatus.SuccessfulFinish; + return SuccessfulFinish; } } catch (Throwable t) { throw new SamzaException("Failed to get status for application", t); } } + /* package private */ ApplicationStatus getApplicationStatus(JobConfig jobConfig) { + JobRunner runner = new JobRunner(jobConfig); + ApplicationStatus status = runner.status(); + LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); + return status; + } + + /** + * Waits until the application finishes. + */ + public void waitForFinish() { + waitForFinish(Duration.ofMillis(0)); + } + + /** + * Waits for {@code timeout} duration for the application to finish. + * If timeout < 1, blocks the caller indefinitely. + * + * @param timeout time to wait for the application to finish + * @return true - application finished before timeout + * false - otherwise + */ + public boolean waitForFinish(Duration timeout) { + JobConfig jobConfig = new JobConfig(config); + boolean finished = true; + long timeoutInMs = timeout.toMillis(); + long startTimeInMs = System.currentTimeMillis(); + long timeElapsed = 0L; + + long sleepDurationInMs = timeoutInMs < 1 ? + DEFAULT_SLEEP_DURATION_MS : Math.min(timeoutInMs, DEFAULT_SLEEP_DURATION_MS); + ApplicationStatus status; + + try { + while (timeoutInMs < 1 || timeElapsed <= timeoutInMs) { + status = getApplicationStatus(jobConfig); + if (status == SuccessfulFinish || status == UnsuccessfulFinish) { + LOG.info("Application finished with status {}", status); + break; + } + + Thread.sleep(sleepDurationInMs); + timeElapsed = System.currentTimeMillis() - startTimeInMs; + } + + if (timeElapsed > timeoutInMs) { + LOG.warn("Timed out waiting for application to finish."); + finished = false; + } + } catch (Exception e) { + LOG.error("Error waiting for application to finish", e); + throw new SamzaException(e); + } + + return finished; + } + private Config getConfigFromPrevRun() { CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); consumer.register(); http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index b4a2259..84ecc6c 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -20,6 +20,7 @@ package org.apache.samza.runtime; import com.google.common.collect.ImmutableList; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -51,6 +52,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; @@ -304,6 +306,24 @@ public class TestLocalApplicationRunner { planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs))); } + @Test + public void testWaitForFinishReturnsBeforeTimeout() { + LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig()); + long timeoutInMs = 1000; + + runner.getShutdownLatch().countDown(); + boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs)); + assertTrue("Application did not finish before the timeout.", finished); + } + + @Test + public void testWaitForFinishTimesout() { + LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig()); + long timeoutInMs = 100; + boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs)); + assertFalse("Application finished before the timeout.", finished); + } + private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) { String intermediateStreamJson = updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(",")); http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java new file mode 100644 index 0000000..2ef2b33 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.runtime; + +import java.time.Duration; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.ApplicationStatus; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * A test class for {@link RemoteApplicationRunner}. + */ +public class TestRemoteApplicationRunner { + @Test + public void testWaitForFinishReturnsBeforeTimeout() { + RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new MapConfig())); + doReturn(ApplicationStatus.SuccessfulFinish).when(runner).getApplicationStatus(any(JobConfig.class)); + + boolean finished = runner.waitForFinish(Duration.ofMillis(5000)); + assertTrue("Application did not finish before the timeout.", finished); + } + + @Test + public void testWaitForFinishTimesout() { + RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new MapConfig())); + doReturn(ApplicationStatus.Running).when(runner).getApplicationStatus(any(JobConfig.class)); + + boolean finished = runner.waitForFinish(Duration.ofMillis(1000)); + assertFalse("Application finished before the timeout.", finished); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 4497a7c..f3093a7 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -110,7 +110,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner { Validate.isTrue(localRunner, "This method can be called only in standalone mode."); SamzaSqlApplication app = new SamzaSqlApplication(); run(app); - ((LocalApplicationRunner) appRunner).waitForFinish(); + appRunner.waitForFinish(); } @Override
