[ https://issues.apache.org/jira/browse/BEAM-5464?focusedWorklogId=161500&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161500 ]

ASF GitHub Bot logged work on BEAM-5464:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/18 09:54
            Start Date: 01/Nov/18 09:54
    Worklog Time Spent: 10m 
    Work Description: asfgit closed pull request #6897: [BEAM-5464] Use BATCH_FORCED as the default ExecutionMode for batch pipeline
URL: https://github.com/apache/beam/pull/6897

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index ddc5b722c9a..ddacf3f1ed1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -77,6 +77,9 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
       flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
     }
 
+    // Set the execution mode for data exchange.
+    flinkBatchEnv.getConfig().setExecutionMode(options.getExecutionModeForBatch());
+
     // set the correct parallelism.
     if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
       flinkBatchEnv.setParallelism(options.getParallelism());
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b695c370cf1..a84b964face 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -24,6 +24,7 @@
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
@@ -32,6 +33,7 @@
     extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
 
   String AUTO = "[auto]";
+  String PIPELINED = "PIPELINED";
 
   /**
    * List of local files to make available to workers.
@@ -187,4 +189,12 @@
   Long getLatencyTrackingInterval();
 
   void setLatencyTrackingInterval(Long interval);
+
+  @Description(
+      "Flink mode for data exchange for batch pipeline. "
+          + "Reference {@link org.apache.flink.api.common.ExecutionMode}")
+  @Default.Enum(PIPELINED)
+  ExecutionMode getExecutionModeForBatch();
+
+  void setExecutionModeForBatch(ExecutionMode executionMode);
 }
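
The new option declares its default as a string (`@Default.Enum(PIPELINED)`) that must name a constant of Flink's `ExecutionMode`, and a user can override it from the command line. The following is a minimal, stdlib-only sketch of that resolution logic, assuming the flag is spelled `--executionModeForBatch=<NAME>` (derived from the getter name). The local `ExecutionMode` enum is a stand-in mirroring Flink 1.x's constants, and `resolveExecutionModeForBatch` is a hypothetical helper, not Beam's actual parser. In a real pipeline the same choice would presumably be made by passing `--executionModeForBatch=BATCH_FORCED` to `PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class)`.

```java
import java.util.HashMap;
import java.util.Map;

public class ExecutionModeOptionSketch {

  // Local stand-in mirroring the constants of
  // org.apache.flink.api.common.ExecutionMode (Flink 1.x); not the real type.
  enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }

  // Same string the PR declares as FlinkPipelineOptions.PIPELINED.
  static final String PIPELINED = "PIPELINED";

  // Hypothetical helper: resolves --executionModeForBatch=<NAME> from
  // command-line style args, falling back to the @Default.Enum value.
  static ExecutionMode resolveExecutionModeForBatch(String[] args) {
    Map<String, String> parsed = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      // Only "--name=value" arguments are considered; others are ignored.
      if (arg.startsWith("--") && eq > 2) {
        parsed.put(arg.substring(2, eq), arg.substring(eq + 1));
      }
    }
    String name = parsed.getOrDefault("executionModeForBatch", PIPELINED);
    // Enum.valueOf throws IllegalArgumentException for unknown names,
    // so a misspelled mode fails fast instead of silently defaulting.
    return ExecutionMode.valueOf(name);
  }

  public static void main(String[] args) {
    System.out.println(resolveExecutionModeForBatch(new String[] {}));
    System.out.println(
        resolveExecutionModeForBatch(new String[] {"--executionModeForBatch=BATCH_FORCED"}));
  }
}
```

Running `main` prints `PIPELINED` followed by `BATCH_FORCED`: without the flag the string default is resolved to its enum constant, and with the flag the user's choice wins.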

Issue Time Tracking
-------------------

    Worklog Id:     (was: 161500)
    Time Spent: 1h 40m  (was: 1.5h)

> Portable beam hangs while running TFX preprocessing step on a distributed cluster
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-5464
>                 URL: https://issues.apache.org/jira/browse/BEAM-5464
>             Project: Beam
>          Issue Type: Bug
>          Components: java-fn-execution
>            Reporter: Axel Magnuson
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Recently I went through the exercise of running the TFX taxi example on a
> Dataproc cluster. However, it would always hang indefinitely. The Flink UI
> indicated that the job was halfway done, but I could not see any clear
> errors in the job driver logs, the job service logs, or the Flink logs. The
> root cause is still a mystery to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)