Repository: beam Updated Branches: refs/heads/master 3c10c0bc8 -> a32bef96d
Add ReadTranslator This translates Read transforms to ReadPayloads and back Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f35c98b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f35c98b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f35c98b Branch: refs/heads/master Commit: 7f35c98b5f77069bd21dc7dea4a7d046883d13a6 Parents: b633abe Author: Thomas Groh <[email protected]> Authored: Thu May 18 10:23:35 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon May 22 16:04:27 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ReadTranslator.java | 127 +++++++++++++ .../core/construction/ReadTranslatorTest.java | 179 +++++++++++++++++++ .../beam/runners/dataflow/ReadTranslator.java | 5 +- .../org/apache/beam/sdk/io/CountingSource.java | 42 +++++ 4 files changed, 350 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java new file mode 100644 index 0000000..f944938 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.SerializableUtils; + +/** + * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} + * {@link PTransform PTransforms} into {@link ReadPayload} protos. 
+ */ +public class ReadTranslator { + private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1"; + private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1"; + + /** Builds a {@code BOUNDED} {@link ReadPayload} carrying the translated source of {@code read}. */ + public static ReadPayload toProto(Read.Bounded<?> read) { + return ReadPayload.newBuilder() + .setIsBounded(IsBounded.BOUNDED) + .setSource(toProto(read.getSource())) + .build(); + } + + /** Builds an {@code UNBOUNDED} {@link ReadPayload} carrying the translated source of {@code read}. */ + public static ReadPayload toProto(Read.Unbounded<?> read) { + return ReadPayload.newBuilder() + .setIsBounded(IsBounded.UNBOUNDED) + .setSource(toProto(read.getSource())) + .build(); + } + + /** + * Translates {@code source} according to whether it is bounded or unbounded. + * + * @throws IllegalArgumentException if {@code source} is neither a {@link BoundedSource} nor an + * {@link UnboundedSource} + */ + public static SdkFunctionSpec toProto(Source<?> source) { + if (source instanceof BoundedSource) { + return toProto((BoundedSource<?>) source); + } else if (source instanceof UnboundedSource) { + return toProto((UnboundedSource<?, ?>) source); + } else { + throw new IllegalArgumentException( + String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass())); + } + } + + /** Packs the Java-serialized bytes of {@code source} into a spec tagged with the bounded URN. */ + private static SdkFunctionSpec toProto(BoundedSource<?> source) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) + .build()))) + .build(); + } + + /** + * Recovers the {@link BoundedSource} that was Java-serialized into {@code payload}. + * + * @throws IllegalArgumentException if {@code payload} is not marked {@code BOUNDED} + * @throws InvalidProtocolBufferException if the spec parameter cannot be unpacked as a + * {@link BytesValue} + */ + public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload) + throws InvalidProtocolBufferException { + checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED)); + return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray( + payload + .getSource() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + "BoundedSource"); + } + + /** Packs the Java-serialized bytes of {@code source} into a spec tagged with the unbounded URN. */ + private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE) + .setParameter( + Any.pack( + 
BytesValue.newBuilder() + .setValue( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) + .build()))) + .build(); + } + + /** + * Recovers the {@link UnboundedSource} that was Java-serialized into {@code payload}. + * + * @throws IllegalArgumentException if {@code payload} is not marked {@code UNBOUNDED} + * @throws InvalidProtocolBufferException if the spec parameter cannot be unpacked as a + * {@link BytesValue} + */ + public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload) + throws InvalidProtocolBufferException { + checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED)); + return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray( + payload + .getSource() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + "UnboundedSource"); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java new file mode 100644 index 0000000..a603e34 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assume.assumeThat; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.options.PipelineOptions; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link ReadTranslator}. + */ +@RunWith(Parameterized.class) +public class ReadTranslatorTest { + + /** Supplies the sources under test: a mix of bounded and unbounded implementations. */ + @Parameters(name = "{index}: {0}") + public static Iterable<Source<?>> data() { + return ImmutableList.<Source<?>>of( + CountingSource.unbounded(), + CountingSource.upTo(100L), + new TestBoundedSource(), + new TestUnboundedSource()); + } + + @Parameter(0) + public Source<?> source; + + /** Round-trips a bounded source through a {@link ReadPayload}; other parameters are skipped. */ + @Test + public void testToFromProtoBounded() throws Exception { + // TODO: Split into two tests. 
+ assumeThat(source, instanceOf(BoundedSource.class)); + BoundedSource<?> boundedSource = (BoundedSource<?>) this.source; + Read.Bounded<?> boundedRead = Read.from(boundedSource); + ReadPayload payload = ReadTranslator.toProto(boundedRead); + assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED)); + BoundedSource<?> deserializedSource = ReadTranslator.boundedSourceFromProto(payload); + assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); + } + + /** Round-trips an unbounded source through a {@link ReadPayload}; other parameters are skipped. */ + @Test + public void testToFromProtoUnbounded() throws Exception { + assumeThat(source, instanceOf(UnboundedSource.class)); + UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source; + Read.Unbounded<?> unboundedRead = Read.from(unboundedSource); + ReadPayload payload = ReadTranslator.toProto(unboundedRead); + assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED)); + UnboundedSource<?, ?> deserializedSource = ReadTranslator.unboundedSourceFromProto(payload); + assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); + } + + /** A bounded source whose instances all compare equal; reading and splitting are unsupported. */ + private static class TestBoundedSource extends BoundedSource<String> { + @Override + public List<? 
extends BoundedSource<String>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public BoundedReader<String> createReader(PipelineOptions options) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void validate() {} + + @Override + public Coder<String> getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + + @Override + public boolean equals(Object other) { + return other != null && other.getClass().equals(TestBoundedSource.class); + } + + @Override + public int hashCode() { + return TestBoundedSource.class.hashCode(); + } + } + + /** An unbounded source whose instances all compare equal; reading and splitting are unsupported. */ + private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> { + @Override + public void validate() {} + + @Override + public Coder<byte[]> getDefaultOutputCoder() { + return ByteArrayCoder.of(); + } + + @Override + public List<? 
extends UnboundedSource<byte[], CheckpointMark>> split( + int desiredNumSplits, PipelineOptions options) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public UnboundedReader<byte[]> createReader( + PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Coder<CheckpointMark> getCheckpointMarkCoder() { + return new TestCheckpointMarkCoder(); + } + + @Override + public boolean equals(Object other) { + return other != null && other.getClass().equals(TestUnboundedSource.class); + } + + @Override + public int hashCode() { + return TestUnboundedSource.class.hashCode(); + } + + /** Placeholder coder; static so it does not capture the enclosing source. Encoding is unsupported. */ + private static class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> { + @Override + public void encode(CheckpointMark value, OutputStream outStream) + throws CoderException, IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CheckpointMark decode(InputStream inStream) throws CoderException, IOException { + throw new UnsupportedOperationException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index 30ecbf5..0b22d7e 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -40,9 +40,8 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { translateReadHelper(transform.getSource(), transform, context); } - public static <T> 
void translateReadHelper(Source<T> source, - PTransform<?, ? extends PValue> transform, - TranslationContext context) { + public static <T> void translateReadHelper( + Source<T> source, PTransform<?, ? extends PValue> transform, TranslationContext context) { try { StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); http://git-wip-us.apache.org/repos/asf/beam/blob/7f35c98b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index 81082e5..6202c2b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DefaultCoder; @@ -136,6 +137,17 @@ public class CountingSource { public Instant apply(Long input) { return Instant.now(); } + + /** Any two {@code NowTimestampFn} instances compare equal. */ + @Override + public boolean equals(Object other) { + return other instanceof NowTimestampFn; + } + + @Override + public int hashCode() { + return NowTimestampFn.class.hashCode(); + } } /** @@ -180,6 +192,22 @@ public class CountingSource { public Coder<Long> getDefaultOutputCoder() { return VarLongCoder.of(); } + + /** Two bounded counting sources are equal when their start and end offsets match. */ + @Override + public boolean equals(Object other) { + if (!(other instanceof BoundedCountingSource)) { + return false; + } + BoundedCountingSource that = (BoundedCountingSource) other; + return this.getStartOffset() == that.getStartOffset() + && this.getEndOffset() == that.getEndOffset(); 
+ } + + @Override + public int hashCode() { + return Objects.hash(this.getStartOffset(), this.getEndOffset()); + } } /** @@ -341,6 +369,25 @@ public class CountingSource { public Coder<Long> getDefaultOutputCoder() { return VarLongCoder.of(); } + + /** Two unbounded counting sources are equal when all five configuration fields match. */ + @Override + public boolean equals(Object other) { + if (!(other instanceof UnboundedCountingSource)) { + return false; + } + UnboundedCountingSource that = (UnboundedCountingSource) other; + return this.start == that.start + && this.stride == that.stride + && this.elementsPerPeriod == that.elementsPerPeriod + && Objects.equals(this.period, that.period) + && Objects.equals(this.timestampFn, that.timestampFn); + } + + @Override + public int hashCode() { + return Objects.hash(start, stride, elementsPerPeriod, period, timestampFn); + } } /**
