[ https://issues.apache.org/jira/browse/BEAM-5649?focusedWorklogId=154334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154334 ]
ASF GitHub Bot logged work on BEAM-5649: ---------------------------------------- Author: ASF GitHub Bot Created on: 15/Oct/18 15:08 Start Date: 15/Oct/18 15:08 Worklog Time Spent: 10m Work Description: robertwb closed pull request #6689: [BEAM-5649] Conditionally remove CREATE_VIEW from java generated protos. URL: https://github.com/apache/beam/pull/6689 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index 57f4906df3a..a71ceeda2f9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -19,11 +19,16 @@ package org.apache.beam.runners.core.construction; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.graph.PipelineValidator; import org.apache.beam.sdk.Pipeline; @@ -38,8 +43,19 @@ return toProto(pipeline, SdkComponents.create(pipeline.getOptions())); } + public static RunnerApi.Pipeline toProto(Pipeline pipeline, boolean useDeprecatedViewTransforms) { + return toProto( + pipeline, 
SdkComponents.create(pipeline.getOptions()), useDeprecatedViewTransforms); + } + + public static RunnerApi.Pipeline toProto(Pipeline pipeline, SdkComponents components) { + return toProto(pipeline, components, false); + } + public static RunnerApi.Pipeline toProto( - final Pipeline pipeline, final SdkComponents components) { + final Pipeline pipeline, + final SdkComponents components, + boolean useDeprecatedViewTransforms) { final Collection<String> rootIds = new HashSet<>(); pipeline.traverseTopologically( new PipelineVisitor.Defaults() { @@ -81,8 +97,83 @@ public void visitPrimitiveTransform(Node node) { .setComponents(components.toComponents()) .addAllRootTransformIds(rootIds) .build(); + if (!useDeprecatedViewTransforms) { + // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. + res = elideDeprecatedViews(res); + } // Validate that translation didn't produce an invalid pipeline. PipelineValidator.validate(res); return res; } + + private static RunnerApi.Pipeline elideDeprecatedViews(RunnerApi.Pipeline pipeline) { + // Record data on CreateView operations. + Set<String> viewTransforms = new HashSet<>(); + Map<String, String> viewOutputsToInputs = new HashMap<>(); + pipeline + .getComponents() + .getTransformsMap() + .forEach( + (transformId, transform) -> { + if (transform + .getSpec() + .getUrn() + .equals(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN)) { + viewTransforms.add(transformId); + viewOutputsToInputs.put( + Iterables.getOnlyElement(transform.getOutputsMap().values()), + Iterables.getOnlyElement(transform.getInputsMap().values())); + } + }); + // Fix up view references. 
+ Map<String, RunnerApi.PTransform> newTransforms = new HashMap<>(); + pipeline + .getComponents() + .getTransformsMap() + .forEach( + (transformId, transform) -> { + RunnerApi.PTransform.Builder transformBuilder = transform.toBuilder(); + transform + .getInputsMap() + .forEach( + (key, value) -> { + if (viewOutputsToInputs.containsKey(value)) { + transformBuilder.putInputs(key, viewOutputsToInputs.get(value)); + } + }); + transform + .getOutputsMap() + .forEach( + (key, value) -> { + if (viewOutputsToInputs.containsKey(value)) { + transformBuilder.putOutputs(key, viewOutputsToInputs.get(value)); + } + }); + // Unfortunately transformBuilder.getSubtransformsList().removeAll(viewTransforms) + // throws UnsupportedOperationException. + transformBuilder.clearSubtransforms(); + transformBuilder.addAllSubtransforms( + transform + .getSubtransformsList() + .stream() + .filter(id -> !viewTransforms.contains(id)) + .collect(Collectors.toList())); + newTransforms.put(transformId, transformBuilder.build()); + }); + + RunnerApi.Pipeline.Builder newPipeline = pipeline.toBuilder(); + // Replace transforms. + newPipeline.getComponentsBuilder().putAllTransforms(newTransforms); + // Remove CreateView operation components. 
+ viewTransforms.forEach(newPipeline.getComponentsBuilder()::removeTransforms); + viewOutputsToInputs.keySet().forEach(newPipeline.getComponentsBuilder()::removePcollections); + newPipeline.clearRootTransformIds(); + newPipeline.addAllRootTransformIds( + pipeline + .getRootTransformIdsList() + .stream() + .filter(id -> !viewTransforms.contains(id)) + .collect(Collectors.toList())); + return newPipeline.build(); + } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java index 971bcb1378a..1537d77d0d0 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java @@ -110,20 +110,30 @@ public void process(ProcessContext c) { @Test public void testProtoDirectly() { - final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); - pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto)); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false); + pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto, false)); + } + + @Test + public void testProtoDirectlyWithViewTransform() { + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, true); + pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto, true)); } private static class PipelineProtoVerificationVisitor extends PipelineVisitor.Defaults { private final RunnerApi.Pipeline pipelineProto; + private boolean useDeprecatedViewTransforms; Set<Node> transforms; Set<PCollection<?>> pcollections; Set<Equivalence.Wrapper<? 
extends Coder<?>>> coders; Set<WindowingStrategy<?, ?>> windowingStrategies; + int missingViewTransforms = 0; - public PipelineProtoVerificationVisitor(RunnerApi.Pipeline pipelineProto) { + public PipelineProtoVerificationVisitor( + RunnerApi.Pipeline pipelineProto, boolean useDeprecatedViewTransforms) { this.pipelineProto = pipelineProto; + this.useDeprecatedViewTransforms = useDeprecatedViewTransforms; transforms = new HashSet<>(); pcollections = new HashSet<>(); coders = new HashSet<>(); @@ -136,11 +146,11 @@ public void leaveCompositeTransform(Node node) { assertThat( "Unexpected number of PTransforms", pipelineProto.getComponents().getTransformsCount(), - equalTo(transforms.size())); + equalTo(transforms.size() - missingViewTransforms)); assertThat( "Unexpected number of PCollections", pipelineProto.getComponents().getPcollectionsCount(), - equalTo(pcollections.size())); + equalTo(pcollections.size() - missingViewTransforms)); assertThat( "Unexpected number of Coders", pipelineProto.getComponents().getCodersCount(), @@ -167,6 +177,11 @@ public void leaveCompositeTransform(Node node) { @Override public void visitPrimitiveTransform(Node node) { transforms.add(node); + if (!useDeprecatedViewTransforms + && PTransformTranslation.CREATE_VIEW_TRANSFORM_URN.equals( + PTransformTranslation.urnForTransformOrNull(node.getTransform()))) { + missingViewTransforms += 1; + } } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index e67020d42f0..bb3a8903100 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -140,13 +140,6 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() { translatorMap.put( 
PTransformTranslation.RESHUFFLE_URN, FlinkBatchPortablePipelineTranslator::translateReshuffle); - translatorMap.put( - PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, - // https://issues.apache.org/jira/browse/BEAM-5649 - // Need to support this via a NOOP until the primitive is removed - (PTransformNode transform, - RunnerApi.Pipeline pipeline, - BatchTranslationContext context) -> {}); return new FlinkBatchPortablePipelineTranslator(translatorMap.build()); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index be638331c19..42b9c1114a7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -170,12 +170,6 @@ public StreamExecutionEnvironment getExecutionEnvironment() { translatorMap.put(ExecutableStage.URN, this::translateExecutableStage); translatorMap.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle); - translatorMap.put( - // https://issues.apache.org/jira/browse/BEAM-5649 - // Need to support this via a NOOP until the primitive is removed - PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, - (String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) -> {}); - this.urnToTransformTranslator = translatorMap.build(); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index bcb4a4efeaf..6ca0757a2bf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -161,7 +161,7 @@ public JobSpecification translate( // Capture the sdkComponents for look up during step translations SdkComponents sdkComponents = SdkComponents.create(); sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto)); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 5b2ee9ccee9..7f8fd596bc6 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -322,7 +322,7 @@ public void testBasicWithSideInputsAndOutputs() throws Exception { .withOutputTags(mainOutput, TupleTagList.of(additionalOutput))); SdkComponents sdkComponents = SdkComponents.create(p.getOptions()); - RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); + RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents, true); String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection); String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection.get(mainOutput)); @@ -445,7 +445,7 @@ public void testSideInputIsAccessibleForDownstreamCallers() throws Exception { .withSideInputs(iterableSideInputView)); SdkComponents sdkComponents = SdkComponents.create(p.getOptions()); - RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); + RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents, true); String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection); String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 154334) Time Spent: 50m (was: 40m) > Remove deprecated primitive CREATE_VIEW transform from Runner API > ----------------------------------------------------------------- > > Key: BEAM-5649 > URL: https://issues.apache.org/jira/browse/BEAM-5649 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core > Reporter: Maximilian Michels > Priority: Minor > Labels: portability > Time Spent: 50m > Remaining Estimate: 0h > > The deprecated {{CREATE_VIEW}} transform is still generated as part of the > Java SDK pipeline construction but not as part of the Python SDK. We had > previously removed support for the transform in the Portable FlinkRunner > because end-to-end tests were only run with Python. Since the Java SDK still > generates the transform, we've re-added support for it, but would like to > eventually remove it. > This issue tracks removal of the transform from the Runner API. The transform > can stay in the non-portable Runner pipeline translation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)