This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new d87c0ab  [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. (#11670)
     new e859735  Merge pull request #11703 from ibzib/BEAM-9001
d87c0ab is described below

commit d87c0abc51815736c190f05530eca880e6b394ea
Author: Lukasz Cwik <[email protected]>
AuthorDate: Wed May 13 15:45:16 2020 -0700

    [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. (#11670)
    
    * [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id.
    
    * fixup! Fix native transform expander to not reinsert deleted transforms.
    
    * fixup! Address chamikara's PR comments
---
 .../pipeline/src/main/proto/beam_runner_api.proto  | 18 ++++++---
 .../core/construction/PTransformTranslation.java   | 40 +++++++++++--------
 .../core/construction/graph/PipelineValidator.java |  9 +++++
 ...er.java => TrivialNativeTransformExpander.java} | 45 ++++++++++++----------
 .../construction/graph/QueryablePipelineTest.java  |  6 ++-
 .../beam/runners/flink/FlinkPipelineRunner.java    |  5 ++-
 .../FlinkStreamingPortablePipelineTranslator.java  |  2 +-
 .../beam/runners/spark/SparkPipelineRunner.java    |  5 ++-
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  | 22 ++++++-----
 sdks/python/apache_beam/pipeline.py                | 24 +++++-------
 10 files changed, 106 insertions(+), 70 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index 0b8d651..2bef4ee 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -151,8 +151,9 @@ message PTransform {
   // details.
   FunctionSpec spec = 1;
 
-  // (Optional) if this node is a composite, a list of the ids of
-  // transforms that it contains.
+  // (Optional) A list of the ids of transforms that it contains.
+  //
+  // Primitive transforms are not allowed to specify this.
   repeated string subtransforms = 2;
 
   // (Required) A map from local names of inputs (unique only with this map, 
and
@@ -184,9 +185,10 @@ message PTransform {
   // there is none, it may be omitted.
   repeated DisplayData display_data = 6;
 
-  // (Optional) Environment where the current PTransform should be executed in.
-  // Runner that executes the pipeline may choose to override this if needed. 
If
-  // not specified, environment will be decided by the runner.
+  // Environment where the current PTransform should be executed in.
+  //
+  // Transforms that are required to be implemented by a runner must omit this.
+  // All other transforms are required to specify this.
   string environment_id = 7;
 }
 
@@ -227,12 +229,18 @@ message StandardPTransforms {
     // See https://beam.apache.org/documentation/programming-guide/#groupbykey
     // for additional details.
     //
+    // Never defines an environment as the runner is required to implement this
+    // transform.
+    //
     // Payload: None
     GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];
 
     // A transform which produces a single empty byte array at the minimum
     // timestamp in the GlobalWindow.
     //
+    // Never defines an environment as the runner is required to implement this
+    // transform.
+    //
     // Payload: None
     IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];
 
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 1ff1d69..cf3a7e5 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -50,8 +50,8 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
@@ -74,6 +74,10 @@ public class PTransformTranslation {
   public static final String MAP_WINDOWS_TRANSFORM_URN = 
"beam:transform:map_windows:v1";
   public static final String MERGE_WINDOWS_TRANSFORM_URN = 
"beam:transform:merge_windows:v1";
 
+  // Required runner implemented transforms. These transforms should never 
specify an environment.
+  public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
+      ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);
+
   // DeprecatedPrimitives
   /**
    * @deprecated SDKs should move away from creating `Read` transforms and 
migrate to using Impulse
@@ -350,10 +354,15 @@ public class PTransformTranslation {
 
       // A composite transform is permitted to have a null spec. There are 
also some pseudo-
       // primitives not yet supported by the portability framework that have 
null specs
+      String urn = "";
       if (spec != null) {
+        urn = spec.getUrn();
         transformBuilder.setSpec(spec);
       }
-      transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+
+      if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
+        transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+      }
       return transformBuilder.build();
     }
   }
@@ -367,11 +376,6 @@ public class PTransformTranslation {
     private static final Map<Class<? extends PTransform>, 
TransformPayloadTranslator>
         KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
 
-    // TODO: BEAM-9001 - set environment ID in all transforms and allow 
runners to override.
-    private static List<String> sdkTransformsWithEnvironment =
-        ImmutableList.of(
-            PAR_DO_TRANSFORM_URN, COMBINE_PER_KEY_TRANSFORM_URN, 
ASSIGN_WINDOWS_TRANSFORM_URN);
-
     private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
         loadTransformPayloadTranslators() {
       HashMap<Class<? extends PTransform>, TransformPayloadTranslator> 
translators =
@@ -423,14 +427,20 @@ public class PTransformTranslation {
       if (spec != null) {
         transformBuilder.setSpec(spec);
 
-        if (sdkTransformsWithEnvironment.contains(spec.getUrn())) {
-          transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
-        } else if (spec.getUrn().equals(READ_TRANSFORM_URN)
-            && (appliedPTransform.getTransform().getClass() == 
Read.Bounded.class)) {
-          // Only assigning environment to Bounded reads. Not assigning an 
environment to Unbounded
-          // reads since they are a Runner translated transform, unless, in 
the future, we have an
-          // adapter available for splittable DoFn.
-          transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+        // Required runner implemented transforms should not have an 
environment id.
+        if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(spec.getUrn())) {
+          // TODO(BEAM-9309): Remove existing hacks around deprecated READ 
transform.
+          if (spec.getUrn().equals(READ_TRANSFORM_URN)) {
+            // Only assigning environment to Bounded reads. Not assigning an 
environment to
+            // Unbounded
+            // reads since they are a Runner translated transform, unless, in 
the future, we have an
+            // adapter available for splittable DoFn.
+            if (appliedPTransform.getTransform().getClass() == 
Read.Bounded.class) {
+              
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+            }
+          } else {
+            
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+          }
         }
       }
       return transformBuilder.build();
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
index 34e74a7..a27b220 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
@@ -208,6 +208,15 @@ public class PipelineValidator {
     }
 
     String urn = transform.getSpec().getUrn();
+    if (PTransformTranslation.RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
+      checkArgument(
+          transform.getEnvironmentId().isEmpty(),
+          "Transform %s references environment %s when no environment should 
be specified since it is a required runner implemented transform %s.",
+          id,
+          transform.getEnvironmentId(),
+          urn);
+    }
+
     if (VALIDATORS.containsKey(urn)) {
       try {
         VALIDATORS.get(urn).validate(id, transform, components, requirements);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java
similarity index 54%
rename from 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
rename to 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java
index 894a5d1..8ad95e8 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java
@@ -23,20 +23,22 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// TODO(BEAM-6327): Remove the need for this.
-
-/** PipelineTrimmer removes subcomponents of native transforms that shouldn't 
be fused. */
-public class PipelineTrimmer {
-  private static final Logger LOG = 
LoggerFactory.getLogger(PipelineTrimmer.class);
+/**
+ * TrivialNativeTransformExpander is used to replace transforms with known 
URNs with their native
+ * equivalent.
+ */
+public class TrivialNativeTransformExpander {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TrivialNativeTransformExpander.class);
 
   /**
-   * Remove subcomponents of native transforms that shouldn't be fused.
+   * Replaces transforms with the known URN with a native equivalent stripping 
the environment and
+   * removing any sub-transforms from the returned pipeline.
    *
    * @param pipeline the pipeline to be trimmed
    * @param knownUrns set of URNs for the runner's native transforms
    * @return the trimmed pipeline
    */
-  public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
+  public static Pipeline forKnownUrns(Pipeline pipeline, Set<String> 
knownUrns) {
     return makeKnownUrnsPrimitives(pipeline, knownUrns);
   }
 
@@ -44,26 +46,29 @@ public class PipelineTrimmer {
       RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
     RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
     for (String ptransformId : 
pipeline.getComponents().getTransformsMap().keySet()) {
-      if (knownUrns.contains(
-          
pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn()))
 {
-        LOG.debug("Removing descendants of known PTransform {}" + 
ptransformId);
+      // Skip over previously removed transforms from the original pipeline 
since we iterate
+      // over all transforms from the original pipeline and not the trimmed 
down version.
+      RunnerApi.PTransform currentTransform =
+          trimmedPipeline.getComponents().getTransformsOrDefault(ptransformId, 
null);
+      if (currentTransform != null && 
knownUrns.contains(currentTransform.getSpec().getUrn())) {
+        LOG.debug(
+            "Removing descendants and environment of known native PTransform 
{}" + ptransformId);
         removeDescendants(trimmedPipeline, ptransformId);
+        trimmedPipeline
+            .getComponentsBuilder()
+            .putTransforms(
+                ptransformId,
+                
currentTransform.toBuilder().clearSubtransforms().clearEnvironmentId().build());
       }
     }
     return trimmedPipeline.build();
   }
 
   private static void removeDescendants(RunnerApi.Pipeline.Builder pipeline, 
String parentId) {
-    RunnerApi.PTransform parentProto =
-        pipeline.getComponents().getTransformsOrDefault(parentId, null);
-    if (parentProto != null) {
-      for (String childId : parentProto.getSubtransformsList()) {
-        removeDescendants(pipeline, childId);
-        pipeline.getComponentsBuilder().removeTransforms(childId);
-      }
-      pipeline
-          .getComponentsBuilder()
-          .putTransforms(parentId, 
parentProto.toBuilder().clearSubtransforms().build());
+    RunnerApi.PTransform parentProto = 
pipeline.getComponents().getTransformsOrThrow(parentId);
+    for (String childId : parentProto.getSubtransformsList()) {
+      removeDescendants(pipeline, childId);
+      pipeline.getComponentsBuilder().removeTransforms(childId);
     }
   }
 }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
index 3b041b4..ff797ef 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
@@ -50,9 +50,11 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PBegin;
@@ -342,7 +344,7 @@ public class QueryablePipelineTest {
   public void getEnvironmentWithEnvironment() {
     Pipeline p = Pipeline.create();
     PCollection<Long> longs = p.apply("BoundedRead", 
Read.from(CountingSource.upTo(100L)));
-    PCollectionList.of(longs).and(longs).and(longs).apply("flatten", 
Flatten.pCollections());
+    longs.apply(WithKeys.of("a")).apply("groupByKey", GroupByKey.create());
 
     Components components = PipelineTranslation.toProto(p).getComponents();
     QueryablePipeline qp = QueryablePipeline.forPrimitivesIn(components);
@@ -350,7 +352,7 @@ public class QueryablePipelineTest {
     PTransformNode environmentalRead =
         PipelineNode.pTransform("BoundedRead", 
components.getTransformsOrThrow("BoundedRead"));
     PTransformNode nonEnvironmentalTransform =
-        PipelineNode.pTransform("flatten", 
components.getTransformsOrThrow("flatten"));
+        PipelineNode.pTransform("groupByKey", 
components.getTransformsOrThrow("groupByKey"));
 
     assertThat(qp.getEnvironment(environmentalRead).isPresent(), is(true));
     assertThat(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 0729f25..8945d9d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -30,9 +30,9 @@ import 
org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
 import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import 
org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import 
org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
 import 
org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
@@ -98,7 +98,8 @@ public class FlinkPipelineRunner implements 
PortablePipelineRunner {
 
     // Don't let the fuser fuse any subcomponents of native transforms.
     Pipeline trimmedPipeline =
-        PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
+        TrivialNativeTransformExpander.forKnownUrns(
+            pipelineWithSdfExpanded, translator.knownUrns());
 
     // Fused pipeline proto.
     // TODO: Consider supporting partially-fused graphs.
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 5494749..f041d0c 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -228,7 +228,7 @@ public class FlinkStreamingPortablePipelineTranslator
 
   @Override
   public Set<String> knownUrns() {
-    // Do not expose Read as a known URN because PipelineTrimmer otherwise 
removes
+    // Do not expose Read as a known URN because 
TrivialNativeTransformExpander otherwise removes
     // the subtransforms which are added in case of bounded reads. We only 
have a
     // translator here for unbounded Reads which are native transforms which 
do not
     // have subtransforms. Unbounded Reads are used by cross-language 
transforms, e.g.
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index e5166a3..9727757 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -30,9 +30,9 @@ import 
org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
 import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import 
org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import 
org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
 import 
org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
@@ -82,7 +82,8 @@ public class SparkPipelineRunner implements 
PortablePipelineRunner {
 
     // Don't let the fuser fuse any subcomponents of native transforms.
     Pipeline trimmedPipeline =
-        PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
+        TrivialNativeTransformExpander.forKnownUrns(
+            pipelineWithSdfExpanded, translator.knownUrns());
 
     // Fused pipeline proto.
     // TODO: Consider supporting partially-fused graphs.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 9ff84c2..9441151 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -177,6 +177,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
        transform := &pipepb.PTransform{
                UniqueName:    s.Scope.Name,
                Subtransforms: subtransforms,
+               EnvironmentId: m.addDefaultEnv(),
        }
 
        m.updateIfCombineComposite(s, transform)
@@ -208,7 +209,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, 
transform *pipepb.PT
                AccumulatorCoderId: acID,
        }
        transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: 
protox.MustEncode(payload)}
-       transform.EnvironmentId = m.addDefaultEnv()
 }
 
 func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
@@ -238,10 +238,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string 
{
        // allPIds tracks additional PTransformIDs generated for the pipeline
        var allPIds []string
        var spec *pipepb.FunctionSpec
-       var transformEnvID = ""
        switch edge.Edge.Op {
        case graph.Impulse:
-               // TODO(herohde) 7/18/2018: Encode data?
                spec = &pipepb.FunctionSpec{Urn: URNImpulse}
 
        case graph.ParDo:
@@ -315,7 +313,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
                if edge.Edge.DoFn.IsSplittable() {
                        payload.RestrictionCoderId = 
m.coders.Add(edge.Edge.RestrictionCoder)
                }
-               transformEnvID = m.addDefaultEnv()
                spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
 
        case graph.Combine:
@@ -325,7 +322,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
                                Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
                        },
                }
-               transformEnvID = m.addDefaultEnv()
                spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
 
        case graph.Flatten:
@@ -347,6 +343,11 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string 
{
                panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
        }
 
+       var transformEnvID = ""
+       if !(spec.Urn == URNGBK || spec.Urn == URNImpulse) {
+               transformEnvID = m.addDefaultEnv()
+       }
+
        transform := &pipepb.PTransform{
                UniqueName:    edge.Name,
                Spec:          spec,
@@ -413,10 +414,11 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 
        flattenID := fmt.Sprintf("%v_flatten", id)
        flatten := &pipepb.PTransform{
-               UniqueName: flattenID,
-               Spec:       &pipepb.FunctionSpec{Urn: URNFlatten},
-               Inputs:     inputs,
-               Outputs:    map[string]string{"i0": out},
+               UniqueName:    flattenID,
+               Spec:          &pipepb.FunctionSpec{Urn: URNFlatten},
+               Inputs:        inputs,
+               Outputs:       map[string]string{"i0": out},
+               EnvironmentId: m.addDefaultEnv(),
        }
        m.transforms[flattenID] = flatten
        subtransforms = append(subtransforms, flattenID)
@@ -468,6 +470,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        m.transforms[cogbkID] = &pipepb.PTransform{
                UniqueName:    edge.Name,
                Subtransforms: subtransforms,
+               EnvironmentId: m.addDefaultEnv(),
        }
        return cogbkID
 }
@@ -632,6 +635,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string 
{
                Spec: &pipepb.FunctionSpec{
                        Urn: URNReshuffle,
                },
+               EnvironmentId: m.addDefaultEnv(),
        }
        return reshuffleID
 }
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index bb55486..e6aef62 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -119,20 +119,16 @@ class Pipeline(object):
   should be used to designate new names
   (e.g. ``input | "label" >> my_tranform``).
   """
-
-  # TODO: BEAM-9001 - set environment ID in all transforms and allow runners to
-  # override.
   @classmethod
-  def sdk_transforms_with_environment(cls):
+  def runner_implemented_transforms(cls):
     # type: () -> FrozenSet[str]
-    from apache_beam.runners.portability.fn_api_runner import translations
-    sets = [
-        translations.PAR_DO_URNS,
-        translations.COMBINE_URNS,
-        frozenset([common_urns.primitives.ASSIGN_WINDOWS.urn])
-    ]
-    result = frozenset()  # type: FrozenSet[str]
-    return result.union(*sets)
+
+    # This set should only contain transforms which are required to be
+    # implemented by a runner.
+    return frozenset([
+        common_urns.primitives.GROUP_BY_KEY.urn,
+        common_urns.primitives.IMPULSE.urn,
+    ])
 
   def __init__(self, runner=None, options=None, argv=None):
     # type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], 
Optional[List[str]]) -> None
@@ -1117,8 +1113,8 @@ class AppliedPTransform(object):
     transform_spec = transform_to_runner_api(self.transform, context)
     environment_id = self.environment_id
     transform_urn = transform_spec.urn if transform_spec else None
-    if (not environment_id and transform_urn and
-        (transform_urn in Pipeline.sdk_transforms_with_environment())):
+    if (not environment_id and
+        (transform_urn not in Pipeline.runner_implemented_transforms())):
       environment_id = context.default_environment_id()
 
     def _maybe_preserve_tag(new_tag, pc, input_tags_to_preserve):

Reply via email to