[ 
https://issues.apache.org/jira/browse/BEAM-4176?focusedWorklogId=151026&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151026
 ]

ASF GitHub Bot logged work on BEAM-4176:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Oct/18 04:22
            Start Date: 04/Oct/18 04:22
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #6563: 
[BEAM-4176] Cleanup SDK Harness docker container
URL: https://github.com/apache/beam/pull/6563#discussion_r222534143
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
 ##########
 @@ -29,23 +31,35 @@
 import 
org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
 import 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineDebugOptions;
 
 /** Implementation of a {@link FlinkExecutableStageContext}. */
 class FlinkDefaultExecutableStageContext implements 
FlinkExecutableStageContext, AutoCloseable {
   private final JobBundleFactory jobBundleFactory;
 
   private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
-    JobBundleFactory jobBundleFactory =
-        DefaultJobBundleFactory.create(
-            jobInfo,
-            ImmutableMap.of(
-                BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
-                new DockerEnvironmentFactory.Provider(),
-                BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
-                new ProcessEnvironmentFactory.Provider(),
-                Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for 
testing.
-                new EmbeddedEnvironmentFactory.Provider()));
-    return new FlinkDefaultExecutableStageContext(jobBundleFactory);
+    try {
+      PipelineOptions pipelineOptions =
+          PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
+      JobBundleFactory jobBundleFactory =
+          DefaultJobBundleFactory.create(
+              jobInfo,
+              ImmutableMap.of(
+                  BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
+                  new DockerEnvironmentFactory.Provider(
+                      pipelineOptions
+                          .as(PortablePipelineDebugOptions.class)
 
 Review comment:
   This line could be moved up to where the pipeline options are constructed. 
It may also be nice to take care of the IOException try/catch clutter in 
PipelineOptionsTranslation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 151026)
    Time Spent: 28h 20m  (was: 28h 10m)

> Java: Portable batch runner passes all ValidatesRunner tests that 
> non-portable runner passes
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4176
>                 URL: https://issues.apache.org/jira/browse/BEAM-4176
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>         Attachments: 81VxNWtFtke.png, Screen Shot 2018-08-14 at 4.18.31 
> PM.png, Screen Shot 2018-09-03 at 11.07.38 AM.png
>
>          Time Spent: 28h 20m
>  Remaining Estimate: 0h
>
> We need this as a sanity check that runner execution is correct.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to