Repository: beam Updated Branches: refs/heads/master f23dd6709 -> cd2bcaf67
Make WindowedValueCoder an Interface Implement it in both FullWindowedValueCoder and ValueOnlyWindowedValueCoder Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69126956 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69126956 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69126956 Branch: refs/heads/master Commit: 691269565b537370c133598c2c32bd3f87eb8c29 Parents: f23dd67 Author: Thomas Groh <[email protected]> Authored: Tue Apr 25 15:55:47 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Apr 26 14:03:37 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++----------- 1 file changed, 32 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/69126956/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 6b75951..5d692f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -38,6 +38,7 @@ import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CollectionCoder; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -572,35 +573,24 @@ public abstract class WindowedValue<T> { return ValueOnlyWindowedValueCoder.of(valueCoder); } - /** - * Abstract class for {@code WindowedValue} coder. - */ - public abstract static class WindowedValueCoder<T> - extends StandardCoder<WindowedValue<T>> { - final Coder<T> valueCoder; - - WindowedValueCoder(Coder<T> valueCoder) { - this.valueCoder = checkNotNull(valueCoder); - } - + /** Abstract class for {@code WindowedValue} coder. */ + public interface WindowedValueCoder<T> extends Coder<WindowedValue<T>> { /** - * Returns the value coder. + * Returns the coder used to encode the values within this {@link WindowedValueCoder}. */ - public Coder<T> getValueCoder() { - return valueCoder; - } + Coder<T> getValueCoder(); /** - * Returns a new {@code WindowedValueCoder} that is a copy of this one, - * but with a different value coder. + * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different + * value coder. */ - public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder); + <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder); } - /** - * Coder for {@code WindowedValue}. - */ - public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> { + /** Coder for {@code WindowedValue}. */ + public static class FullWindowedValueCoder<T> extends StandardCoder<WindowedValue<T>> + implements WindowedValueCoder<T> { + private final Coder<T> valueCoder; private final Coder<? extends BoundedWindow> windowCoder; // Precompute and cache the coder for a list of windows. private final Coder<Collection<? extends BoundedWindow>> windowsCoder; @@ -624,7 +614,7 @@ public abstract class WindowedValue<T> { FullWindowedValueCoder(Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) { - super(valueCoder); + this.valueCoder = checkNotNull(valueCoder); this.windowCoder = checkNotNull(windowCoder); // It's not possible to statically type-check correct use of the // windowCoder (we have to ensure externally that we only get @@ -645,6 +635,14 @@ public abstract class WindowedValue<T> { return windowsCoder; } + /** + * Returns the value coder. + */ + @Override + public Coder<T> getValueCoder() { + return valueCoder; + } + @Override public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) { return new FullWindowedValueCoder<>(valueCoder, windowCoder); @@ -717,24 +715,23 @@ public abstract class WindowedValue<T> { * Coder for {@code WindowedValue}. * * <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops - * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding. + * timestamps and windows when encoding, and uses a default timestamp and window when decoding. */ - public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> { - public static <T> ValueOnlyWindowedValueCoder<T> of( - Coder<T> valueCoder) { + public static class ValueOnlyWindowedValueCoder<T> extends CustomCoder<WindowedValue<T>> + implements WindowedValueCoder<T> { + public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) { return new ValueOnlyWindowedValueCoder<>(valueCoder); } - @JsonCreator - public static ValueOnlyWindowedValueCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size()); - return of(components.get(0)); - } + private final Coder<T> valueCoder; ValueOnlyWindowedValueCoder(Coder<T> valueCoder) { - super(valueCoder); + this.valueCoder = valueCoder; + } + + @Override + public Coder<T> getValueCoder() { + return valueCoder; } @Override @@ -770,13 +767,6 @@ public abstract class WindowedValue<T> { } @Override - public CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - addBoolean(result, PropertyNames.IS_WRAPPER, true); - return result; - } - - @Override public List<? extends Coder<?>> getCoderArguments() { return Arrays.<Coder<?>>asList(valueCoder); }
