Repository: beam Updated Branches: refs/heads/master 462335caf -> 0cba43ee2
Add TestStream translation Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cedc61f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cedc61f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cedc61f Branch: refs/heads/master Commit: 0cedc61ffb59a08a4b5205a5a224fd9fa906f7a7 Parents: 47cea78 Author: Kenneth Knowles <k...@google.com> Authored: Tue May 30 14:41:46 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jun 2 10:06:52 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformTranslation.java | 1 + .../construction/TestStreamTranslation.java | 156 +++++++++++++++++++ .../construction/TestStreamTranslationTest.java | 129 +++++++++++++++ 3 files changed, 286 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0cedc61f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index b2f06ac..fd3f9f3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -50,6 +50,7 @@ public class PTransformTranslation { public static final String GROUP_BY_KEY_TRANSFORM_URN = "urn:beam:transform:groupbykey:v1"; public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1"; public static final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1"; + public static final 
String TEST_STREAM_TRANSFORM_URN = "urn:beam:transform:teststream:v1"; // Less well-known. And where shall these live? public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; http://git-wip-us.apache.org/repos/asf/beam/blob/0cedc61f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java new file mode 100644 index 0000000..90e6304 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Utility methods for translating a {@link TestStream} to and from {@link RunnerApi} + * representations. + */ +public class TestStreamTranslation { + + static <T> RunnerApi.TestStreamPayload testStreamToPayload( + TestStream<T> transform, SdkComponents components) throws IOException { + String coderId = components.registerCoder(transform.getValueCoder()); + + RunnerApi.TestStreamPayload.Builder builder = + RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId); + + for (TestStream.Event<T> event : transform.getEvents()) { + builder.addEvents(toProto(event, transform.getValueCoder())); + } + + return builder.build(); + } + + static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder) + throws IOException { + switch (event.getType()) { + case WATERMARK: + return RunnerApi.TestStreamPayload.Event.newBuilder() + .setWatermarkEvent( + RunnerApi.TestStreamPayload.Event.AdvanceWatermark.newBuilder() + .setNewWatermark( + ((TestStream.WatermarkEvent<T>) event).getWatermark().getMillis())) + .build(); + + case PROCESSING_TIME: + return RunnerApi.TestStreamPayload.Event.newBuilder() + .setProcessingTimeEvent( + 
RunnerApi.TestStreamPayload.Event.AdvanceProcessingTime.newBuilder() + .setAdvanceDuration( + ((TestStream.ProcessingTimeEvent<T>) event) + .getProcessingTimeAdvance() + .getMillis())) + .build(); + + case ELEMENT: + RunnerApi.TestStreamPayload.Event.AddElements.Builder builder = + RunnerApi.TestStreamPayload.Event.AddElements.newBuilder(); + for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) { + builder.addElements( + RunnerApi.TestStreamPayload.TimestampedElement.newBuilder() + .setTimestamp(element.getTimestamp().getMillis()) + .setEncodedElement( + ByteString.copyFrom( + CoderUtils.encodeToByteArray(coder, element.getValue())))); + } + return RunnerApi.TestStreamPayload.Event.newBuilder().setElementEvent(builder).build(); + default: + throw new IllegalArgumentException( + String.format( + "Unsupported type of %s: %s", + TestStream.Event.class.getCanonicalName(), event.getType())); + } + } + + static <T> TestStream.Event<T> fromProto( + RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException { + switch (protoEvent.getEventCase()) { + case WATERMARK_EVENT: + return TestStream.WatermarkEvent.advanceTo( + new Instant(protoEvent.getWatermarkEvent().getNewWatermark())); + case PROCESSING_TIME_EVENT: + return TestStream.ProcessingTimeEvent.advanceBy( + Duration.millis(protoEvent.getProcessingTimeEvent().getAdvanceDuration())); + case ELEMENT_EVENT: + List<TimestampedValue<T>> decodedElements = new ArrayList<>(); + for (RunnerApi.TestStreamPayload.TimestampedElement element : + protoEvent.getElementEvent().getElementsList()) { + decodedElements.add( + TimestampedValue.of( + CoderUtils.decodeFromByteArray(coder, element.getEncodedElement().toByteArray()), + new Instant(element.getTimestamp()))); + } + return TestStream.ElementEvent.add(decodedElements); + case EVENT_NOT_SET: + default: + throw new IllegalArgumentException( + String.format( + "Unsupported type of %s: %s", + 
RunnerApi.TestStreamPayload.Event.class.getCanonicalName(), + protoEvent.getEventCase())); + } + } + + static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> { + @Override + public String getUrn(TestStream<?> transform) { + return PTransformTranslation.TEST_STREAM_TRANSFORM_URN; + } + + @Override + public RunnerApi.FunctionSpec translate( + AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components) + throws IOException { + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(getUrn(transform.getTransform())) + .setParameter(Any.pack(testStreamToPayload(transform.getTransform(), components))) + .build(); + } + } + + /** Registers {@link TestStreamTranslator}. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(TestStream.class, new TestStreamTranslator()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0cedc61f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java new file mode 100644 index 0000000..b2029be --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.runners.core.construction.TestStreamTranslationTest.TestStreamPayloadTranslation; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import 
org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Suite; + +/** Tests for {@link TestStreamTranslation}. */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TestStreamPayloadTranslation.class, +}) +public class TestStreamTranslationTest { + + /** Tests for translating various {@link TestStream} transforms to/from {@link RunnerApi.TestStreamPayload} protos. */ + @RunWith(Parameterized.class) + public static class TestStreamPayloadTranslation { + @Parameters(name = "{index}: {0}") + public static Iterable<TestStream<?>> data() { + return ImmutableList.<TestStream<?>>of( + TestStream.create(VarIntCoder.of()).advanceWatermarkToInfinity(), + TestStream.create(VarIntCoder.of()) + .advanceWatermarkTo(new Instant(42)) + .advanceWatermarkToInfinity(), + TestStream.create(VarIntCoder.of()) + .addElements(TimestampedValue.of(3, new Instant(17))) + .advanceWatermarkToInfinity(), + TestStream.create(StringUtf8Coder.of()) + .advanceProcessingTime(Duration.millis(82)) + .advanceWatermarkToInfinity()); + } + + @Parameter(0) + public TestStream<String> testStream; + + public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Test + public void testEncodedProto() throws Exception { + SdkComponents components = SdkComponents.create(); + RunnerApi.TestStreamPayload payload = + TestStreamTranslation.testStreamToPayload(testStream, components); + + verifyTestStreamEncoding(testStream, payload, components.toComponents()); + } + + @Test + public void testRegistrarEncodedProto() throws Exception { + PCollection<String> output = p.apply(testStream); + + AppliedPTransform<PBegin, PCollection<String>, TestStream<String>> appliedTestStream = + AppliedPTransform.<PBegin, PCollection<String>, TestStream<String>>of( + "fakeName", PBegin.in(p).expand(), output.expand(), testStream, p); + + SdkComponents components = SdkComponents.create(); + RunnerApi.FunctionSpec spec = + PTransformTranslation.toProto(appliedTestStream, components).getSpec(); + + 
assertThat(spec.getUrn(), equalTo(TEST_STREAM_TRANSFORM_URN)); + + RunnerApi.TestStreamPayload payload = + spec.getParameter().unpack(RunnerApi.TestStreamPayload.class); + + verifyTestStreamEncoding(testStream, payload, components.toComponents()); + } + + private static <T> void verifyTestStreamEncoding( + TestStream<T> testStream, + RunnerApi.TestStreamPayload payload, + RunnerApi.Components protoComponents) + throws Exception { + + // This reverse direction is only valid for Java-based coders + assertThat( + CoderTranslation.fromProto( + protoComponents.getCodersOrThrow(payload.getCoderId()), protoComponents), + Matchers.<Coder<?>>equalTo(testStream.getValueCoder())); + + assertThat(payload.getEventsList().size(), equalTo(testStream.getEvents().size())); + + for (int i = 0; i < payload.getEventsList().size(); ++i) { + assertThat( + TestStreamTranslation.fromProto(payload.getEvents(i), testStream.getValueCoder()), + equalTo(testStream.getEvents().get(i))); + } + } + } +}