Repository: beam Updated Branches: refs/heads/master bb12a56e6 -> 3bcbba121
Revert "Make WindowedValueCoder an Interface" This reverts commit 691269565b537370c133598c2c32bd3f87eb8c29. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8b61969 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8b61969 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8b61969 Branch: refs/heads/master Commit: d8b61969f6adc276741db95567af0cb98f2ff6f9 Parents: bb12a56 Author: Thomas Groh <[email protected]> Authored: Wed Apr 26 20:12:04 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Apr 26 20:12:04 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++++--------- 1 file changed, 42 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d8b61969/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 5d692f2..6b75951 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -38,7 +38,6 @@ import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CollectionCoder; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -573,24 +572,35 @@ public abstract class WindowedValue<T> { return ValueOnlyWindowedValueCoder.of(valueCoder); } - /** Abstract class for {@code WindowedValue} coder. */ - public interface WindowedValueCoder<T> extends Coder<WindowedValue<T>> { + /** + * Abstract class for {@code WindowedValue} coder. + */ + public abstract static class WindowedValueCoder<T> + extends StandardCoder<WindowedValue<T>> { + final Coder<T> valueCoder; + + WindowedValueCoder(Coder<T> valueCoder) { + this.valueCoder = checkNotNull(valueCoder); + } + /** - * Returns the coder used to encode the values within this {@link WindowedValueCoder}. + * Returns the value coder. */ - Coder<T> getValueCoder(); + public Coder<T> getValueCoder() { + return valueCoder; + } /** - * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different - * value coder. + * Returns a new {@code WindowedValueCoder} that is a copy of this one, + * but with a different value coder. */ - <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder); + public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder); } - /** Coder for {@code WindowedValue}. */ - public static class FullWindowedValueCoder<T> extends StandardCoder<WindowedValue<T>> - implements WindowedValueCoder<T> { - private final Coder<T> valueCoder; + /** + * Coder for {@code WindowedValue}. + */ + public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> { private final Coder<? extends BoundedWindow> windowCoder; // Precompute and cache the coder for a list of windows. private final Coder<Collection<? extends BoundedWindow>> windowsCoder; @@ -614,7 +624,7 @@ public abstract class WindowedValue<T> { FullWindowedValueCoder(Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) { - this.valueCoder = checkNotNull(valueCoder); + super(valueCoder); this.windowCoder = checkNotNull(windowCoder); // It's not possible to statically type-check correct use of the // windowCoder (we have to ensure externally that we only get @@ -635,14 +645,6 @@ public abstract class WindowedValue<T> { return windowsCoder; } - /** - * Returns the value coder. - */ - @Override - public Coder<T> getValueCoder() { - return valueCoder; - } - @Override public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) { return new FullWindowedValueCoder<>(valueCoder, windowCoder); @@ -715,23 +717,24 @@ public abstract class WindowedValue<T> { * Coder for {@code WindowedValue}. * * <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops - * timestamps and windows when encoding, and uses a default timestamp and window when decoding. + * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding. */ - public static class ValueOnlyWindowedValueCoder<T> extends CustomCoder<WindowedValue<T>> - implements WindowedValueCoder<T> { - public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) { + public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> { + public static <T> ValueOnlyWindowedValueCoder<T> of( + Coder<T> valueCoder) { return new ValueOnlyWindowedValueCoder<>(valueCoder); } - private final Coder<T> valueCoder; - - ValueOnlyWindowedValueCoder(Coder<T> valueCoder) { - this.valueCoder = valueCoder; + @JsonCreator + public static ValueOnlyWindowedValueCoder<?> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> components) { + checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size()); + return of(components.get(0)); } - @Override - public Coder<T> getValueCoder() { - return valueCoder; + ValueOnlyWindowedValueCoder(Coder<T> valueCoder) { + super(valueCoder); } @Override @@ -767,6 +770,13 @@ public abstract class WindowedValue<T> { } @Override + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); + addBoolean(result, PropertyNames.IS_WRAPPER, true); + return result; + } + + @Override public List<? extends Coder<?>> getCoderArguments() { return Arrays.<Coder<?>>asList(valueCoder); }
