Rename PCollections to PCollectionTranslation This is to give a standard and obvious suffix for all our helper classes for translating to/from Runner API protos.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/940819e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/940819e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/940819e2 Branch: refs/heads/master Commit: 940819e2acccb8d77b88aaee821ee972aca02eb3 Parents: 4ec3366 Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:26:44 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../construction/PCollectionTranslation.java | 97 +++++++++ .../runners/core/construction/PCollections.java | 97 --------- .../core/construction/SdkComponents.java | 3 +- .../PCollectionTranslationTest.java | 203 +++++++++++++++++++ .../core/construction/PCollectionsTest.java | 201 ------------------ 5 files changed, 302 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/940819e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java new file mode 100644 index 0000000..cad7b97 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * Utility methods for translating {@link PCollection PCollections} to and from Runner API protos. + */ +public class PCollectionTranslation { + private PCollectionTranslation() {} + + public static RunnerApi.PCollection toProto(PCollection<?> pCollection, SdkComponents components) + throws IOException { + String coderId = components.registerCoder(pCollection.getCoder()); + String windowingStrategyId = + components.registerWindowingStrategy(pCollection.getWindowingStrategy()); + // TODO: Display Data + + return RunnerApi.PCollection.newBuilder() + .setUniqueName(pCollection.getName()) + .setCoderId(coderId) + .setIsBounded(toProto(pCollection.isBounded())) + .setWindowingStrategyId(windowingStrategyId) + .build(); + } + + public static IsBounded isBounded(RunnerApi.PCollection pCollection) { + return fromProto(pCollection.getIsBounded()); + } + + public static Coder<?> getCoder( + RunnerApi.PCollection pCollection, RunnerApi.Components components) throws IOException { + return Coders.fromProto(components.getCodersOrThrow(pCollection.getCoderId()), components); + } + + public static WindowingStrategy<?, ?> getWindowingStrategy( + RunnerApi.PCollection pCollection, RunnerApi.Components components) + throws InvalidProtocolBufferException { + return WindowingStrategies.fromProto( + components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components); + } + + private static RunnerApi.IsBounded toProto(IsBounded bounded) { + switch (bounded) { + case BOUNDED: + return RunnerApi.IsBounded.BOUNDED; + case UNBOUNDED: + return RunnerApi.IsBounded.UNBOUNDED; + default: + throw new IllegalArgumentException( + String.format("Unknown %s %s", IsBounded.class.getSimpleName(), bounded)); + } + } + + private static IsBounded fromProto(RunnerApi.IsBounded isBounded) { + switch (isBounded) { + case BOUNDED: + return IsBounded.BOUNDED; + case UNBOUNDED: + return IsBounded.UNBOUNDED; + case UNRECOGNIZED: + default: + // Whether or not this enum cannot be recognized by the proto (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this IsBounded means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.IsBounded.class.getCanonicalName(), + IsBounded.class.getCanonicalName(), + isBounded)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/940819e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollections.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollections.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollections.java deleted file mode 100644 index 0f2fcb7..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollections.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.IOException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * Utility methods for translating {@link PCollection PCollections} to and from Runner API protos. - */ -public class PCollections { - private PCollections() {} - - public static RunnerApi.PCollection toProto(PCollection<?> pCollection, SdkComponents components) - throws IOException { - String coderId = components.registerCoder(pCollection.getCoder()); - String windowingStrategyId = - components.registerWindowingStrategy(pCollection.getWindowingStrategy()); - // TODO: Display Data - - return RunnerApi.PCollection.newBuilder() - .setUniqueName(pCollection.getName()) - .setCoderId(coderId) - .setIsBounded(toProto(pCollection.isBounded())) - .setWindowingStrategyId(windowingStrategyId) - .build(); - } - - public static IsBounded isBounded(RunnerApi.PCollection pCollection) { - return fromProto(pCollection.getIsBounded()); - } - - public static Coder<?> getCoder( - RunnerApi.PCollection pCollection, RunnerApi.Components components) throws IOException { - return Coders.fromProto(components.getCodersOrThrow(pCollection.getCoderId()), components); - } - - public static WindowingStrategy<?, ?> getWindowingStrategy( - RunnerApi.PCollection pCollection, RunnerApi.Components components) - throws InvalidProtocolBufferException { - return WindowingStrategies.fromProto( - components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components); - } - - private static RunnerApi.IsBounded toProto(IsBounded bounded) { - switch (bounded) { - case BOUNDED: - return RunnerApi.IsBounded.BOUNDED; - case UNBOUNDED: - return RunnerApi.IsBounded.UNBOUNDED; - default: - throw new IllegalArgumentException( - String.format("Unknown %s %s", IsBounded.class.getSimpleName(), bounded)); - } - } - - private static IsBounded fromProto(RunnerApi.IsBounded isBounded) { - switch (isBounded) { - case BOUNDED: - return IsBounded.BOUNDED; - case UNBOUNDED: - return IsBounded.UNBOUNDED; - case UNRECOGNIZED: - default: - // Whether or not this enum cannot be recognized by the proto (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this IsBounded means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.IsBounded.class.getCanonicalName(), - IsBounded.class.getCanonicalName(), - isBounded)); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/940819e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 5714fc5..3d8d4cd 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -176,7 +176,8 @@ class SdkComponents { } String uniqueName = uniqify(pCollection.getName(), pCollectionIds.values()); pCollectionIds.put(pCollection, uniqueName); - componentsBuilder.putPcollections(uniqueName, PCollections.toProto(pCollection, this)); + componentsBuilder.putPcollections( + uniqueName, PCollectionTranslation.toProto(pCollection, this)); return uniqueName; } http://git-wip-us.apache.org/repos/asf/beam/blob/940819e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java new file mode 100644 index 0000000..3b94220 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Collections; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link PCollectionTranslation}. + */ +@RunWith(Parameterized.class) +public class PCollectionTranslationTest { + // Each spec activates tests of all subsets of its fields + @Parameters(name = "{index}: {0}") + public static Iterable<PCollection<?>> data() { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> ints = pipeline.apply("ints", Create.of(1, 2, 3)); + PCollection<Long> longs = pipeline.apply("unbounded longs", GenerateSequence.from(0)); + PCollection<Long> windowedLongs = + longs.apply( + "into fixed windows", + Window.<Long>into(FixedWindows.of(Duration.standardMinutes(10L)))); + PCollection<KV<String, Iterable<String>>> groupedStrings = + pipeline + .apply( + "kvs", Create.of(KV.of("foo", "spam"), KV.of("bar", "ham"), KV.of("baz", "eggs"))) + .apply("group", GroupByKey.<String, String>create()); + PCollection<Long> coderLongs = + pipeline + .apply("counts with alternative coder", GenerateSequence.from(0).to(10)) + .setCoder(BigEndianLongCoder.of()); + PCollection<Integer> allCustomInts = + pipeline + .apply( + "intsWithCustomCoder", + Create.of(1, 2) + .withCoder(new AutoValue_PCollectionTranslationTest_CustomIntCoder())) + .apply( + "into custom windows", + Window.<Integer>into(new CustomWindows()) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + AfterFirst.of( + AfterPane.elementCountAtLeast(5), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.millis(227L))))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.standardMinutes(12L))); + return ImmutableList.<PCollection<?>>of(ints, longs, windowedLongs, coderLongs, groupedStrings); + } + + @Parameter(0) + public PCollection<?> testCollection; + + @Test + public void testEncodeDecodeCycle() throws Exception { + SdkComponents sdkComponents = SdkComponents.create(); + RunnerApi.PCollection protoCollection = PCollectionTranslation + .toProto(testCollection, sdkComponents); + RunnerApi.Components protoComponents = sdkComponents.toComponents(); + Coder<?> decodedCoder = PCollectionTranslation.getCoder(protoCollection, protoComponents); + WindowingStrategy<?, ?> decodedStrategy = + PCollectionTranslation.getWindowingStrategy(protoCollection, protoComponents); + IsBounded decodedIsBounded = PCollectionTranslation.isBounded(protoCollection); + + assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(testCollection.getCoder())); + assertThat( + decodedStrategy, + Matchers.<WindowingStrategy<?, ?>>equalTo( + testCollection.getWindowingStrategy().fixDefaults())); + assertThat(decodedIsBounded, equalTo(testCollection.isBounded())); + } + + @AutoValue + abstract static class CustomIntCoder extends CustomCoder<Integer> { + @Override + public Integer decode(InputStream inStream) throws IOException { + return VarInt.decodeInt(inStream); + } + + @Override + public void encode(Integer value, OutputStream outStream) throws IOException { + VarInt.encode(value, outStream); + } + } + + private static class CustomWindows extends NonMergingWindowFn<Integer, BoundedWindow> { + @Override + public Collection<BoundedWindow> assignWindows(final AssignContext c) throws Exception { + return Collections.<BoundedWindow>singleton( + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(c.element().longValue()); + } + }); + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return other != null && this.getClass().equals(other.getClass()); + } + + @Override + public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException { + if (!this.isCompatible(other)) { + throw new IncompatibleWindowException( + other, + String.format( + "%s is only compatible with %s.", + CustomWindows.class.getSimpleName(), CustomWindows.class.getSimpleName())); + } + } + + @Override + public Coder<BoundedWindow> windowCoder() { + return new AtomicCoder<BoundedWindow>() { + @Override public void verifyDeterministic() {} + + @Override + public void encode(BoundedWindow value, OutputStream outStream) + throws IOException { + VarInt.encode(value.maxTimestamp().getMillis(), outStream); + } + + @Override + public BoundedWindow decode(InputStream inStream) throws IOException { + final Instant ts = new Instant(VarInt.decodeLong(inStream)); + return new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return ts; + } + }; + } + }; + } + + @Override + public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/940819e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java deleted file mode 100644 index 9407a5a..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; -import java.util.Collections; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; -import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.util.VarInt; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** - * Tests for {@link PCollections}. - */ -@RunWith(Parameterized.class) -public class PCollectionsTest { - // Each spec activates tests of all subsets of its fields - @Parameters(name = "{index}: {0}") - public static Iterable<PCollection<?>> data() { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> ints = pipeline.apply("ints", Create.of(1, 2, 3)); - PCollection<Long> longs = pipeline.apply("unbounded longs", GenerateSequence.from(0)); - PCollection<Long> windowedLongs = - longs.apply( - "into fixed windows", - Window.<Long>into(FixedWindows.of(Duration.standardMinutes(10L)))); - PCollection<KV<String, Iterable<String>>> groupedStrings = - pipeline - .apply( - "kvs", Create.of(KV.of("foo", "spam"), KV.of("bar", "ham"), KV.of("baz", "eggs"))) - .apply("group", GroupByKey.<String, String>create()); - PCollection<Long> coderLongs = - pipeline - .apply("counts with alternative coder", GenerateSequence.from(0).to(10)) - .setCoder(BigEndianLongCoder.of()); - PCollection<Integer> allCustomInts = - pipeline - .apply( - "intsWithCustomCoder", - Create.of(1, 2).withCoder(new AutoValue_PCollectionsTest_CustomIntCoder())) - .apply( - "into custom windows", - Window.<Integer>into(new CustomWindows()) - .triggering( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings( - AfterFirst.of( - AfterPane.elementCountAtLeast(5), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.millis(227L))))) - .accumulatingFiredPanes() - .withAllowedLateness(Duration.standardMinutes(12L))); - return ImmutableList.<PCollection<?>>of(ints, longs, windowedLongs, coderLongs, groupedStrings); - } - - @Parameter(0) - public PCollection<?> testCollection; - - @Test - public void testEncodeDecodeCycle() throws Exception { - SdkComponents sdkComponents = SdkComponents.create(); - RunnerApi.PCollection protoCollection = PCollections.toProto(testCollection, sdkComponents); - RunnerApi.Components protoComponents = sdkComponents.toComponents(); - Coder<?> decodedCoder = PCollections.getCoder(protoCollection, protoComponents); - WindowingStrategy<?, ?> decodedStrategy = - PCollections.getWindowingStrategy(protoCollection, protoComponents); - IsBounded decodedIsBounded = PCollections.isBounded(protoCollection); - - assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(testCollection.getCoder())); - assertThat( - decodedStrategy, - Matchers.<WindowingStrategy<?, ?>>equalTo( - testCollection.getWindowingStrategy().fixDefaults())); - assertThat(decodedIsBounded, equalTo(testCollection.isBounded())); - } - - @AutoValue - abstract static class CustomIntCoder extends CustomCoder<Integer> { - @Override - public Integer decode(InputStream inStream) throws IOException { - return VarInt.decodeInt(inStream); - } - - @Override - public void encode(Integer value, OutputStream outStream) throws IOException { - VarInt.encode(value, outStream); - } - } - - private static class CustomWindows extends NonMergingWindowFn<Integer, BoundedWindow> { - @Override - public Collection<BoundedWindow> assignWindows(final AssignContext c) throws Exception { - return Collections.<BoundedWindow>singleton( - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(c.element().longValue()); - } - }); - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - return other != null && this.getClass().equals(other.getClass()); - } - - @Override - public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException { - if (!this.isCompatible(other)) { - throw new IncompatibleWindowException( - other, - String.format( - "%s is only compatible with %s.", - CustomWindows.class.getSimpleName(), CustomWindows.class.getSimpleName())); - } - } - - @Override - public Coder<BoundedWindow> windowCoder() { - return new AtomicCoder<BoundedWindow>() { - @Override public void verifyDeterministic() {} - - @Override - public void encode(BoundedWindow value, OutputStream outStream) - throws IOException { - VarInt.encode(value.maxTimestamp().getMillis(), outStream); - } - - @Override - public BoundedWindow decode(InputStream inStream) throws IOException { - final Instant ts = new Instant(VarInt.decodeLong(inStream)); - return new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return ts; - } - }; - } - }; - } - - @Override - public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() { - throw new UnsupportedOperationException(); - } - } -}
