Add custom rehydration for TestStream
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6abf6f52 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6abf6f52 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6abf6f52 Branch: refs/heads/master Commit: 6abf6f520df9efd5950063019bbc33ddc85a5c97 Parents: 10c63e1 Author: Kenneth Knowles <[email protected]> Authored: Mon Oct 2 20:20:53 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Oct 17 12:45:11 2017 -0700 ---------------------------------------------------------------------- .../construction/TestStreamTranslation.java | 171 +++++++++++++++---- .../construction/TestStreamTranslationTest.java | 4 +- 2 files changed, 142 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java index 8e4c1db..1b18844 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -22,12 +22,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN; import com.google.auto.service.AutoService; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.coders.Coder; @@ -47,21 +49,74 @@ import org.joda.time.Instant; */ public class TestStreamTranslation { - static <T> RunnerApi.TestStreamPayload testStreamToPayload( - TestStream<T> transform, SdkComponents components) throws IOException { - String coderId = components.registerCoder(transform.getValueCoder()); + private interface TestStreamLike { + Coder<?> getValueCoder(); - RunnerApi.TestStreamPayload.Builder builder = - RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId); + List<RunnerApi.TestStreamPayload.Event> getEvents(); + } + + @VisibleForTesting + static class RawTestStream<T> extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> + implements TestStreamLike { + + private final transient RehydratedComponents rehydratedComponents; + private final RunnerApi.TestStreamPayload payload; + private final Coder<T> valueCoder; + private final RunnerApi.FunctionSpec spec; + + public RawTestStream( + RunnerApi.TestStreamPayload payload, RehydratedComponents rehydratedComponents) { + this.payload = payload; + this.spec = + RunnerApi.FunctionSpec.newBuilder() + .setUrn(TEST_STREAM_TRANSFORM_URN) + .setPayload(payload.toByteString()) + .build(); + this.rehydratedComponents = rehydratedComponents; + + // Eagerly extract the coder to throw a good exception here + try { + this.valueCoder = (Coder<T>) rehydratedComponents.getCoder(payload.getCoderId()); + } catch (IOException exc) { + throw new IllegalArgumentException( + String.format( + "Failure extracting coder with id '%s' for %s", + payload.getCoderId(), TestStream.class.getSimpleName()), + exc); + } + } + + @Override + public String getUrn() { + return TEST_STREAM_TRANSFORM_URN; + } + + @Nonnull + @Override + public RunnerApi.FunctionSpec getSpec() { + return spec; + } - for (TestStream.Event<T> event : transform.getEvents()) { - builder.addEvents(toProto(event, transform.getValueCoder())); + @Override + public RunnerApi.FunctionSpec migrate(SdkComponents components) throws IOException { + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(TEST_STREAM_TRANSFORM_URN) + .setPayload(payloadForTestStreamLike(this, components).toByteString()) + .build(); } - return builder.build(); + @Override + public Coder<T> getValueCoder() { + return valueCoder; + } + + @Override + public List<RunnerApi.TestStreamPayload.Event> getEvents() { + return payload.getEventsList(); + } } - private static TestStream<?> fromProto( + private static TestStream<?> testStreamFromProtoPayload( RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components) throws IOException { @@ -70,7 +125,7 @@ public class TestStreamTranslation { List<TestStream.Event<Object>> events = new ArrayList<>(); for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) { - events.add(fromProto(event, coder)); + events.add(eventFromProto(event, coder)); } return TestStream.fromRawEvents(coder, events); } @@ -98,12 +153,12 @@ public class TestStreamTranslation { RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload()); return (TestStream<T>) - fromProto( + testStreamFromProtoPayload( testStreamPayload, RehydratedComponents.forComponents(sdkComponents.toComponents())); } - static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder) - throws IOException { + static <T> RunnerApi.TestStreamPayload.Event eventToProto( + TestStream.Event<T> event, Coder<T> coder) throws IOException { switch (event.getType()) { case WATERMARK: return RunnerApi.TestStreamPayload.Event.newBuilder() @@ -143,7 +198,7 @@ public class TestStreamTranslation { } } - static <T> TestStream.Event<T> fromProto( + static <T> TestStream.Event<T> eventFromProto( RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException { switch (protoEvent.getEventCase()) { case WATERMARK_EVENT: @@ -172,8 +227,8 @@ public class TestStreamTranslation { } } - static class TestStreamTranslator - extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> { + /** A translator registered to translate {@link TestStream} objects to protobuf representation. */ + static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> { @Override public String getUrn(TestStream<?> transform) { return TEST_STREAM_TRANSFORM_URN; @@ -181,27 +236,81 @@ public class TestStreamTranslation { @Override public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components) + final AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components) throws IOException { - return RunnerApi.FunctionSpec.newBuilder() - .setUrn(getUrn(transform.getTransform())) - .setPayload(testStreamToPayload(transform.getTransform(), components).toByteString()) - .build(); + return translateTyped(transform.getTransform(), components); } - } - /** Registers {@link TestStreamTranslator}. */ - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class Registrar implements TransformPayloadTranslatorRegistrar { @Override - public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return Collections.singletonMap(TestStream.class, new TestStreamTranslator()); + public PTransformTranslation.RawPTransform<?, ?> rehydrate( + RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) + throws IOException { + checkArgument( + protoTransform.getSpec() != null, + "%s received transform with null spec", + getClass().getSimpleName()); + checkArgument(protoTransform.getSpec().getUrn().equals(TEST_STREAM_TRANSFORM_URN)); + return new RawTestStream<>( + RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()), + rehydratedComponents); } - @Override - public Map<String, TransformPayloadTranslator> getTransformRehydrators() { - return Collections.emptyMap(); + private <T> RunnerApi.FunctionSpec translateTyped( + final TestStream<T> testStream, SdkComponents components) throws IOException { + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(TEST_STREAM_TRANSFORM_URN) + .setPayload(payloadForTestStream(testStream, components).toByteString()) + .build(); } + + /** Registers {@link TestStreamTranslator}. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(TestStream.class, new TestStreamTranslator()); + } + + @Override + public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() { + return Collections.singletonMap(TEST_STREAM_TRANSFORM_URN, new TestStreamTranslator()); + } + } + } + + /** Produces a {@link RunnerApi.TestStreamPayload} from a portable {@link RawTestStream}. */ + static RunnerApi.TestStreamPayload payloadForTestStreamLike( + TestStreamLike transform, SdkComponents components) throws IOException { + return RunnerApi.TestStreamPayload.newBuilder() + .setCoderId(components.registerCoder(transform.getValueCoder())) + .addAllEvents(transform.getEvents()) + .build(); + } + + @VisibleForTesting + static <T> RunnerApi.TestStreamPayload payloadForTestStream( + final TestStream<T> testStream, SdkComponents components) throws IOException { + return payloadForTestStreamLike( + new TestStreamLike() { + @Override + public Coder<T> getValueCoder() { + return testStream.getValueCoder(); + } + + @Override + public List<RunnerApi.TestStreamPayload.Event> getEvents() { + try { + List<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<>(); + for (TestStream.Event<T> event : testStream.getEvents()) { + protoEvents.add(eventToProto(event, testStream.getValueCoder())); + } + return protoEvents; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, + components); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java index 3678fc7..fc30552 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java @@ -81,7 +81,7 @@ public class TestStreamTranslationTest { public void testEncodedProto() throws Exception { SdkComponents components = SdkComponents.create(); RunnerApi.TestStreamPayload payload = - TestStreamTranslation.testStreamToPayload(testStream, components); + TestStreamTranslation.payloadForTestStream(testStream, components); verifyTestStreamEncoding( testStream, payload, RehydratedComponents.forComponents(components.toComponents())); @@ -122,7 +122,7 @@ public class TestStreamTranslationTest { for (int i = 0; i < payload.getEventsList().size(); ++i) { assertThat( - TestStreamTranslation.fromProto(payload.getEvents(i), testStream.getValueCoder()), + TestStreamTranslation.eventFromProto(payload.getEvents(i), testStream.getValueCoder()), equalTo(testStream.getEvents().get(i))); } }
