[
https://issues.apache.org/jira/browse/BEAM-4125?focusedWorklogId=98053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98053
]
ASF GitHub Bot logged work on BEAM-4125:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/18 21:59
Start Date: 03/May/18 21:59
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #5172: [BEAM-4125] Add
ProtoOverrides
URL: https://github.com/apache/beam/pull/5172
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ProtoOverrides.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ProtoOverrides.java
new file mode 100644
index 00000000000..cd6bc32650a
--- /dev/null
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ProtoOverrides.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ComponentsOrBuilder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.runners.PTransformOverride;
+
+/**
+ * A way to apply a Proto-based {@link PTransformOverride}.
+ *
+ * <p>This should generally be used to replace runner-executed transforms with
runner-executed
+ * composites and simpler runner-executed primitives. It is generically less
powerful than the
+ * native {@link org.apache.beam.sdk.Pipeline#replaceAll(List)} and more
error-prone, so should only
+ * be used for relatively simple replacements.
+ */
+@Experimental
+public class ProtoOverrides {
+ /**
+ * Update all composites present in the {@code originalPipeline} with an URN
equal to the provided
+ * {@code urn} using the provided {@link TransformReplacement}.
+ */
+ public static Pipeline updateTransform(
+ String urn, Pipeline originalPipeline, TransformReplacement
compositeBuilder) {
+ Components.Builder resultComponents =
originalPipeline.getComponents().toBuilder();
+ for (Map.Entry<String, PTransform> pt :
+ originalPipeline.getComponents().getTransformsMap().entrySet()) {
+ if (pt.getValue().getSpec() != null &&
urn.equals(pt.getValue().getSpec().getUrn())) {
+ MessageWithComponents updated =
+ compositeBuilder.getReplacement(pt.getKey(),
originalPipeline.getComponents());
+ checkArgument(
+
updated.getPtransform().getOutputsMap().equals(pt.getValue().getOutputsMap()),
+ "A %s must produce all of the outputs of the original %s",
+ TransformReplacement.class.getSimpleName(),
+ PTransform.class.getSimpleName());
+ removeSubtransforms(pt.getValue(), resultComponents);
+ resultComponents
+ .mergeFrom(updated.getComponents())
+ .putTransforms(pt.getKey(), updated.getPtransform());
+ }
+ }
+ return
originalPipeline.toBuilder().setComponents(resultComponents).build();
+ }
+
+ /**
+ * Remove all subtransforms of the provided transform recursively.A {@link
PTransform} can be the
+ * subtransform of only one enclosing transform.
+ */
+ private static void removeSubtransforms(PTransform pt, Components.Builder
target) {
+ for (String subtransformId : pt.getSubtransformsList()) {
+ PTransform subtransform = target.getTransformsOrThrow(subtransformId);
+ removeSubtransforms(subtransform, target);
+ target.removeTransforms(subtransformId);
+ // TODO: remove PCollections not produced by 'pt' here.
+ }
+ }
+
+ /**
+ * A Function that takes a transform and the existing components and returns
the new composite
+ * PTransform and additional components.
+ */
+ @FunctionalInterface
+ public interface TransformReplacement {
+ /**
+ * Returns the updated composite structure for the provided {@link
PTransform}.
+ *
+ * <p>The returned {@link MessageWithComponents} must contain a single
{@link PTransform}. The
+ * result {@link Components} will be merged into the existing components,
and the result {@link
+ * PTransform} will be set as a replacement of the original {@link
PTransform}. Notably, this
+ * does not require that the {@code existingComponents} are present in the
returned {@link
+ * MessageWithComponents}.
+ *
+ * <p>Introduced components must not collide with any components in the
existing components.
+ */
+ MessageWithComponents getReplacement(
+ String transformId, ComponentsOrBuilder existingComponents);
+ }
+}
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProtoOverridesTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProtoOverridesTest.java
new file mode 100644
index 00000000000..988e4652af1
--- /dev/null
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProtoOverridesTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.AccumulationMode.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ComponentsOrBuilder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
+import
org.apache.beam.runners.core.construction.graph.ProtoOverrides.TransformReplacement;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ProtoOverrides}.
+ */
+@RunWith(JUnit4.class)
+public class ProtoOverridesTest {
+ @Test
+ public void replacesOnlyMatching() {
+ RunnerApi.Pipeline p =
+ Pipeline.newBuilder()
+ .addAllRootTransformIds(ImmutableList.of("first", "second"))
+ .setComponents(
+ Components.newBuilder()
+ .putTransforms(
+ "first",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:first"))
+ .build())
+ .putTransforms(
+ "second",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:second"))
+ .build())
+ .putPcollections(
+ "intermediatePc",
+
PCollection.newBuilder().setUniqueName("intermediate").build())
+ .putCoders(
+ "coder",
+
Coder.newBuilder().setSpec(SdkFunctionSpec.getDefaultInstance()).build()))
+ .build();
+
+ PTransform secondReplacement =
+ PTransform.newBuilder()
+ .addSubtransforms("second_sub")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn("beam:second:replacement")
+ .setPayload(ByteString.copyFrom("foo-bar-baz".getBytes())))
+ .build();
+ WindowingStrategy introducedWS =
+
WindowingStrategy.newBuilder().setAccumulationMode(Enum.ACCUMULATING).build();
+ RunnerApi.Components extraComponents =
+ Components.newBuilder()
+ .putPcollections(
+ "intermediatePc",
+
PCollection.newBuilder().setUniqueName("intermediate_replacement").build())
+ .putWindowingStrategies("new_ws", introducedWS)
+ .putTransforms("second_sub", PTransform.getDefaultInstance())
+ .build();
+
+ Pipeline updated =
+ ProtoOverrides.updateTransform(
+ "beam:second", p, new TestReplacer(secondReplacement,
extraComponents));
+ PTransform updatedSecond =
updated.getComponents().getTransformsOrThrow("second");
+
+ assertThat(updatedSecond, equalTo(secondReplacement));
+ assertThat(
+ updated.getComponents().getWindowingStrategiesOrThrow("new_ws"),
equalTo(introducedWS));
+ assertThat(
+ updated.getComponents().getTransformsOrThrow("second_sub"),
+ equalTo(PTransform.getDefaultInstance()));
+
+ // TODO: This might not be appropriate. Merging in the other direction
might force that callers
+ // are well behaved.
+ assertThat(
+
updated.getComponents().getPcollectionsOrThrow("intermediatePc").getUniqueName(),
+ equalTo("intermediate_replacement"));
+
+ // Assert that the untouched components are unchanged.
+ assertThat(
+ updated.getComponents().getTransformsOrThrow("first"),
+ equalTo(p.getComponents().getTransformsOrThrow("first")));
+ assertThat(
+ updated.getComponents().getCodersOrThrow("coder"),
+ equalTo(p.getComponents().getCodersOrThrow("coder")));
+ assertThat(updated.getRootTransformIdsList(),
equalTo(p.getRootTransformIdsList()));
+ }
+
+ @Test
+ public void replacesMultiple() {
+ RunnerApi.Pipeline p =
+ Pipeline.newBuilder()
+ .addAllRootTransformIds(ImmutableList.of("first", "second"))
+ .setComponents(
+ Components.newBuilder()
+ .putTransforms(
+ "first",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:first"))
+ .build())
+ .putTransforms(
+ "second",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:repeated"))
+ .build())
+ .putTransforms(
+ "third",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:repeated"))
+ .build())
+ .putPcollections(
+ "intermediatePc",
+
PCollection.newBuilder().setUniqueName("intermediate").build())
+ .putCoders(
+ "coder",
+
Coder.newBuilder().setSpec(SdkFunctionSpec.getDefaultInstance()).build()))
+ .build();
+
+ ByteString newPayload = ByteString.copyFrom("foo-bar-baz".getBytes());
+ Pipeline updated =
+ ProtoOverrides.updateTransform(
+ "beam:repeated",
+ p,
+ (transformId, existingComponents) -> {
+ String subtransform = String.format("%s_sub", transformId);
+ return MessageWithComponents.newBuilder()
+ .setPtransform(
+ PTransform.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn("beam:repeated:replacement")
+ .setPayload(newPayload))
+ .addSubtransforms(subtransform))
+ .setComponents(
+ Components.newBuilder()
+ .putTransforms(
+ subtransform,
+
PTransform.newBuilder().setUniqueName(subtransform).build()))
+ .build();
+ });
+ PTransform updatedSecond =
updated.getComponents().getTransformsOrThrow("second");
+ PTransform updatedThird =
updated.getComponents().getTransformsOrThrow("third");
+
+ assertThat(updatedSecond,
not(equalTo(p.getComponents().getTransformsOrThrow("second"))));
+ assertThat(updatedThird,
not(equalTo(p.getComponents().getTransformsOrThrow("third"))));
+ assertThat(updatedSecond.getSubtransformsList(), contains("second_sub"));
+ assertThat(updatedSecond.getSpec().getPayload(), equalTo(newPayload));
+ assertThat(updatedThird.getSubtransformsList(), contains("third_sub"));
+ assertThat(updatedThird.getSpec().getPayload(), equalTo(newPayload));
+
+ assertThat(updated.getComponents().getTransformsMap(),
hasKey("second_sub"));
+ assertThat(updated.getComponents().getTransformsMap(),
hasKey("third_sub"));
+ assertThat(
+
updated.getComponents().getTransformsOrThrow("second_sub").getUniqueName(),
+ equalTo("second_sub"));
+ assertThat(
+
updated.getComponents().getTransformsOrThrow("third_sub").getUniqueName(),
+ equalTo("third_sub"));
+ }
+
+ @Test
+ public void replaceExistingCompositeSucceeds() {
+ Pipeline p =
+ Pipeline.newBuilder()
+ .addRootTransformIds("root")
+ .setComponents(
+ Components.newBuilder()
+ .putTransforms(
+ "root",
+ PTransform.newBuilder()
+ .addSubtransforms("sub_first")
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:composite"))
+ .build())
+ .putTransforms(
+ "sub_first",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn("beam:inner"))
+ .build()))
+ .build();
+
+ Pipeline pipeline =
+ ProtoOverrides.updateTransform(
+ "beam:composite",
+ p,
+ new TestReplacer(
+ PTransform.newBuilder()
+ .addSubtransforms("foo")
+ .addSubtransforms("bar")
+ .setSpec(
+ FunctionSpec.getDefaultInstance()
+ .newBuilderForType()
+ .setUrn("beam:composite"))
+ .build(),
+ Components.getDefaultInstance()));
+ assertThat(
+
pipeline.getComponents().getTransformsOrThrow("root").getSpec().getUrn(),
+ equalTo("beam:composite"));
+ assertThat(
+
pipeline.getComponents().getTransformsOrThrow("root").getSubtransformsList(),
+ contains("foo", "bar"));
+ }
+
+ private static class TestReplacer implements TransformReplacement {
+ private final PTransform extraTransform;
+ private final Components extraComponents;
+
+ private TestReplacer(PTransform extraTransform, Components
extraComponents) {
+ this.extraTransform = extraTransform;
+ this.extraComponents = extraComponents;
+ }
+
+ @Override
+ public MessageWithComponents getReplacement(
+ String transformId, ComponentsOrBuilder existingComponents) {
+ return MessageWithComponents.newBuilder()
+ .setPtransform(extraTransform)
+ .setComponents(extraComponents)
+ .build();
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 98053)
Time Spent: 2h 20m (was: 2h 10m)
> Add a library to manipulate the proto representation of a pipeline
> ------------------------------------------------------------------
>
> Key: BEAM-4125
> URL: https://issues.apache.org/jira/browse/BEAM-4125
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Fix For: Not applicable
>
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> This is important for transforms that include in-environment transforms
> (such as a lifted Combine), or for runners that use the Beam representation
> as their internal representation (such as the DirectRunner).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)