Repository: beam Updated Branches: refs/heads/master d233a240e -> 8cc4d59c2
Rename StandardCoder to StructuredCoder StandardCoder has improper connotations - mainly, "Standard" as in "Standardized" as opposed to "Standard" as in "normal". StructuredCoder communicates the important part of the class, which is that the coder has some meaningful structure, and that structure can be used by a runner. Update Dataflow Worker Version Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d13bacf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d13bacf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d13bacf Branch: refs/heads/master Commit: 2d13bacf9880801fb8398bc5f214e6518e62cce8 Parents: d233a24 Author: Thomas Groh <[email protected]> Authored: Wed Apr 26 16:30:57 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Apr 28 18:43:20 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/construction/Coders.java | 12 +- .../runners/core/construction/CodersTest.java | 14 +- ...aultCoderCloudObjectTranslatorRegistrar.java | 1 - .../apache/beam/sdk/coders/ByteArrayCoder.java | 2 +- .../org/apache/beam/sdk/coders/CustomCoder.java | 4 +- .../apache/beam/sdk/coders/DelegateCoder.java | 2 +- .../beam/sdk/coders/IterableLikeCoder.java | 2 +- .../org/apache/beam/sdk/coders/KvCoder.java | 2 +- .../beam/sdk/coders/LengthPrefixCoder.java | 12 +- .../apache/beam/sdk/coders/NullableCoder.java | 12 +- .../beam/sdk/coders/SerializableCoder.java | 2 +- .../apache/beam/sdk/coders/StandardCoder.java | 231 ------------------ .../apache/beam/sdk/coders/StructuredCoder.java | 231 ++++++++++++++++++ .../apache/beam/sdk/coders/VarLongCoder.java | 2 +- .../sdk/transforms/windowing/GlobalWindow.java | 4 +- .../transforms/windowing/IntervalWindow.java | 4 +- .../org/apache/beam/sdk/util/WindowedValue.java | 4 +- .../beam/sdk/coders/LengthPrefixCoderTest.java | 3 +- .../beam/sdk/coders/StandardCoderTest.java | 238 ------------------- .../beam/sdk/coders/StructuredCoderTest.java | 238 +++++++++++++++++++ 20 files changed, 510 insertions(+), 510 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index 6fe5dc9..8793df4 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; @@ -59,8 +59,8 @@ public class Coders { // The URNs for coders which are shared across languages @VisibleForTesting - static final BiMap<Class<? extends StandardCoder>, String> KNOWN_CODER_URNS = - ImmutableBiMap.<Class<? extends StandardCoder>, String>builder() + static final BiMap<Class<? extends StructuredCoder>, String> KNOWN_CODER_URNS = + ImmutableBiMap.<Class<? extends StructuredCoder>, String>builder() .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") .put(KvCoder.class, "urn:beam:coders:kv:0.1") .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") @@ -82,13 +82,13 @@ public class Coders { private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components) throws IOException { checkArgument( - coder instanceof StandardCoder, + coder instanceof StructuredCoder, "A Known %s must implement %s, but %s of class %s does not", Coder.class.getSimpleName(), - StandardCoder.class.getSimpleName(), + StructuredCoder.class.getSimpleName(), coder, coder.getClass().getName()); - StandardCoder<?> stdCoder = (StandardCoder<?>) coder; + StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder; List<String> componentIds = new ArrayList<>(); for (Coder<?> componentCoder : stdCoder.getComponents()) { componentIds.add(components.registerCoder(componentCoder)); http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index c9d32ee..ca0fdc9 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -40,8 +40,8 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; @@ -60,8 +60,8 @@ import org.junit.runners.Parameterized.Parameters; /** Tests for {@link Coders}. */ @RunWith(Enclosed.class) public class CodersTest { - private static final Set<StandardCoder<?>> KNOWN_CODERS = - ImmutableSet.<StandardCoder<?>>builder() + private static final Set<StructuredCoder<?>> KNOWN_CODERS = + ImmutableSet.<StructuredCoder<?>>builder() .add(ByteArrayCoder.of()) .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())) .add(VarLongCoder.of()) @@ -85,12 +85,12 @@ public class CodersTest { // Validates that every known coder in the Coders class is represented in a "Known Coder" // tests, which demonstrates that they are serialized via components and specified URNs rather // than java serialized - Set<Class<? extends StandardCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet(); - Set<Class<? extends StandardCoder>> knownCoderTests = new HashSet<>(); - for (StandardCoder<?> coder : KNOWN_CODERS) { + Set<Class<? extends StructuredCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet(); + Set<Class<? extends StructuredCoder>> knownCoderTests = new HashSet<>(); + for (StructuredCoder<?> coder : KNOWN_CODERS) { knownCoderTests.add(coder.getClass()); } - Set<Class<? extends StandardCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses); + Set<Class<? extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses); missingKnownCoders.removeAll(knownCoderTests); checkState( missingKnownCoders.isEmpty(), http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 72fd9ce..3d7b534 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.coders.Coder; @AutoService(CoderCloudObjectTranslatorRegistrar.class) public class DefaultCoderCloudObjectTranslatorRegistrar implements CoderCloudObjectTranslatorRegistrar { - @Override public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() { // TODO: Add translators return Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java index dd34f28..cba8d49 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java @@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; * encoded via a {@link VarIntCoder}.</li> * </ul> */ -public class ByteArrayCoder extends StandardCoder<byte[]> { +public class ByteArrayCoder extends StructuredCoder<byte[]> { @JsonCreator public static ByteArrayCoder of() { http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java index 55ec2aa..1627f8a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java @@ -40,7 +40,7 @@ import org.apache.beam.sdk.util.StringUtils; * * @param <T> the type of elements handled by this coder */ -public abstract class CustomCoder<T> extends StandardCoder<T> +public abstract class CustomCoder<T> extends StructuredCoder<T> implements Serializable { @JsonCreator @@ -116,5 +116,5 @@ public abstract class CustomCoder<T> extends StandardCoder<T> // This coder inherits isRegisterByteSizeObserverCheap, // getEncodedElementByteSize and registerByteSizeObserver - // from StandardCoder. Override if we can do better. + // from StructuredCoder. Override if we can do better. } http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java index de9659b..86077eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java @@ -170,7 +170,7 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> { private final CodingFunction<IntermediateT, T> fromFn; // null unless the user explicitly provides a TypeDescriptor. - // If null, then the machinery from the superclass (StandardCoder) will be used + // If null, then the machinery from the superclass (StructuredCoder) will be used // to try to deduce a good type descriptor. @Nullable private final TypeDescriptor<T> typeDescriptor; http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index 61402ac..8e10ca2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -58,7 +58,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver; * @param <IterableT> the type of the Iterables being transcoded */ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> - extends StandardCoder<IterableT> { + extends StructuredCoder<IterableT> { public Coder<T> getElemCoder() { return elementCoder; } http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index fcb906c..3d813b6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeParameter; * @param <K> the type of the keys of the KVs being transcoded * @param <V> the type of the values of the KVs being transcoded */ -public class KvCoder<K, V> extends StandardCoder<KV<K, V>> { +public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { public static <K, V> KvCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) { return new KvCoder<>(keyCoder, valueCoder); http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index d123a38..0972b1e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -41,7 +41,7 @@ import org.apache.beam.sdk.util.VarInt; * * @param <T> the type of the values being transcoded */ -public class LengthPrefixCoder<T> extends StandardCoder<T> { +public class LengthPrefixCoder<T> extends StructuredCoder<T> { public static <T> LengthPrefixCoder<T> of( Coder<T> valueCoder) { @@ -112,7 +112,7 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> { } /** - * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and + * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and * counting the bytes. The size is known to be the size of the value plus the number of bytes * required to prefix the length. * @@ -120,15 +120,15 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> { */ @Override protected long getEncodedElementByteSize(T value, Context context) throws Exception { - if (valueCoder instanceof StandardCoder) { - // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of + if (valueCoder instanceof StructuredCoder) { + // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of // the value, adding the number of bytes to represent the length. - long valueSize = ((StandardCoder<T>) valueCoder).getEncodedElementByteSize( + long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize( value, Context.OUTER); return VarInt.getLength(valueSize) + valueSize; } - // If value is not a StandardCoder then fall back to the default StandardCoder behavior + // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior // of encoding and counting the bytes. The encoding will include the length prefix. return super.getEncodedElementByteSize(value, context); } http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index 1fd9a99..747d91c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -119,7 +119,7 @@ public class NullableCoder<T> extends CustomCoder<T> { } /** - * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and + * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. * @@ -135,7 +135,7 @@ public class NullableCoder<T> extends CustomCoder<T> { } /** - * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and + * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. * @@ -147,14 +147,14 @@ public class NullableCoder<T> extends CustomCoder<T> { return 1; } - if (valueCoder instanceof StandardCoder) { - // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of + if (valueCoder instanceof StructuredCoder) { + // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of // the value, adding 1 byte to count the null indicator. - return 1 + ((StandardCoder<T>) valueCoder) + return 1 + ((StructuredCoder<T>) valueCoder) .getEncodedElementByteSize(value, context); } - // If value is not a StandardCoder then fall back to the default StandardCoder behavior + // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior // of encoding and counting the bytes. The encoding will include the null indicator byte. return super.getEncodedElementByteSize(value, context); } http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java index 1a737ab..b52b9db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java @@ -152,6 +152,6 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> { // This coder inherits isRegisterByteSizeObserverCheap, // getEncodedElementByteSize and registerByteSizeObserver - // from StandardCoder. Looks like we cannot do much better + // from StructuredCoder. Looks like we cannot do much better // in this case. } http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java deleted file mode 100644 index f8d82a5..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import static org.apache.beam.sdk.util.Structs.addList; - -import com.google.common.io.ByteStreams; -import com.google.common.io.CountingOutputStream; -import java.io.ByteArrayOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing - * via the class name and recursively using {@link #getComponents}. - * - * <p>To extend {@link StandardCoder}, override the following methods as appropriate: - * - * <ul> - * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li> - * <li>{@link #getEncodedElementByteSize} and - * {@link #isRegisterByteSizeObserverCheap}: the - * default implementation encodes values to bytes and counts the bytes, which is considered - * expensive.</li> - * </ul> - */ -public abstract class StandardCoder<T> implements Coder<T> { - protected StandardCoder() {} - - /** - * Returns the list of {@link Coder Coders} that are components of this {@link Coder}. - */ - public List<? extends Coder<?>> getComponents() { - List<? extends Coder<?>> coderArguments = getCoderArguments(); - if (coderArguments == null) { - return Collections.emptyList(); - } else { - return coderArguments; - } - } - - /** - * {@inheritDoc} - * - * @return {@code true} if the two {@link StandardCoder} instances have the - * same class and equal components. - */ - @Override - public boolean equals(Object o) { - if (o == null || this.getClass() != o.getClass()) { - return false; - } - StandardCoder<?> that = (StandardCoder<?>) o; - return this.getComponents().equals(that.getComponents()); - } - - @Override - public int hashCode() { - return getClass().hashCode() * 31 + getComponents().hashCode(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - String s = getClass().getName(); - builder.append(s.substring(s.lastIndexOf('.') + 1)); - - List<? extends Coder<?>> componentCoders = getComponents(); - if (!componentCoders.isEmpty()) { - builder.append('('); - boolean first = true; - for (Coder<?> componentCoder : componentCoders) { - if (first) { - first = false; - } else { - builder.append(','); - } - builder.append(componentCoder.toString()); - } - builder.append(')'); - } - return builder.toString(); - } - - /** - * Adds the following properties to the {@link CloudObject} representation: - * <ul> - * <li>component_encodings: A list of coders represented as {@link CloudObject}s - * equivalent to the {@link #getCoderArguments}.</li> - * </ul> - * - * <p>{@link StandardCoder} implementations should override {@link #initializeCloudObject} - * to customize the {@link CloudObject} representation. - */ - @Override - public final CloudObject asCloudObject() { - CloudObject result = initializeCloudObject(); - - List<? extends Coder<?>> components = getComponents(); - if (!components.isEmpty()) { - List<CloudObject> cloudComponents = new ArrayList<>(components.size()); - for (Coder<?> coder : components) { - cloudComponents.add(coder.asCloudObject()); - } - addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents); - } - - return result; - } - - /** - * Subclasses should override this method to customize the {@link CloudObject} - * representation. {@link StandardCoder#asCloudObject} delegates to this method - * to provide an initial {@link CloudObject}. - * - * <p>The default implementation returns a {@link CloudObject} using - * {@link Object#getClass} for the type. - */ - protected CloudObject initializeCloudObject() { - return CloudObject.forClass(getClass()); - } - - /** - * {@inheritDoc} - * - * @return {@code false} unless it is overridden. {@link StandardCoder#registerByteSizeObserver} - * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element - * unless it is overridden. This is considered expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(T value, Context context) { - return false; - } - - /** - * Returns the size in bytes of the encoded value using this coder. - */ - protected long getEncodedElementByteSize(T value, Context context) - throws Exception { - try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) { - encode(value, os, context); - return os.getCount(); - } catch (Exception exn) { - throw new IllegalArgumentException( - "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); - } - } - - /** - * {@inheritDoc} - * - * <p>For {@link StandardCoder} subclasses, this notifies {@code observer} about the byte size - * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}. - */ - @Override - public void registerByteSizeObserver( - T value, ElementByteSizeObserver observer, Context context) - throws Exception { - observer.update(getEncodedElementByteSize(value, context)); - } - - protected void verifyDeterministic(String message, Iterable<Coder<?>> coders) - throws NonDeterministicException { - for (Coder<?> coder : coders) { - try { - coder.verifyDeterministic(); - } catch (NonDeterministicException e) { - throw new NonDeterministicException(this, message, e); - } - } - } - - protected void verifyDeterministic(String message, Coder<?>... coders) - throws NonDeterministicException { - verifyDeterministic(message, Arrays.asList(coders)); - } - - /** - * {@inheritDoc} - * - * @return {@code false} for {@link StandardCoder} unless overridden. - */ - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(T value) { - if (value != null && consistentWithEquals()) { - return value; - } else { - try { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - encode(value, os, Context.OUTER); - return new StructuralByteArray(os.toByteArray()); - } catch (Exception exn) { - throw new IllegalArgumentException( - "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); - } - } - } - - @SuppressWarnings("unchecked") - @Override - public TypeDescriptor<T> getEncodedTypeDescriptor() { - return (TypeDescriptor<T>) - TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java new file mode 100644 index 0000000..bce382c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import static org.apache.beam.sdk.util.Structs.addList; + +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing + * via the class name and recursively using {@link #getComponents}. + * + * <p>To extend {@link StructuredCoder}, override the following methods as appropriate: + * + * <ul> + * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li> + * <li>{@link #getEncodedElementByteSize} and + * {@link #isRegisterByteSizeObserverCheap}: the + * default implementation encodes values to bytes and counts the bytes, which is considered + * expensive.</li> + * </ul> + */ +public abstract class StructuredCoder<T> implements Coder<T> { + protected StructuredCoder() {} + + /** + * Returns the list of {@link Coder Coders} that are components of this {@link Coder}. + */ + public List<? extends Coder<?>> getComponents() { + List<? extends Coder<?>> coderArguments = getCoderArguments(); + if (coderArguments == null) { + return Collections.emptyList(); + } else { + return coderArguments; + } + } + + /** + * {@inheritDoc} + * + * @return {@code true} if the two {@link StructuredCoder} instances have the + * same class and equal components. + */ + @Override + public boolean equals(Object o) { + if (o == null || this.getClass() != o.getClass()) { + return false; + } + StructuredCoder<?> that = (StructuredCoder<?>) o; + return this.getComponents().equals(that.getComponents()); + } + + @Override + public int hashCode() { + return getClass().hashCode() * 31 + getComponents().hashCode(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + String s = getClass().getName(); + builder.append(s.substring(s.lastIndexOf('.') + 1)); + + List<? extends Coder<?>> componentCoders = getComponents(); + if (!componentCoders.isEmpty()) { + builder.append('('); + boolean first = true; + for (Coder<?> componentCoder : componentCoders) { + if (first) { + first = false; + } else { + builder.append(','); + } + builder.append(componentCoder.toString()); + } + builder.append(')'); + } + return builder.toString(); + } + + /** + * Adds the following properties to the {@link CloudObject} representation: + * <ul> + * <li>component_encodings: A list of coders represented as {@link CloudObject}s + * equivalent to the {@link #getCoderArguments}.</li> + * </ul> + * + * <p>{@link StructuredCoder} implementations should override {@link #initializeCloudObject} + * to customize the {@link CloudObject} representation. + */ + @Override + public final CloudObject asCloudObject() { + CloudObject result = initializeCloudObject(); + + List<? extends Coder<?>> components = getComponents(); + if (!components.isEmpty()) { + List<CloudObject> cloudComponents = new ArrayList<>(components.size()); + for (Coder<?> coder : components) { + cloudComponents.add(coder.asCloudObject()); + } + addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents); + } + + return result; + } + + /** + * Subclasses should override this method to customize the {@link CloudObject} + * representation. {@link StructuredCoder#asCloudObject} delegates to this method + * to provide an initial {@link CloudObject}. + * + * <p>The default implementation returns a {@link CloudObject} using + * {@link Object#getClass} for the type. + */ + protected CloudObject initializeCloudObject() { + return CloudObject.forClass(getClass()); + } + + /** + * {@inheritDoc} + * + * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver} + * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element + * unless it is overridden. This is considered expensive. + */ + @Override + public boolean isRegisterByteSizeObserverCheap(T value, Context context) { + return false; + } + + /** + * Returns the size in bytes of the encoded value using this coder. + */ + protected long getEncodedElementByteSize(T value, Context context) + throws Exception { + try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) { + encode(value, os, context); + return os.getCount(); + } catch (Exception exn) { + throw new IllegalArgumentException( + "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); + } + } + + /** + * {@inheritDoc} + * + * <p>For {@link StructuredCoder} subclasses, this notifies {@code observer} about the byte size + * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}. + */ + @Override + public void registerByteSizeObserver( + T value, ElementByteSizeObserver observer, Context context) + throws Exception { + observer.update(getEncodedElementByteSize(value, context)); + } + + protected void verifyDeterministic(String message, Iterable<Coder<?>> coders) + throws NonDeterministicException { + for (Coder<?> coder : coders) { + try { + coder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new NonDeterministicException(this, message, e); + } + } + } + + protected void verifyDeterministic(String message, Coder<?>... coders) + throws NonDeterministicException { + verifyDeterministic(message, Arrays.asList(coders)); + } + + /** + * {@inheritDoc} + * + * @return {@code false} for {@link StructuredCoder} unless overridden. + */ + @Override + public boolean consistentWithEquals() { + return false; + } + + @Override + public Object structuralValue(T value) { + if (value != null && consistentWithEquals()) { + return value; + } else { + try { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + encode(value, os, Context.OUTER); + return new StructuralByteArray(os.toByteArray()); + } catch (Exception exn) { + throw new IllegalArgumentException( + "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public TypeDescriptor<T> getEncodedTypeDescriptor() { + return (TypeDescriptor<T>) + TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java index 16474ba..7fc094f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for * longs that are known to often be large or negative. */ -public class VarLongCoder extends StandardCoder<Long> { +public class VarLongCoder extends StructuredCoder<Long> { public static VarLongCoder of() { return INSTANCE; } http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java index ffc8011..79c9352 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java @@ -21,7 +21,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.util.CloudObject; import org.joda.time.Duration; import org.joda.time.Instant; @@ -64,7 +64,7 @@ public class GlobalWindow extends BoundedWindow { /** * {@link Coder} for encoding and decoding {@code GlobalWindow}s. */ - public static class Coder extends StandardCoder<GlobalWindow> { + public static class Coder extends StructuredCoder<GlobalWindow> { public static final Coder INSTANCE = new Coder(); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index aaa2e83..55bf585 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.util.CloudObject; import org.joda.time.Duration; import org.joda.time.Instant; @@ -168,7 +168,7 @@ public class IntervalWindow extends BoundedWindow /** * Encodes an {@link IntervalWindow} as a pair of its upper bound and duration. */ - public static class IntervalWindowCoder extends StandardCoder<IntervalWindow> { + public static class IntervalWindowCoder extends StructuredCoder<IntervalWindow> { private static final IntervalWindowCoder INSTANCE = new IntervalWindowCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 6b75951..fc9a404 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CollectionCoder; import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -576,7 +576,7 @@ public abstract class WindowedValue<T> { * Abstract class for {@code WindowedValue} coder. */ public abstract static class WindowedValueCoder<T> - extends StandardCoder<WindowedValue<T>> { + extends StructuredCoder<WindowedValue<T>> { final Coder<T> valueCoder; WindowedValueCoder(Coder<T> valueCoder) { http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java index 27ac48a..fa81a7c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java @@ -33,7 +33,8 @@ import org.junit.runners.JUnit4; /** Tests for {@link LengthPrefixCoder}. */ @RunWith(JUnit4.class) public class LengthPrefixCoderTest { - private static final StandardCoder<byte[]> TEST_CODER = LengthPrefixCoder.of(ByteArrayCoder.of()); + private static final StructuredCoder<byte[]> TEST_CODER = + LengthPrefixCoder.of(ByteArrayCoder.of()); private static final List<byte[]> TEST_VALUES = Arrays.asList( new byte[]{ 0xa, 0xb, 0xc }, http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java deleted file mode 100644 index a948f78..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test case for {@link StandardCoder}. - */ -@RunWith(JUnit4.class) -public class StandardCoderTest { - - /** - * A coder for nullable {@code Boolean} values that is consistent with equals. - */ - private static class NullBooleanCoder extends StandardCoder<Boolean> { - - private static final long serialVersionUID = 0L; - - @Override - public void encode(@Nullable Boolean value, OutputStream outStream, Context context) - throws CoderException, IOException { - if (value == null) { - outStream.write(2); - } else if (value) { - outStream.write(1); - } else { - outStream.write(0); - } - } - - @Override - @Nullable - public Boolean decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - int value = inStream.read(); - if (value == 0) { - return false; - } else if (value == 1) { - return true; - } else if (value == 2) { - return null; - } - throw new CoderException("Invalid value for nullable Boolean: " + value); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Collections.emptyList(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { } - - @Override - public boolean consistentWithEquals() { - return true; - } - } - - /** - * A boxed {@code int} with {@code equals()} that compares object identity. - */ - private static class ObjectIdentityBoolean { - private final boolean value; - public ObjectIdentityBoolean(boolean value) { - this.value = value; - } - public boolean getValue() { - return value; - } - } - - /** - * A coder for nullable boxed {@code Boolean} values that is not consistent with equals. - */ - private static class ObjectIdentityBooleanCoder extends StandardCoder<ObjectIdentityBoolean> { - - private static final long serialVersionUID = 0L; - - @Override - public void encode( - @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context) - throws CoderException, IOException { - if (value == null) { - outStream.write(2); - } else if (value.getValue()){ - outStream.write(1); - } else { - outStream.write(0); - } - } - - @Override - @Nullable - public ObjectIdentityBoolean decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - int value = inStream.read(); - if (value == 0) { - return new ObjectIdentityBoolean(false); - } else if (value == 1) { - return new ObjectIdentityBoolean(true); - } else if (value == 2) { - return null; - } - throw new CoderException("Invalid value for nullable Boolean: " + value); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Collections.emptyList(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { } - - @Override - public boolean consistentWithEquals() { - return false; - } - } - - /** - * Tests that {@link StandardCoder#structuralValue()} is correct whenever a subclass has a correct - * {@link Coder#consistentWithEquals()}. - */ - @Test - public void testStructuralValue() throws Exception { - List<Boolean> testBooleans = Arrays.asList(null, true, false); - List<ObjectIdentityBoolean> testInconsistentBooleans = - Arrays.asList(null, new ObjectIdentityBoolean(true), new ObjectIdentityBoolean(false)); - - Coder<Boolean> consistentCoder = new NullBooleanCoder(); - for (Boolean value1 : testBooleans) { - for (Boolean value2 : testBooleans) { - CoderProperties.structuralValueConsistentWithEquals(consistentCoder, value1, value2); - } - } - - Coder<ObjectIdentityBoolean> inconsistentCoder = new ObjectIdentityBooleanCoder(); - for (ObjectIdentityBoolean value1 : testInconsistentBooleans) { - for (ObjectIdentityBoolean value2 : testInconsistentBooleans) { - CoderProperties.structuralValueConsistentWithEquals(inconsistentCoder, value1, value2); - } - } - } - - /** - * Test for verifying {@link StandardCoder#toString()}. - */ - @Test - public void testToString() { - Assert.assertThat(new ObjectIdentityBooleanCoder().toString(), - CoreMatchers.equalTo("StandardCoderTest$ObjectIdentityBooleanCoder")); - - ObjectIdentityBooleanCoder coderWithArgs = new ObjectIdentityBooleanCoder() { - @Override - public List<? extends Coder<?>> getCoderArguments() { - return ImmutableList.<Coder<?>>builder() - .add(BigDecimalCoder.of(), BigIntegerCoder.of()) - .build(); - } - }; - - Assert.assertThat(coderWithArgs.toString(), - CoreMatchers.equalTo("StandardCoderTest$1(BigDecimalCoder,BigIntegerCoder)")); - } - - @Test - public void testGenericStandardCoderFallsBackToT() throws Exception { - Assert.assertThat( - new Foo<String>().getEncodedTypeDescriptor().getType(), - CoreMatchers.not(TypeDescriptor.of(String.class).getType())); - } - - @Test - public void testGenericStandardCoder() throws Exception { - Assert.assertThat(new FooTwo().getEncodedTypeDescriptor(), - CoreMatchers.equalTo(TypeDescriptor.of(String.class))); - } - - private static class Foo<T> extends StandardCoder<T> { - - @Override - public void encode(T value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T decode(InputStream inStream, Coder.Context context) - throws CoderException, IOException { - throw new UnsupportedOperationException(); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - throw new UnsupportedOperationException(); - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException {} - } - - private static class FooTwo extends Foo<String> { - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java new file mode 100644 index 0000000..af2c94e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test case for {@link StructuredCoder}. + */ +@RunWith(JUnit4.class) +public class StructuredCoderTest { + + /** + * A coder for nullable {@code Boolean} values that is consistent with equals. + */ + private static class NullBooleanCoder extends StructuredCoder<Boolean> { + + private static final long serialVersionUID = 0L; + + @Override + public void encode(@Nullable Boolean value, OutputStream outStream, Context context) + throws CoderException, IOException { + if (value == null) { + outStream.write(2); + } else if (value) { + outStream.write(1); + } else { + outStream.write(0); + } + } + + @Override + @Nullable + public Boolean decode( + InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + throws CoderException, IOException { + int value = inStream.read(); + if (value == 0) { + return false; + } else if (value == 1) { + return true; + } else if (value == 2) { + return null; + } + throw new CoderException("Invalid value for nullable Boolean: " + value); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { } + + @Override + public boolean consistentWithEquals() { + return true; + } + } + + /** + * A boxed {@code int} with {@code equals()} that compares object identity. + */ + private static class ObjectIdentityBoolean { + private final boolean value; + public ObjectIdentityBoolean(boolean value) { + this.value = value; + } + public boolean getValue() { + return value; + } + } + + /** + * A coder for nullable boxed {@code Boolean} values that is not consistent with equals. + */ + private static class ObjectIdentityBooleanCoder extends StructuredCoder<ObjectIdentityBoolean> { + + private static final long serialVersionUID = 0L; + + @Override + public void encode( + @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context) + throws CoderException, IOException { + if (value == null) { + outStream.write(2); + } else if (value.getValue()){ + outStream.write(1); + } else { + outStream.write(0); + } + } + + @Override + @Nullable + public ObjectIdentityBoolean decode( + InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + throws CoderException, IOException { + int value = inStream.read(); + if (value == 0) { + return new ObjectIdentityBoolean(false); + } else if (value == 1) { + return new ObjectIdentityBoolean(true); + } else if (value == 2) { + return null; + } + throw new CoderException("Invalid value for nullable Boolean: " + value); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { } + + @Override + public boolean consistentWithEquals() { + return false; + } + } + + /** + * Tests that {@link StructuredCoder#structuralValue()} is correct whenever a subclass has a + * correct {@link Coder#consistentWithEquals()}. + */ + @Test + public void testStructuralValue() throws Exception { + List<Boolean> testBooleans = Arrays.asList(null, true, false); + List<ObjectIdentityBoolean> testInconsistentBooleans = + Arrays.asList(null, new ObjectIdentityBoolean(true), new ObjectIdentityBoolean(false)); + + Coder<Boolean> consistentCoder = new NullBooleanCoder(); + for (Boolean value1 : testBooleans) { + for (Boolean value2 : testBooleans) { + CoderProperties.structuralValueConsistentWithEquals(consistentCoder, value1, value2); + } + } + + Coder<ObjectIdentityBoolean> inconsistentCoder = new ObjectIdentityBooleanCoder(); + for (ObjectIdentityBoolean value1 : testInconsistentBooleans) { + for (ObjectIdentityBoolean value2 : testInconsistentBooleans) { + CoderProperties.structuralValueConsistentWithEquals(inconsistentCoder, value1, value2); + } + } + } + + /** + * Test for verifying {@link StructuredCoder#toString()}. + */ + @Test + public void testToString() { + Assert.assertThat(new ObjectIdentityBooleanCoder().toString(), + CoreMatchers.equalTo("StructuredCoderTest$ObjectIdentityBooleanCoder")); + + ObjectIdentityBooleanCoder coderWithArgs = new ObjectIdentityBooleanCoder() { + @Override + public List<? extends Coder<?>> getCoderArguments() { + return ImmutableList.<Coder<?>>builder() + .add(BigDecimalCoder.of(), BigIntegerCoder.of()) + .build(); + } + }; + + Assert.assertThat(coderWithArgs.toString(), + CoreMatchers.equalTo("StructuredCoderTest$1(BigDecimalCoder,BigIntegerCoder)")); + } + + @Test + public void testGenericStandardCoderFallsBackToT() throws Exception { + Assert.assertThat( + new Foo<String>().getEncodedTypeDescriptor().getType(), + CoreMatchers.not(TypeDescriptor.of(String.class).getType())); + } + + @Test + public void testGenericStandardCoder() throws Exception { + Assert.assertThat(new FooTwo().getEncodedTypeDescriptor(), + CoreMatchers.equalTo(TypeDescriptor.of(String.class))); + } + + private static class Foo<T> extends StructuredCoder<T> { + + @Override + public void encode(T value, OutputStream outStream, Coder.Context context) + throws CoderException, IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T decode(InputStream inStream, Coder.Context context) + throws CoderException, IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + throw new UnsupportedOperationException(); + } + + @Override + public void verifyDeterministic() throws Coder.NonDeterministicException {} + } + + private static class FooTwo extends Foo<String> { + } +}
