[ 
https://issues.apache.org/jira/browse/BEAM-5649?focusedWorklogId=154334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154334
 ]

ASF GitHub Bot logged work on BEAM-5649:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Oct/18 15:08
            Start Date: 15/Oct/18 15:08
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6689: [BEAM-5649] 
Conditionally remove CREATE_VIEW from java generated protos.
URL: https://github.com/apache/beam/pull/6689
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 57f4906df3a..a71ceeda2f9 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -19,11 +19,16 @@
 package org.apache.beam.runners.core.construction;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.PipelineValidator;
 import org.apache.beam.sdk.Pipeline;
@@ -38,8 +43,19 @@
     return toProto(pipeline, SdkComponents.create(pipeline.getOptions()));
   }
 
+  public static RunnerApi.Pipeline toProto(Pipeline pipeline, boolean 
useDeprecatedViewTransforms) {
+    return toProto(
+        pipeline, SdkComponents.create(pipeline.getOptions()), 
useDeprecatedViewTransforms);
+  }
+
+  public static RunnerApi.Pipeline toProto(Pipeline pipeline, SdkComponents 
components) {
+    return toProto(pipeline, components, false);
+  }
+
   public static RunnerApi.Pipeline toProto(
-      final Pipeline pipeline, final SdkComponents components) {
+      final Pipeline pipeline,
+      final SdkComponents components,
+      boolean useDeprecatedViewTransforms) {
     final Collection<String> rootIds = new HashSet<>();
     pipeline.traverseTopologically(
         new PipelineVisitor.Defaults() {
@@ -81,8 +97,83 @@ public void visitPrimitiveTransform(Node node) {
             .setComponents(components.toComponents())
             .addAllRootTransformIds(rootIds)
             .build();
+    if (!useDeprecatedViewTransforms) {
+      // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
+      res = elideDeprecatedViews(res);
+    }
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
+
+  private static RunnerApi.Pipeline elideDeprecatedViews(RunnerApi.Pipeline 
pipeline) {
+    // Record data on CreateView operations.
+    Set<String> viewTransforms = new HashSet<>();
+    Map<String, String> viewOutputsToInputs = new HashMap<>();
+    pipeline
+        .getComponents()
+        .getTransformsMap()
+        .forEach(
+            (transformId, transform) -> {
+              if (transform
+                  .getSpec()
+                  .getUrn()
+                  .equals(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN)) {
+                viewTransforms.add(transformId);
+                viewOutputsToInputs.put(
+                    
Iterables.getOnlyElement(transform.getOutputsMap().values()),
+                    
Iterables.getOnlyElement(transform.getInputsMap().values()));
+              }
+            });
+    // Fix up view references.
+    Map<String, RunnerApi.PTransform> newTransforms = new HashMap<>();
+    pipeline
+        .getComponents()
+        .getTransformsMap()
+        .forEach(
+            (transformId, transform) -> {
+              RunnerApi.PTransform.Builder transformBuilder = 
transform.toBuilder();
+              transform
+                  .getInputsMap()
+                  .forEach(
+                      (key, value) -> {
+                        if (viewOutputsToInputs.containsKey(value)) {
+                          transformBuilder.putInputs(key, 
viewOutputsToInputs.get(value));
+                        }
+                      });
+              transform
+                  .getOutputsMap()
+                  .forEach(
+                      (key, value) -> {
+                        if (viewOutputsToInputs.containsKey(value)) {
+                          transformBuilder.putOutputs(key, 
viewOutputsToInputs.get(value));
+                        }
+                      });
+              // Unfortunately 
transformBuilder.getSubtransformsList().removeAll(viewTransforms)
+              // throws UnsupportedOperationException.
+              transformBuilder.clearSubtransforms();
+              transformBuilder.addAllSubtransforms(
+                  transform
+                      .getSubtransformsList()
+                      .stream()
+                      .filter(id -> !viewTransforms.contains(id))
+                      .collect(Collectors.toList()));
+              newTransforms.put(transformId, transformBuilder.build());
+            });
+
+    RunnerApi.Pipeline.Builder newPipeline = pipeline.toBuilder();
+    // Replace transforms.
+    newPipeline.getComponentsBuilder().putAllTransforms(newTransforms);
+    // Remove CreateView operation components.
+    
viewTransforms.forEach(newPipeline.getComponentsBuilder()::removeTransforms);
+    
viewOutputsToInputs.keySet().forEach(newPipeline.getComponentsBuilder()::removePcollections);
+    newPipeline.clearRootTransformIds();
+    newPipeline.addAllRootTransformIds(
+        pipeline
+            .getRootTransformIdsList()
+            .stream()
+            .filter(id -> !viewTransforms.contains(id))
+            .collect(Collectors.toList()));
+    return newPipeline.build();
+  }
 }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
index 971bcb1378a..1537d77d0d0 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -110,20 +110,30 @@ public void process(ProcessContext c) {
 
   @Test
   public void testProtoDirectly() {
-    final RunnerApi.Pipeline pipelineProto = 
PipelineTranslation.toProto(pipeline);
-    pipeline.traverseTopologically(new 
PipelineProtoVerificationVisitor(pipelineProto));
+    final RunnerApi.Pipeline pipelineProto = 
PipelineTranslation.toProto(pipeline, false);
+    pipeline.traverseTopologically(new 
PipelineProtoVerificationVisitor(pipelineProto, false));
+  }
+
+  @Test
+  public void testProtoDirectlyWithViewTransform() {
+    final RunnerApi.Pipeline pipelineProto = 
PipelineTranslation.toProto(pipeline, true);
+    pipeline.traverseTopologically(new 
PipelineProtoVerificationVisitor(pipelineProto, true));
   }
 
   private static class PipelineProtoVerificationVisitor extends 
PipelineVisitor.Defaults {
 
     private final RunnerApi.Pipeline pipelineProto;
+    private boolean useDeprecatedViewTransforms;
     Set<Node> transforms;
     Set<PCollection<?>> pcollections;
     Set<Equivalence.Wrapper<? extends Coder<?>>> coders;
     Set<WindowingStrategy<?, ?>> windowingStrategies;
+    int missingViewTransforms = 0;
 
-    public PipelineProtoVerificationVisitor(RunnerApi.Pipeline pipelineProto) {
+    public PipelineProtoVerificationVisitor(
+        RunnerApi.Pipeline pipelineProto, boolean useDeprecatedViewTransforms) 
{
       this.pipelineProto = pipelineProto;
+      this.useDeprecatedViewTransforms = useDeprecatedViewTransforms;
       transforms = new HashSet<>();
       pcollections = new HashSet<>();
       coders = new HashSet<>();
@@ -136,11 +146,11 @@ public void leaveCompositeTransform(Node node) {
         assertThat(
             "Unexpected number of PTransforms",
             pipelineProto.getComponents().getTransformsCount(),
-            equalTo(transforms.size()));
+            equalTo(transforms.size() - missingViewTransforms));
         assertThat(
             "Unexpected number of PCollections",
             pipelineProto.getComponents().getPcollectionsCount(),
-            equalTo(pcollections.size()));
+            equalTo(pcollections.size() - missingViewTransforms));
         assertThat(
             "Unexpected number of Coders",
             pipelineProto.getComponents().getCodersCount(),
@@ -167,6 +177,11 @@ public void leaveCompositeTransform(Node node) {
     @Override
     public void visitPrimitiveTransform(Node node) {
       transforms.add(node);
+      if (!useDeprecatedViewTransforms
+          && PTransformTranslation.CREATE_VIEW_TRANSFORM_URN.equals(
+              
PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
+        missingViewTransforms += 1;
+      }
     }
 
     @Override
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index e67020d42f0..bb3a8903100 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -140,13 +140,6 @@ public static FlinkBatchPortablePipelineTranslator 
createTranslator() {
     translatorMap.put(
         PTransformTranslation.RESHUFFLE_URN,
         FlinkBatchPortablePipelineTranslator::translateReshuffle);
-    translatorMap.put(
-        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
-        // https://issues.apache.org/jira/browse/BEAM-5649
-        // Need to support this via a NOOP until the primitive is removed
-        (PTransformNode transform,
-            RunnerApi.Pipeline pipeline,
-            BatchTranslationContext context) -> {});
     return new FlinkBatchPortablePipelineTranslator(translatorMap.build());
   }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index be638331c19..42b9c1114a7 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -170,12 +170,6 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
     translatorMap.put(ExecutableStage.URN, this::translateExecutableStage);
     translatorMap.put(PTransformTranslation.RESHUFFLE_URN, 
this::translateReshuffle);
 
-    translatorMap.put(
-        // https://issues.apache.org/jira/browse/BEAM-5649
-        // Need to support this via a NOOP until the primitive is removed
-        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
-        (String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext 
context) -> {});
-
     this.urnToTransformTranslator = translatorMap.build();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index bcb4a4efeaf..6ca0757a2bf 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -161,7 +161,7 @@ public JobSpecification translate(
     // Capture the sdkComponents for look up during step translations
     SdkComponents sdkComponents = SdkComponents.create();
     
sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
sdkComponents);
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
sdkComponents, true);
 
     LOG.debug("Portable pipeline proto:\n{}", 
TextFormat.printToString(pipelineProto));
 
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 5b2ee9ccee9..7f8fd596bc6 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -322,7 +322,7 @@ public void testBasicWithSideInputsAndOutputs() throws 
Exception {
                 .withOutputTags(mainOutput, 
TupleTagList.of(additionalOutput)));
 
     SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
-    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents, 
true);
     String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection);
     String outputPCollectionId =
         sdkComponents.registerPCollection(outputPCollection.get(mainOutput));
@@ -445,7 +445,7 @@ public void testSideInputIsAccessibleForDownstreamCallers() 
throws Exception {
                 .withSideInputs(iterableSideInputView));
 
     SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
-    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents, 
true);
     String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection);
     String outputPCollectionId = 
sdkComponents.registerPCollection(outputPCollection);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 154334)
    Time Spent: 50m  (was: 40m)

> Remove deprecated primitive CREATE_VIEW transform from Runner API
> -----------------------------------------------------------------
>
>                 Key: BEAM-5649
>                 URL: https://issues.apache.org/jira/browse/BEAM-5649
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Maximilian Michels
>            Priority: Minor
>              Labels: portability
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The deprecated {{CREATE_VIEW}} transform is still generated as part of the 
> Java SDK pipeline construction, but not as part of the Python SDK. We had 
> previously removed support for the transform in the Portable FlinkRunner 
> because end-to-end tests were only run with Python. Since the Java SDK still 
> generates the transform, we've re-added support for it, but would like to 
> eventually remove it.
> This issue tracks the removal of the transform from the Runner API. The 
> transform can stay in the non-portable Runner pipeline translation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to