Add KeyedWorkItemCoder ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115354168
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/635541a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/635541a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/635541a7 Branch: refs/heads/master Commit: 635541a74438a02d18910fba6a76c5aacc2c7816 Parents: 3904c90 Author: tgroh <[email protected]> Authored: Tue Feb 23 10:12:17 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../dataflow/sdk/util/KeyedWorkItemCoder.java | 120 +++++++++++++++++++ .../cloud/dataflow/sdk/util/KeyedWorkItems.java | 29 ++++- .../sdk/util/KeyedWorkItemCoderTest.java | 61 ++++++++++ 3 files changed, 209 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/635541a7/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java new file mode 100644 index 0000000..398e82a --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoder.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.IterableCoder; +import com.google.cloud.dataflow.sdk.coders.StandardCoder; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerDataCoder; +import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; +import com.google.common.collect.ImmutableList; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +/** + * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. + */ +public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> { + /** + * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window + * coder. + */ + public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of( + Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? 
extends BoundedWindow> windowCoder) { + return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); + } + + @JsonCreator + public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) { + checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size()); + @SuppressWarnings("unchecked") + Coder<K> keyCoder = (Coder<K>) components.get(0); + @SuppressWarnings("unchecked") + Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1); + @SuppressWarnings("unchecked") + Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2); + return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); + } + + private final Coder<K> keyCoder; + private final Coder<ElemT> elemCoder; + private final Coder<? extends BoundedWindow> windowCoder; + private final Coder<Iterable<TimerData>> timersCoder; + private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder; + + private KeyedWorkItemCoder( + Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? 
extends BoundedWindow> windowCoder) { + this.keyCoder = keyCoder; + this.elemCoder = elemCoder; + this.windowCoder = windowCoder; + this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder)); + this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder)); + } + + @Override + public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context) + throws CoderException, IOException { + Coder.Context nestedContext = context.nested(); + keyCoder.encode(value.key(), outStream, nestedContext); + timersCoder.encode(value.timersIterable(), outStream, nestedContext); + elemsCoder.encode(value.elementsIterable(), outStream, nestedContext); + } + + @Override + public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context) + throws CoderException, IOException { + Coder.Context nestedContext = context.nested(); + K key = keyCoder.decode(inStream, nestedContext); + Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext); + Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext); + return KeyedWorkItems.workItem(key, timers, elems); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return ImmutableList.of(keyCoder, elemCoder, windowCoder); + } + + @Override + public void verifyDeterministic() throws Coder.NonDeterministicException { + keyCoder.verifyDeterministic(); + timersCoder.verifyDeterministic(); + elemsCoder.verifyDeterministic(); + } + + /** + * {@inheritDoc}. + * + * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a + * {@link KeyedWorkItem} of a type different from the originally encoded type. 
+ */ + @Override + public boolean consistentWithEquals() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/635541a7/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java index b94397c..734bd2c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItems.java @@ -16,8 +16,11 @@ package com.google.cloud.dataflow.sdk.util; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.common.base.MoreObjects; +import com.google.common.collect.Iterables; import java.util.Collections; +import java.util.Objects; /** * Static utility methods that provide {@link KeyedWorkItem} implementations. @@ -63,7 +66,6 @@ public class KeyedWorkItems { * iterable. 
*/ public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { - private final K key; private final Iterable<TimerData> timers; private final Iterable<WindowedValue<ElemT>> elements; @@ -89,5 +91,30 @@ public class KeyedWorkItems { public Iterable<WindowedValue<ElemT>> elementsIterable() { return elements; } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof ComposedKeyedWorkItem)) { + return false; + } + KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other; + return Objects.equals(this.key, that.key()) + && Iterables.elementsEqual(this.timersIterable(), that.timersIterable()) + && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable()); + } + + @Override + public int hashCode() { + return Objects.hash(key, timers, elements); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class) + .add("key", key) + .add("elements", elements) + .add("timers", timers) + .toString(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/635541a7/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java new file mode 100644 index 0000000..e6cd454 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/KeyedWorkItemCoderTest.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.testing.CoderProperties; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link KeyedWorkItemCoder}. 
+ */ +@RunWith(JUnit4.class) +public class KeyedWorkItemCoderTest { + @Test + public void testCoderProperties() throws Exception { + CoderProperties.coderSerializable( + KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE)); + } + + @Test + public void testEncodeDecodeEqual() throws Exception { + Iterable<TimerData> timers = + ImmutableList.<TimerData>of( + TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME)); + Iterable<WindowedValue<Integer>> elements = + ImmutableList.of( + WindowedValue.valueInGlobalWindow(1), + WindowedValue.valueInGlobalWindow(4), + WindowedValue.valueInGlobalWindow(8)); + + KeyedWorkItemCoder<String, Integer> coder = + KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE); + + CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements)); + CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements)); + CoderProperties.coderDecodeEncodeEqual( + coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers)); + } +}
