Repository: samza
Updated Branches:
  refs/heads/master fa49e7228 -> 40154b4f5


SAMZA-1653: Support waitForFinish in remote application runner and add 
waitForFinish

Added the following APIs to ApplicationRunner
 `void waitForFinish()`
 `boolean waitForFinish(Duration timeout)`

Implemented the wait for finish methods in remote application runner. Note 
currently, there is disparity in the APIs in terms of associating runners with 
stream application. Ideally, we want to decide on the cardinal relation between 
them and change the APIs accordingly.

The goal of the PR is limited to introduce API (waitForFinish) parity between 
runners in the current setup.
xinyuiscool

Author: Bharath Kumarasubramanian <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #503 from bharathkk/samza-1653


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40154b4f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40154b4f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40154b4f

Branch: refs/heads/master
Commit: 40154b4f589f48eeedf2685d706d29ae89af83f1
Parents: fa49e72
Author: Bharath Kumarasubramanian <[email protected]>
Authored: Mon May 7 11:11:01 2018 -0700
Committer: xiliu <[email protected]>
Committed: Mon May 7 11:11:01 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/runtime/ApplicationRunner.java | 19 ++++++
 .../samza/runtime/LocalApplicationRunner.java   | 40 ++++++++++-
 .../samza/runtime/RemoteApplicationRunner.java  | 71 ++++++++++++++++++--
 .../runtime/TestLocalApplicationRunner.java     | 20 ++++++
 .../runtime/TestRemoteApplicationRunner.java    | 53 +++++++++++++++
 .../sql/runner/SamzaSqlApplicationRunner.java   |  2 +-
 6 files changed, 195 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java 
b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 440dd33..8339429 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.runtime;
 
+import java.time.Duration;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
@@ -107,6 +108,24 @@ public abstract class ApplicationRunner {
   public abstract ApplicationStatus status(StreamApplication streamApp);
 
   /**
+   * Waits until the application finishes.
+   */
+  public void waitForFinish() {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support waitForFinish.");
+  }
+
+  /**
+   * Waits for {@code timeout} duration for the application to finish.
+   *
+   * @param timeout time to wait for the application to finish
+   * @return true - application finished before timeout
+   *         false - otherwise
+   */
+  public boolean waitForFinish(Duration timeout) {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support timed waitForFinish.");
+  }
+
+  /**
    * Constructs a {@link StreamSpec} from the configuration for the specified 
streamId.
    *
    * The stream configurations are read from the following properties in the 
config:

http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 8f481cd..1284060 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
@@ -194,15 +196,42 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
   }
 
   /**
-   * Block until the application finishes
+   * Waits until the application finishes.
    */
+  @Override
   public void waitForFinish() {
+    waitForFinish(Duration.ofMillis(0));
+  }
+
+  /**
+   * Waits for {@code timeout} duration for the application to finish.
+   * If timeout &lt; 1, blocks the caller indefinitely.
+   *
+   * @param timeout time to wait for the application to finish
+   * @return true - application finished before timeout
+   *         false - otherwise
+   */
+  @Override
+  public boolean waitForFinish(Duration timeout) {
+    long timeoutInMs = timeout.toMillis();
+    boolean finished = true;
+
     try {
-      shutdownLatch.await();
+      if (timeoutInMs < 1) {
+        shutdownLatch.await();
+      } else {
+        finished = shutdownLatch.await(timeoutInMs, TimeUnit.MILLISECONDS);
+
+        if (!finished) {
+          LOG.warn("Timed out waiting for application to finish.");
+        }
+      }
     } catch (Exception e) {
-      LOG.error("Wait is interrupted by exception", e);
+      LOG.error("Error waiting for application to finish", e);
       throw new SamzaException(e);
     }
+
+    return finished;
   }
 
   /**
@@ -280,4 +309,9 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
   Set<StreamProcessor> getProcessors() {
     return processors;
   }
+
+  @VisibleForTesting
+  CountDownLatch getShutdownLatch() {
+    return shutdownLatch;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index ea218d0..202fa76 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import java.time.Duration;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 
+import static org.apache.samza.job.ApplicationStatus.*;
+
 
 /**
  * This class implements the {@link ApplicationRunner} that runs the 
applications in a remote cluster
@@ -41,6 +44,7 @@ import java.util.UUID;
 public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteApplicationRunner.class);
+  private static final long DEFAULT_SLEEP_DURATION_MS = 2000;
 
   public RemoteApplicationRunner(Config config) {
     super(config);
@@ -110,9 +114,7 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
 
       ExecutionPlan plan = getExecutionPlan(app);
       for (JobConfig jobConfig : plan.getJobConfigs()) {
-        JobRunner runner = new JobRunner(jobConfig);
-        ApplicationStatus status = runner.status();
-        LOG.debug("Status is {} for job {}", new Object[]{status, 
jobConfig.getName()});
+        ApplicationStatus status = getApplicationStatus(jobConfig);
 
         switch (status.getStatusCode()) {
           case New:
@@ -133,22 +135,79 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
 
       if (hasNewJobs) {
         // There are jobs not started, report as New
-        return ApplicationStatus.New;
+        return New;
       } else if (hasRunningJobs) {
         // All jobs are started, some are running
-        return ApplicationStatus.Running;
+        return Running;
       } else if (unsuccessfulFinishStatus != null) {
         // All jobs are finished, some are not successful
         return unsuccessfulFinishStatus;
       } else {
         // All jobs are finished successfully
-        return ApplicationStatus.SuccessfulFinish;
+        return SuccessfulFinish;
       }
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
     }
   }
 
+  /* package private */ ApplicationStatus getApplicationStatus(JobConfig 
jobConfig) {
+    JobRunner runner = new JobRunner(jobConfig);
+    ApplicationStatus status = runner.status();
+    LOG.debug("Status is {} for job {}", new Object[]{status, 
jobConfig.getName()});
+    return status;
+  }
+
+  /**
+   * Waits until the application finishes.
+   */
+  public void waitForFinish() {
+    waitForFinish(Duration.ofMillis(0));
+  }
+
+  /**
+   * Waits for {@code timeout} duration for the application to finish.
+   * If timeout &lt; 1, blocks the caller indefinitely.
+   *
+   * @param timeout time to wait for the application to finish
+   * @return true - application finished before timeout
+   *         false - otherwise
+   */
+  public boolean waitForFinish(Duration timeout) {
+    JobConfig jobConfig = new JobConfig(config);
+    boolean finished = true;
+    long timeoutInMs = timeout.toMillis();
+    long startTimeInMs = System.currentTimeMillis();
+    long timeElapsed = 0L;
+
+    long sleepDurationInMs = timeoutInMs < 1 ?
+        DEFAULT_SLEEP_DURATION_MS : Math.min(timeoutInMs, 
DEFAULT_SLEEP_DURATION_MS);
+    ApplicationStatus status;
+
+    try {
+      while (timeoutInMs < 1 || timeElapsed <= timeoutInMs) {
+        status = getApplicationStatus(jobConfig);
+        if (status == SuccessfulFinish || status == UnsuccessfulFinish) {
+          LOG.info("Application finished with status {}", status);
+          break;
+        }
+
+        Thread.sleep(sleepDurationInMs);
+        timeElapsed = System.currentTimeMillis() - startTimeInMs;
+      }
+
+      if (timeElapsed > timeoutInMs) {
+        LOG.warn("Timed out waiting for application to finish.");
+        finished = false;
+      }
+    } catch (Exception e) {
+      LOG.error("Error waiting for application to finish", e);
+      throw new SamzaException(e);
+    }
+
+    return finished;
+  }
+
   private Config getConfigFromPrevRun() {
     CoordinatorStreamSystemConsumer consumer = new 
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
     consumer.register();

http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index b4a2259..84ecc6c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -20,6 +20,7 @@
 package org.apache.samza.runtime;
 
 import com.google.common.collect.ImmutableList;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
@@ -304,6 +306,24 @@ public class TestLocalApplicationRunner {
         planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
   }
 
+  @Test
+  public void testWaitForFinishReturnsBeforeTimeout() {
+    LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig());
+    long timeoutInMs = 1000;
+
+    runner.getShutdownLatch().countDown();
+    boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs));
+    assertTrue("Application did not finish before the timeout.", finished);
+  }
+
+  @Test
+  public void testWaitForFinishTimesout() {
+    LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig());
+    long timeoutInMs = 100;
+    boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs));
+    assertFalse("Application finished before the timeout.", finished);
+  }
+
   private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
     String intermediateStreamJson =
         
updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));

http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
new file mode 100644
index 0000000..2ef2b33
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.time.Duration;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * A test class for {@link RemoteApplicationRunner}.
+ */
+public class TestRemoteApplicationRunner {
+  @Test
+  public void testWaitForFinishReturnsBeforeTimeout() {
+    RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new 
MapConfig()));
+    
doReturn(ApplicationStatus.SuccessfulFinish).when(runner).getApplicationStatus(any(JobConfig.class));
+
+    boolean finished = runner.waitForFinish(Duration.ofMillis(5000));
+    assertTrue("Application did not finish before the timeout.", finished);
+  }
+
+  @Test
+  public void testWaitForFinishTimesout() {
+    RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new 
MapConfig()));
+    
doReturn(ApplicationStatus.Running).when(runner).getApplicationStatus(any(JobConfig.class));
+
+    boolean finished = runner.waitForFinish(Duration.ofMillis(1000));
+    assertFalse("Application finished before the timeout.", finished);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/40154b4f/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 4497a7c..f3093a7 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -110,7 +110,7 @@ public class SamzaSqlApplicationRunner extends 
AbstractApplicationRunner {
     Validate.isTrue(localRunner, "This method can be called only in standalone 
mode.");
     SamzaSqlApplication app = new SamzaSqlApplication();
     run(app);
-    ((LocalApplicationRunner) appRunner).waitForFinish();
+    appRunner.waitForFinish();
   }
 
   @Override

Reply via email to