Add KeyedWorkItemCoder

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115354168


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/635541a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/635541a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/635541a7

Branch: refs/heads/master
Commit: 635541a74438a02d18910fba6a76c5aacc2c7816
Parents: 3904c90
Author: tgroh <[email protected]>
Authored: Tue Feb 23 10:12:17 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:26 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/util/KeyedWorkItemCoder.java   | 120 +++++++++++++++++++
 .../cloud/dataflow/sdk/util/KeyedWorkItems.java |  29 ++++-
 .../sdk/util/KeyedWorkItemCoderTest.java        |  61 ++++++++++
 3 files changed, 209 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/635541a7/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java
new file mode 100644
index 0000000..398e82a
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerDataCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
+ */
+public class KeyedWorkItemCoder<K, ElemT> extends 
StandardCoder<KeyedWorkItem<K, ElemT>> {
+  /**
+   * Create a new {@link KeyedWorkItemCoder} with the provided key coder, 
element coder, and window
+   * coder.
+   */
+  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
+      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends 
BoundedWindow> windowCoder) {
+    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+  }
+
+  @JsonCreator
+  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> 
components) {
+    checkArgument(components.size() == 3, "Expecting 3 components, got %s", 
components.size());
+    @SuppressWarnings("unchecked")
+    Coder<K> keyCoder = (Coder<K>) components.get(0);
+    @SuppressWarnings("unchecked")
+    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
+    @SuppressWarnings("unchecked")
+    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends 
BoundedWindow>) components.get(2);
+    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+  }
+
+  private final Coder<K> keyCoder;
+  private final Coder<ElemT> elemCoder;
+  private final Coder<? extends BoundedWindow> windowCoder;
+  private final Coder<Iterable<TimerData>> timersCoder;
+  private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder;
+
+  private KeyedWorkItemCoder(
+      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends 
BoundedWindow> windowCoder) {
+    this.keyCoder = keyCoder;
+    this.elemCoder = elemCoder;
+    this.windowCoder = windowCoder;
+    this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
+    this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, 
windowCoder));
+  }
+
+  @Override
+  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, 
Coder.Context context)
+      throws CoderException, IOException {
+    Coder.Context nestedContext = context.nested();
+    keyCoder.encode(value.key(), outStream, nestedContext);
+    timersCoder.encode(value.timersIterable(), outStream, nestedContext);
+    elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
+  }
+
+  @Override
+  public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context 
context)
+      throws CoderException, IOException {
+    Coder.Context nestedContext = context.nested();
+    K key = keyCoder.decode(inStream, nestedContext);
+    Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
+    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, 
nestedContext);
+    return KeyedWorkItems.workItem(key, timers, elems);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(keyCoder, elemCoder, windowCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws Coder.NonDeterministicException {
+    keyCoder.verifyDeterministic();
+    timersCoder.verifyDeterministic();
+    elemsCoder.verifyDeterministic();
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * {@link KeyedWorkItemCoder} is not consistent with equals as it can return 
a
+   * {@link KeyedWorkItem} of a type different from the originally encoded 
type.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/635541a7/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java
index b94397c..734bd2c 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java
@@ -16,8 +16,11 @@
 package com.google.cloud.dataflow.sdk.util;
 
 import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
 
 import java.util.Collections;
+import java.util.Objects;
 
 /**
  * Static utility methods that provide {@link KeyedWorkItem} implementations.
@@ -63,7 +66,6 @@ public class KeyedWorkItems {
    * iterable.
    */
   public static class ComposedKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT> {
-
     private final K key;
     private final Iterable<TimerData> timers;
     private final Iterable<WindowedValue<ElemT>> elements;
@@ -89,5 +91,30 @@ public class KeyedWorkItems {
     public Iterable<WindowedValue<ElemT>> elementsIterable() {
       return elements;
     }
+
+    /**
+     * Compares this work item with {@code other}: the keys must be equal and the timers and
+     * elements must match element by element, in iteration order.
+     */
+    @Override
+    public boolean equals(Object other) {
+      // instanceof is false for null, so no separate null test is needed.
+      if (!(other instanceof ComposedKeyedWorkItem)) {
+        return false;
+      }
+      KeyedWorkItem<?, ?> candidate = (KeyedWorkItem<?, ?>) other;
+      if (!Objects.equals(this.key, candidate.key())) {
+        return false;
+      }
+      if (!Iterables.elementsEqual(this.timersIterable(), candidate.timersIterable())) {
+        return false;
+      }
+      return Iterables.elementsEqual(this.elementsIterable(), candidate.elementsIterable());
+    }
+
+    /**
+     * Returns a hash built from the key and from the individual timers and elements.
+     *
+     * <p>{@link #equals} compares the two iterables element by element, so the hash is derived
+     * from the elements as well rather than from the iterables' own {@code hashCode()}.
+     * Otherwise two equal work items backed by different {@link Iterable} implementations could
+     * hash differently. Note that computing the hash iterates both iterables.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(key, hashElements(timers), hashElements(elements));
+    }
+
+    /**
+     * Hashes the contents of {@code iterable} in iteration order using the same formula as
+     * {@link java.util.List#hashCode()}; a {@code null} iterable hashes to zero.
+     */
+    private static int hashElements(Iterable<?> iterable) {
+      if (iterable == null) {
+        return 0;
+      }
+      int hash = 1;
+      for (Object item : iterable) {
+        hash = 31 * hash + Objects.hashCode(item);
+      }
+      return hash;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class)
+          .add("key", key)
+          .add("elements", elements)
+          .add("timers", timers)
+          .toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/635541a7/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java
new file mode 100644
index 0000000..e6cd454
--- /dev/null
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.testing.CoderProperties;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link KeyedWorkItemCoder}.
+ */
+@RunWith(JUnit4.class)
+public class KeyedWorkItemCoderTest {
+  /** Builds the coder under test: String keys, Integer elements, global windows. */
+  private static KeyedWorkItemCoder<String, Integer> newCoder() {
+    return KeyedWorkItemCoder.of(
+        StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
+  }
+
+  /** The coder itself must pass the serializability check. */
+  @Test
+  public void testCoderProperties() throws Exception {
+    CoderProperties.coderSerializable(newCoder());
+  }
+
+  /**
+   * Checks the decode/encode equality property for a work item with timers and elements, one
+   * with elements only, and one with timers only.
+   */
+  @Test
+  public void testEncodeDecodeEqual() throws Exception {
+    TimerData eventTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME);
+    Iterable<TimerData> timers = ImmutableList.<TimerData>of(eventTimer);
+    Iterable<WindowedValue<Integer>> elements =
+        ImmutableList.of(
+            WindowedValue.valueInGlobalWindow(1),
+            WindowedValue.valueInGlobalWindow(4),
+            WindowedValue.valueInGlobalWindow(8));
+
+    KeyedWorkItemCoder<String, Integer> coder = newCoder();
+
+    CoderProperties.coderDecodeEncodeEqual(
+        coder, KeyedWorkItems.workItem("foo", timers, elements));
+    CoderProperties.coderDecodeEncodeEqual(
+        coder, KeyedWorkItems.elementsWorkItem("foo", elements));
+    CoderProperties.coderDecodeEncodeEqual(
+        coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
+  }
+}

Reply via email to