Repository: beam Updated Branches: refs/heads/master 21a2b96a1 -> 4c4fdf244
Test all Known Coders to ensure they Serialize via URN Fix Coders to not encode VarIntCoders as VarLongCoders, and VarLongCoders not as known coders. Ensure that all known coders are tested. Use getComponents rather than getCoderArguments for known coders. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71204403 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71204403 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71204403 Branch: refs/heads/master Commit: 71204403aa8b880549dc8a533efe46e67870dbd6 Parents: 21a2b96 Author: Thomas Groh <[email protected]> Authored: Wed Apr 12 16:31:53 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Apr 13 09:07:07 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/construction/Coders.java | 26 ++-- .../runners/core/construction/CodersTest.java | 144 +++++++++++++------ 2 files changed, 122 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/71204403/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index 7b96240..043a010 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -18,7 +18,10 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkArgument; + import com.fasterxml.jackson.databind.ObjectMapper; +import 
com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.protobuf.Any; @@ -32,7 +35,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; @@ -54,11 +57,12 @@ public class Coders { public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; // The URNs for coders which are shared across languages - private static final BiMap<Class<? extends Coder>, String> KNOWN_CODER_URNS = - ImmutableBiMap.<Class<? extends Coder>, String>builder() + @VisibleForTesting + static final BiMap<Class<? extends StandardCoder>, String> KNOWN_CODER_URNS = + ImmutableBiMap.<Class<? 
extends StandardCoder>, String>builder() .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") .put(KvCoder.class, "urn:beam:coders:kv:0.1") - .put(VarIntCoder.class, "urn:beam:coders:varint:0.1") + .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") .put(IterableCoder.class, "urn:beam:coders:stream:0.1") .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1") @@ -75,11 +79,17 @@ public class Coders { private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components) throws IOException { + checkArgument( + coder instanceof StandardCoder, + "A Known %s must implement %s, but %s of class %s does not", + Coder.class.getSimpleName(), + StandardCoder.class.getSimpleName(), + coder, + coder.getClass().getName()); + StandardCoder<?> stdCoder = (StandardCoder<?>) coder; List<String> componentIds = new ArrayList<>(); - if (coder.getCoderArguments() != null) { - for (Coder<?> componentCoder : coder.getCoderArguments()) { - componentIds.add(components.registerCoder(componentCoder)); - } + for (Coder<?> componentCoder : stdCoder.getComponents()) { + componentIds.add(components.registerCoder(componentCoder)); } return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) http://git-wip-us.apache.org/repos/asf/beam/blob/71204403/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index 1a657b2..b2b9955 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java 
@@ -18,82 +18,146 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkState; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.hamcrest.Matchers; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -/** - * Tests for {@link Coders}. - */ -@RunWith(Parameterized.class) +/** Tests for {@link Coders}. 
*/ +@RunWith(Enclosed.class) public class CodersTest { - @Parameters(name = "{index}: {0}") - public static Iterable<Coder<?>> data() { - return ImmutableList.<Coder<?>>of( - StringUtf8Coder.of(), - IterableCoder.of(VarLongCoder.of()), - KvCoder.of(StringUtf8Coder.of(), ListCoder.of(VarLongCoder.of())), - SerializableCoder.of(Record.class), - new RecordCoder(), - KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))); + private static final Set<StandardCoder<?>> KNOWN_CODERS = + ImmutableSet.<StandardCoder<?>>builder() + .add(ByteArrayCoder.of()) + .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())) + .add(VarLongCoder.of()) + .add(IntervalWindowCoder.of()) + .add(IterableCoder.of(ByteArrayCoder.of())) + .add(GlobalWindow.Coder.INSTANCE) + .add( + FullWindowedValueCoder.of( + IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of())) + .build(); + + /** + * Tests that all known coders are present in the parameters that will be used by + * {@link ToFromProtoTest}. + */ + @RunWith(JUnit4.class) + public static class ValidateKnownCodersPresentTest { + @Test + public void validateKnownCoders() { + // Validates that every known coder in the Coders class is represented in the "Known Coder" + // tests, which demonstrate that they are serialized via components and specified URNs rather + // than being Java-serialized + Set<Class<? extends StandardCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet(); + Set<Class<? extends StandardCoder>> knownCoderTests = new HashSet<>(); + for (StandardCoder<?> coder : KNOWN_CODERS) { + knownCoderTests.add(coder.getClass()); + } + Set<Class<? 
extends StandardCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses); + missingKnownCoders.removeAll(knownCoderTests); + checkState( + missingKnownCoders.isEmpty(), + "Missing validation of known coder %s in %s", + missingKnownCoders, + CodersTest.class.getSimpleName()); + } } - @Parameter(0) - public Coder<?> coder; - @Test - public void toAndFromProto() throws Exception { - SdkComponents componentsBuilder = SdkComponents.create(); - RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder); + /** + * Tests round-trip coder encodings for both known and unknown {@link Coder coders}. + */ + @RunWith(Parameterized.class) + public static class ToFromProtoTest { + @Parameters(name = "{index}: {0}") + public static Iterable<Coder<?>> data() { + return ImmutableList.<Coder<?>>builder() + .addAll(KNOWN_CODERS) + .add( + StringUtf8Coder.of(), + SerializableCoder.of(Record.class), + new RecordCoder(), + KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))) + .build(); + } - Components encodedComponents = componentsBuilder.toComponents(); - Coder<?> decodedCoder = Coders.fromProto(coderProto, encodedComponents); - assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder)); - } + @Parameter(0) + public Coder<?> coder; - static class Record implements Serializable { - } + @Test + public void toAndFromProto() throws Exception { + SdkComponents componentsBuilder = SdkComponents.create(); + RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder); - private static class RecordCoder extends CustomCoder<Record> { - @Override - public void encode(Record value, OutputStream outStream, Context context) - throws CoderException, IOException {} + Components encodedComponents = componentsBuilder.toComponents(); + Coder<?> decodedCoder = Coders.fromProto(coderProto, encodedComponents); + assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder)); - @Override - public Record decode(InputStream inStream, Context context) throws CoderException, 
IOException { - return new Record(); + if (KNOWN_CODERS.contains(coder)) { + for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) { + assertThat( + encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN))); + } + } } - @Override - public boolean equals(Object other) { - return other != null && getClass().equals(other.getClass()); - } + static class Record implements Serializable {} + + private static class RecordCoder extends CustomCoder<Record> { + @Override + public void encode(Record value, OutputStream outStream, Context context) + throws CoderException, IOException {} + + @Override + public Record decode(InputStream inStream, Context context) + throws CoderException, IOException { + return new Record(); + } + + @Override + public boolean equals(Object other) { + return other != null && getClass().equals(other.getClass()); + } - @Override - public int hashCode() { - return getClass().hashCode(); + @Override + public int hashCode() { + return getClass().hashCode(); + } } } }
