[ https://issues.apache.org/jira/browse/BEAM-4176?focusedWorklogId=151026&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151026 ]
ASF GitHub Bot logged work on BEAM-4176:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Oct/18 04:22
Start Date: 04/Oct/18 04:22
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6563:
[BEAM-4176] Cleanup SDK Harness docker container
URL: https://github.com/apache/beam/pull/6563#discussion_r222534143
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
##########
@@ -29,23 +31,35 @@
import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineDebugOptions;
/** Implementation of a {@link FlinkExecutableStageContext}. */
class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
private final JobBundleFactory jobBundleFactory;
private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
- JobBundleFactory jobBundleFactory =
- DefaultJobBundleFactory.create(
- jobInfo,
- ImmutableMap.of(
- BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
- new DockerEnvironmentFactory.Provider(),
- BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
- new ProcessEnvironmentFactory.Provider(),
- Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
- new EmbeddedEnvironmentFactory.Provider()));
- return new FlinkDefaultExecutableStageContext(jobBundleFactory);
+ try {
+ PipelineOptions pipelineOptions =
+ PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
+ JobBundleFactory jobBundleFactory =
+ DefaultJobBundleFactory.create(
+ jobInfo,
+ ImmutableMap.of(
+ BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
+ new DockerEnvironmentFactory.Provider(
+ pipelineOptions
+ .as(PortablePipelineDebugOptions.class)
Review comment:
This line could be moved up to where the pipeline options are constructed.
It may also be nice to take care of the IOException try/catch clutter in
PipelineOptionsTranslation.
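A minimal sketch of what this suggestion could look like, using stand-in types only. `OptionsTranslationSketch`, `DebugOptions`, the `retainContainers` key, and both `fromProto` variants are hypothetical and are not Beam classes or methods; the real `PipelineOptionsTranslation` API may differ. The idea is to resolve the typed options view once, where the options are constructed, and to convert the checked `IOException` to an unchecked one in a single helper so that callers need no try/catch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

/** Hypothetical stand-ins for illustration; none of these types are Beam classes. */
final class OptionsTranslationSketch {

  /** Stand-in for a typed debug-options view (hypothetical). */
  static final class DebugOptions {
    private final boolean retainContainers;

    DebugOptions(boolean retainContainers) {
      this.retainContainers = retainContainers;
    }

    boolean retainContainers() {
      return retainContainers;
    }
  }

  /** Stand-in for a translation step that declares a checked IOException. */
  static DebugOptions fromProto(Map<String, String> proto) throws IOException {
    String raw = proto.get("retainContainers");
    if (raw != null && !raw.equals("true") && !raw.equals("false")) {
      throw new IOException("Cannot parse option value: " + raw);
    }
    // Boolean.parseBoolean(null) is false, so a missing key means "do not retain".
    return new DebugOptions(Boolean.parseBoolean(raw));
  }

  /** Handles the checked exception in one place so call sites stay free of try/catch. */
  static DebugOptions fromProtoUnchecked(Map<String, String> proto) {
    try {
      return fromProto(proto);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Caller: the typed view is resolved up front, then passed to whatever needs it. */
  static String createContext(Map<String, String> proto) {
    DebugOptions debugOptions = fromProtoUnchecked(proto);
    return "docker-provider(retain=" + debugOptions.retainContainers() + ")";
  }
}
```

With a helper of this shape, the factory method body reduces to the options lookup followed by the provider map, without a surrounding try block.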
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 151026)
Time Spent: 28h 20m (was: 28h 10m)
> Java: Portable batch runner passes all ValidatesRunner tests that non-portable runner passes
> --------------------------------------------------------------------------------------------
>
> Key: BEAM-4176
> URL: https://issues.apache.org/jira/browse/BEAM-4176
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ankur Goenka
> Priority: Major
> Attachments: 81VxNWtFtke.png, Screen Shot 2018-08-14 at 4.18.31 PM.png, Screen Shot 2018-09-03 at 11.07.38 AM.png
>
> Time Spent: 28h 20m
> Remaining Estimate: 0h
>
> We need this as a sanity check that runner execution is correct.