[ 
https://issues.apache.org/jira/browse/BEAM-6382?focusedWorklogId=182790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182790
 ]

ASF GitHub Bot logged work on BEAM-6382:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jan/19 01:21
            Start Date: 09/Jan/19 01:21
    Worklog Time Spent: 10m 
      Work Description: xinyuiscool commented on pull request #7443: [BEAM-6382] SamzaRunner: add an option to read configs using a user-defined factory
URL: https://github.com/apache/beam/pull/7443#discussion_r246227484
 
 

 ##########
 File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
 ##########
 @@ -52,40 +50,41 @@ public State getState() {
   }
 
   @Override
-  public State cancel() throws IOException {
+  public State cancel() {
     runner.kill(app);
-
-    //TODO: runner.waitForFinish() after SAMZA-1653 done
-    return getState();
+    return waitUntilFinish();
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    //TODO: SAMZA-1653
-    throw new UnsupportedOperationException(
-        "waitUntilFinish(duration) is not supported by the SamzaRunner");
+    try {
+      runner.waitForFinish(java.time.Duration.ofMillis(duration.getMillis()));
+    } catch (Exception e) {
+      throw new Pipeline.PipelineExecutionException(e);
+    }
+
+    final StateInfo stateInfo = getStateInfo();
+    if (stateInfo.state == State.FAILED) {
+      throw stateInfo.error;
+    }
+
+    return stateInfo.state;
   }
 
   @Override
   public State waitUntilFinish() {
-    if (runner instanceof LocalApplicationRunner) {
-      try {
-        ((LocalApplicationRunner) runner).waitForFinish();
-      } catch (Exception e) {
-        throw new Pipeline.PipelineExecutionException(e);
-      }
-
-      final StateInfo stateInfo = getStateInfo();
-      if (stateInfo.state == State.FAILED) {
-        throw stateInfo.error;
-      }
-
-      return stateInfo.state;
-    } else {
-      // TODO: SAMZA-1653 support waitForFinish in remote runner too
-      throw new UnsupportedOperationException(
-          "waitUntilFinish is not supported by the SamzaRunner when running remotely");
+    try {
 
 Review comment:
   Sure. Let me clean up the code to share the logic.
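
   One way the two waitUntilFinish overloads could share their logic is sketched below. This is only an illustration under stated assumptions, not the code in the pull request: FakeRunner, Result, State and awaitAndReport are hypothetical stand-ins written so the example is self-contained, and they are not the Beam or Samza classes. Both overloads delegate to a single private helper that waits, checks for failure, and returns the state.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SharedWaitSketch {
  enum State { RUNNING, DONE, FAILED }

  /** Hypothetical stand-in for the application runner (not the Samza API). */
  static final class FakeRunner {
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile RuntimeException error;

    void finish() { finished.countDown(); }

    void fail(RuntimeException e) { error = e; finished.countDown(); }

    void waitForFinish() throws InterruptedException { finished.await(); }

    boolean waitForFinish(Duration timeout) throws InterruptedException {
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    State state() {
      if (finished.getCount() > 0) {
        return State.RUNNING;
      }
      return error == null ? State.DONE : State.FAILED;
    }

    RuntimeException error() { return error; }
  }

  /** Hypothetical stand-in for the pipeline result. */
  static final class Result {
    private final FakeRunner runner;

    Result(FakeRunner runner) { this.runner = runner; }

    State waitUntilFinish() { return awaitAndReport(null); }

    State waitUntilFinish(Duration timeout) { return awaitAndReport(timeout); }

    // Shared logic: wait (bounded or unbounded), then rethrow on failure.
    private State awaitAndReport(Duration timeoutOrNull) {
      try {
        if (timeoutOrNull == null) {
          runner.waitForFinish();
        } else {
          runner.waitForFinish(timeoutOrNull);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
      State state = runner.state();
      if (state == State.FAILED) {
        throw runner.error();
      }
      return state;
    }
  }

  public static void main(String[] args) {
    FakeRunner runner = new FakeRunner();
    Result result = new Result(runner);
    // The bounded wait returns the current state if the timeout elapses first.
    System.out.println(result.waitUntilFinish(Duration.ofMillis(10)));
    runner.finish();
    System.out.println(result.waitUntilFinish());
  }
}
```

   With this shape, the failure check lives in one place, so neither overload can drift from the other.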
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 182790)
    Time Spent: 40m  (was: 0.5h)

> SamzaRunner: add an option to read configs using a user-defined factory
> -----------------------------------------------------------------------
>
>                 Key: BEAM-6382
>                 URL: https://issues.apache.org/jira/browse/BEAM-6382
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-samza
>            Reporter: Xinyu Liu
>            Assignee: Xinyu Liu
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> We need an option to read configs from a factory, which is useful on YARN as 
> well as for user-defined file formats. By default, this config factory reads 
> a properties file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
