boyuanzz commented on a change in pull request #14074:
URL: https://github.com/apache/beam/pull/14074#discussion_r592020157



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -927,39 +927,68 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing 
implications "
             + "related to Google Compute Engine usage and other Google Cloud 
Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.debug("Portable pipeline proto:\n{}", 
TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options

Review comment:
       Could you please share more details on this TODO? For example, why is passing the pipeline through pipeline options not preferred, compared to an explicit `pipeline` parameter on the submission?
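
       To make sure I understand the question, here is a minimal sketch of the two submission shapes I think the TODO is contrasting. Everything in it (the `pipelineUrl` key, the `JobSubmission` type and its fields) is hypothetical for illustration, not the actual Dataflow API:

```java
import java.util.HashMap;
import java.util.Map;

public class SubmissionSketch {

  // (a) Today: the staged pipeline path rides along inside the
  // loosely-typed pipeline options, so the backend must know which
  // key to look for. The key name below is an assumption.
  static Map<String, String> submitViaOptions(String stagedPipelinePath) {
    Map<String, String> pipelineOptions = new HashMap<>();
    pipelineOptions.put("pipelineUrl", stagedPipelinePath);
    return pipelineOptions;
  }

  // (b) The TODO, as I read it: make the pipeline a first-class,
  // typed parameter of the job submission itself, independent of
  // user-facing options. This type is hypothetical.
  static class JobSubmission {
    final String jobName;
    final String pipelineUrl; // explicit field instead of an options entry

    JobSubmission(String jobName, String pipelineUrl) {
      this.jobName = jobName;
      this.pipelineUrl = pipelineUrl;
    }
  }
}
```

       If that reading is right, the explicit parameter avoids overloading user-facing options with runner-internal plumbing, but I would like to confirm the intent.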




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
