Repository: beam Updated Branches: refs/heads/master 1cc6dc120 -> 6d64c6ec1
Add CreatePCollectionView translation Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3b036a2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3b036a2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3b036a2 Branch: refs/heads/master Commit: c3b036a243c768546f0273e22fb44eaa2fcfb245 Parents: ae7bc1d Author: Kenneth Knowles <[email protected]> Authored: Thu May 25 06:56:23 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Jun 5 19:48:27 2017 -0700 ---------------------------------------------------------------------- .../CreatePCollectionViewTranslation.java | 126 +++++++++++++++++ .../construction/PTransformTranslation.java | 10 +- .../CreatePCollectionViewTranslationTest.java | 136 +++++++++++++++++++ 3 files changed, 270 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java new file mode 100644 index 0000000..aa24909 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Utility methods for translating a {@link View} transforms to and from {@link RunnerApi} + * representations. + * + * @deprecated this should generally be done as part of {@link ParDo} translation, or moved into a + * dedicated runners-core-construction auxiliary class + */ +@Deprecated +public class CreatePCollectionViewTranslation { + + /** + * @deprecated Since {@link CreatePCollectionView} is not a part of the Beam model, there is no + * SDK-agnostic specification. Using this method means your runner is tied to Java. + */ + @Deprecated + public static <ElemT, ViewT> PCollectionView<ViewT> getView( + AppliedPTransform< + PCollection<ElemT>, PCollectionView<ViewT>, + PTransform<PCollection<ElemT>, PCollectionView<ViewT>>> + application) + throws IOException { + + RunnerApi.PTransform transformProto = + PTransformTranslation.toProto( + application, + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + SdkComponents.create()); + + checkArgument( + PTransformTranslation.CREATE_VIEW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()), + "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"", + PCollectionView.class.getSimpleName(), + application.getTransform(), + application.getFullName(), + transformProto.getSpec().getUrn()); + + return (PCollectionView<ViewT>) + SerializableUtils.deserializeFromByteArray( + transformProto + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + PCollectionView.class.getSimpleName()); + } + + @Deprecated + static class CreatePCollectionViewTranslator + implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> { + @Override + public String getUrn(View.CreatePCollectionView<?, ?> transform) { + return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform<?, ?, View.CreatePCollectionView<?, ?>> transform, + SdkComponents components) { + return FunctionSpec.newBuilder() + .setUrn(getUrn(transform.getTransform())) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray( + transform.getTransform().getView()))) + .build())) + .build(); + } + } + + /** Registers {@link CreatePCollectionViewTranslator}. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + @Deprecated + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap( + View.CreatePCollectionView.class, new CreatePCollectionViewTranslator()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index fcbe84b..7c5c593 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -56,6 +56,9 @@ public class PTransformTranslation { // Less well-known. And where shall these live? public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; + @Deprecated + public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1"; + private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); @@ -141,9 +144,11 @@ public class PTransformTranslation { return tag.getId(); } + /** + * Returns the URN for the transform if it is known, otherwise throws. + */ public static String urnForTransform(PTransform<?, ?> transform) { - TransformPayloadTranslator translator = - KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); + TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); if (translator == null) { throw new IllegalStateException( String.format("No translator known for %s", transform.getClass().getName())); @@ -158,6 +163,7 @@ public class PTransformTranslation { */ public interface TransformPayloadTranslator<T extends PTransform<?, ?>> { String getUrn(T transform); + FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components) throws IOException; } http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java new file mode 100644 index 0000000..0d209a0 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.BytesValue; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Suite; + +/** Tests for {@link CreatePCollectionViewTranslation}. */ +@RunWith(Suite.class) [email protected]({ + CreatePCollectionViewTranslationTest.TestCreatePCollectionViewPayloadTranslation.class, +}) +public class CreatePCollectionViewTranslationTest { + + /** Tests for translating various {@link ParDo} transforms to/from {@link ParDoPayload} protos. */ + @RunWith(Parameterized.class) + public static class TestCreatePCollectionViewPayloadTranslation { + + // Two parameters suffices because the nature of the serialization/deserialization of + // the view is not what is being tested; it is just important that the round trip + // is not vacuous. + @Parameters(name = "{index}: {0}") + public static Iterable<CreatePCollectionView<?, ?>> data() { + return ImmutableList.<CreatePCollectionView<?, ?>>of( + CreatePCollectionView.of( + PCollectionViews.singletonView( + testPCollection, + testPCollection.getWindowingStrategy(), + false, + null, + testPCollection.getCoder())), + CreatePCollectionView.of( + PCollectionViews.listView( + testPCollection, + testPCollection.getWindowingStrategy(), + testPCollection.getCoder()))); + } + + @Parameter(0) + public CreatePCollectionView<?, ?> createViewTransform; + + public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + private static final PCollection<String> testPCollection = p.apply(Create.of("one")); + + @Test + public void testEncodedProto() throws Exception { + SdkComponents components = SdkComponents.create(); + components.registerPCollection(testPCollection); + + AppliedPTransform<?, ?, ?> appliedPTransform = + AppliedPTransform.of( + "foo", + testPCollection.expand(), + createViewTransform.getView().expand(), + createViewTransform, + p); + + FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec(); + + // Checks that the payload is what it should be + PCollectionView<?> deserializedView = + (PCollectionView<?>) + SerializableUtils.deserializeFromByteArray( + payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + PCollectionView.class.getSimpleName()); + + assertThat( + deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView())); + } + + @Test + public void testExtractionDirectFromTransform() throws Exception { + SdkComponents components = SdkComponents.create(); + components.registerPCollection(testPCollection); + + AppliedPTransform<?, ?, ?> appliedPTransform = + AppliedPTransform.of( + "foo", + testPCollection.expand(), + createViewTransform.getView().expand(), + createViewTransform, + p); + + CreatePCollectionViewTranslation.getView((AppliedPTransform) appliedPTransform); + + FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec(); + + // Checks that the payload is what it should be + PCollectionView<?> deserializedView = + (PCollectionView<?>) + SerializableUtils.deserializeFromByteArray( + payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + PCollectionView.class.getSimpleName()); + + assertThat( + deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView())); + } + } +}
