Repository: beam
Updated Branches:
  refs/heads/master f23dd6709 -> cd2bcaf67


Make WindowedValueCoder an Interface

Implement it in both FullWindowedValueCoder and
ValueOnlyWindowedValueCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69126956
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69126956
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69126956

Branch: refs/heads/master
Commit: 691269565b537370c133598c2c32bd3f87eb8c29
Parents: f23dd67
Author: Thomas Groh <[email protected]>
Authored: Tue Apr 25 15:55:47 2017 -0700
Committer: Luke Cwik <[email protected]>
Committed: Wed Apr 26 14:03:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++-----------
 1 file changed, 32 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/69126956/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 6b75951..5d692f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CollectionCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -572,35 +573,24 @@ public abstract class WindowedValue<T> {
     return ValueOnlyWindowedValueCoder.of(valueCoder);
   }
 
-  /**
-   * Abstract class for {@code WindowedValue} coder.
-   */
-  public abstract static class WindowedValueCoder<T>
-      extends StandardCoder<WindowedValue<T>> {
-    final Coder<T> valueCoder;
-
-    WindowedValueCoder(Coder<T> valueCoder) {
-      this.valueCoder = checkNotNull(valueCoder);
-    }
-
+  /** Abstract class for {@code WindowedValue} coder. */
+  public interface WindowedValueCoder<T> extends Coder<WindowedValue<T>> {
     /**
-     * Returns the value coder.
+     * Returns the coder used to encode the values within this {@link WindowedValueCoder}.
      */
-    public Coder<T> getValueCoder() {
-      return valueCoder;
-    }
+    Coder<T> getValueCoder();
 
     /**
-     * Returns a new {@code WindowedValueCoder} that is a copy of this one,
-     * but with a different value coder.
+     * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different
+     * value coder.
      */
-    public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
+    <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
   }
 
-  /**
-   * Coder for {@code WindowedValue}.
-   */
-  public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> {
+  /** Coder for {@code WindowedValue}. */
+  public static class FullWindowedValueCoder<T> extends StandardCoder<WindowedValue<T>>
+      implements WindowedValueCoder<T> {
+    private final Coder<T> valueCoder;
     private final Coder<? extends BoundedWindow> windowCoder;
     // Precompute and cache the coder for a list of windows.
     private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -624,7 +614,7 @@ public abstract class WindowedValue<T> {
 
     FullWindowedValueCoder(Coder<T> valueCoder,
                            Coder<? extends BoundedWindow> windowCoder) {
-      super(valueCoder);
+      this.valueCoder = checkNotNull(valueCoder);
       this.windowCoder = checkNotNull(windowCoder);
       // It's not possible to statically type-check correct use of the
       // windowCoder (we have to ensure externally that we only get
@@ -645,6 +635,14 @@ public abstract class WindowedValue<T> {
       return windowsCoder;
     }
 
+    /**
+     * Returns the value coder.
+     */
+    @Override
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
     @Override
     public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) {
       return new FullWindowedValueCoder<>(valueCoder, windowCoder);
@@ -717,24 +715,23 @@ public abstract class WindowedValue<T> {
    * Coder for {@code WindowedValue}.
    *
    * <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops
-   * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding.
+   * timestamps and windows when encoding, and uses a default timestamp and window when decoding.
    */
-  public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> {
-    public static <T> ValueOnlyWindowedValueCoder<T> of(
-        Coder<T> valueCoder) {
+  public static class ValueOnlyWindowedValueCoder<T> extends CustomCoder<WindowedValue<T>>
+      implements WindowedValueCoder<T> {
+    public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) {
       return new ValueOnlyWindowedValueCoder<>(valueCoder);
     }
 
-    @JsonCreator
-    public static ValueOnlyWindowedValueCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
-      return of(components.get(0));
-    }
+    private final Coder<T> valueCoder;
 
     ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
-      super(valueCoder);
+      this.valueCoder = valueCoder;
+    }
+
+    @Override
+    public Coder<T> getValueCoder() {
+      return valueCoder;
     }
 
     @Override
@@ -770,13 +767,6 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public CloudObject initializeCloudObject() {
-      CloudObject result = CloudObject.forClass(getClass());
-      addBoolean(result, PropertyNames.IS_WRAPPER, true);
-      return result;
-    }
-
-    @Override
     public List<? extends Coder<?>> getCoderArguments() {
       return Arrays.<Coder<?>>asList(valueCoder);
     }

Reply via email to