Moved KeyedWorkItem and related classes to runners-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8a2f020f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8a2f020f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8a2f020f Branch: refs/heads/master Commit: 8a2f020f9781340e60609a5a8ec537871ae29570 Parents: 81d1295 Author: Kenneth Knowles <[email protected]> Authored: Wed Nov 30 20:46:04 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Mon Dec 5 14:55:02 2016 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 4 +- .../apache/beam/runners/core/DoFnRunner.java | 1 - .../apache/beam/runners/core/DoFnRunners.java | 1 - .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 - .../apache/beam/runners/core/KeyedWorkItem.java | 44 +++++++ .../beam/runners/core/KeyedWorkItemCoder.java | 130 +++++++++++++++++++ .../beam/runners/core/KeyedWorkItems.java | 122 +++++++++++++++++ .../core/LateDataDroppingDoFnRunner.java | 2 - .../beam/runners/core/SplittableParDo.java | 2 - .../runners/core/KeyedWorkItemCoderTest.java | 64 +++++++++ .../beam/runners/core/SplittableParDoTest.java | 2 - ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 2 +- .../beam/runners/direct/DirectGroupByKey.java | 4 +- .../direct/ExecutorServiceParallelExecutor.java | 4 +- .../GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +- ...littableProcessElementsEvaluatorFactory.java | 2 +- .../direct/GroupByKeyEvaluatorFactoryTest.java | 4 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 4 +- .../streaming/SingletonKeyedWorkItem.java | 2 +- .../streaming/SingletonKeyedWorkItemCoder.java | 4 +- .../wrappers/streaming/WindowDoFnOperator.java | 6 +- .../wrappers/streaming/WorkItemKeySelector.java | 2 +- .../org/apache/beam/sdk/util/KeyedWorkItem.java | 43 ------ .../beam/sdk/util/KeyedWorkItemCoder.java | 128 ------------------ .../apache/beam/sdk/util/KeyedWorkItems.java | 121 ----------------- .../beam/sdk/util/KeyedWorkItemCoderTest.java | 62 --------- 27 files changed, 381 insertions(+), 386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index f49c785..48ac177 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -42,6 +42,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -54,8 +56,6 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index c84122b..aac8e8f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 3840423..da16573 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ExecutionContext.StepContext; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 8b10813..2082269 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java new file mode 100644 index 0000000..c75fc25 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * Interface that contains all the timers and elements associated with a specific work item. + * + * @param <K> the key type + * @param <ElemT> the element type + */ +public interface KeyedWorkItem<K, ElemT> { + /** + * Returns the key. + */ + K key(); + + /** + * Returns an iterable containing the timers. + */ + Iterable<TimerData> timersIterable(); + + /** + * Returns an iterable containing the elements. + */ + Iterable<WindowedValue<ElemT>> elementsIterable(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java new file mode 100644 index 0000000..95be047 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; + +/** + * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. + */ +public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> { + /** + * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window + * coder. + */ + public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of( + Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { + return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); + } + + @JsonCreator + public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) { + checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size()); + @SuppressWarnings("unchecked") + Coder<K> keyCoder = (Coder<K>) components.get(0); + @SuppressWarnings("unchecked") + Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1); + @SuppressWarnings("unchecked") + Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2); + return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); + } + + private final Coder<K> keyCoder; + private final Coder<ElemT> elemCoder; + private final Coder<? extends BoundedWindow> windowCoder; + private final Coder<Iterable<TimerData>> timersCoder; + private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder; + + private KeyedWorkItemCoder( + Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { + this.keyCoder = keyCoder; + this.elemCoder = elemCoder; + this.windowCoder = windowCoder; + this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder)); + this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder)); + } + + public Coder<K> getKeyCoder() { + return keyCoder; + } + + public Coder<ElemT> getElementCoder() { + return elemCoder; + } + + @Override + public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context) + throws CoderException, IOException { + Coder.Context nestedContext = context.nested(); + keyCoder.encode(value.key(), outStream, nestedContext); + timersCoder.encode(value.timersIterable(), outStream, nestedContext); + elemsCoder.encode(value.elementsIterable(), outStream, nestedContext); + } + + @Override + public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context) + throws CoderException, IOException { + Coder.Context nestedContext = context.nested(); + K key = keyCoder.decode(inStream, nestedContext); + Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext); + Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext); + return KeyedWorkItems.workItem(key, timers, elems); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return ImmutableList.of(keyCoder, elemCoder, windowCoder); + } + + @Override + public void verifyDeterministic() throws Coder.NonDeterministicException { + keyCoder.verifyDeterministic(); + timersCoder.verifyDeterministic(); + elemsCoder.verifyDeterministic(); + } + + /** + * {@inheritDoc}. + * + * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a + * {@link KeyedWorkItem} of a type different from the originally encoded type. + */ + @Override + public boolean consistentWithEquals() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java new file mode 100644 index 0000000..94c3bb6 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Iterables; +import java.util.Collections; +import java.util.Objects; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * Static utility methods that provide {@link KeyedWorkItem} implementations. + */ +public class KeyedWorkItems { + /** + * Returns an implementation of {@link KeyedWorkItem} that wraps around an elements iterable. + * + * @param <K> the key type + * @param <ElemT> the element type + */ + public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem( + K key, Iterable<WindowedValue<ElemT>> elementsIterable) { + return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable); + } + + /** + * Returns an implementation of {@link KeyedWorkItem} that wraps around an timers iterable. + * + * @param <K> the key type + * @param <ElemT> the element type + */ + public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem( + K key, Iterable<TimerData> timersIterable) { + return new ComposedKeyedWorkItem<>( + key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList()); + } + + /** + * Returns an implementation of {@link KeyedWorkItem} that wraps around + * an timers iterable and an elements iterable. + * + * @param <K> the key type + * @param <ElemT> the element type + */ + public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem( + K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) { + return new ComposedKeyedWorkItem<>(key, timersIterable, elementsIterable); + } + + /** + * A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element + * iterable. + */ + public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { + private final K key; + private final Iterable<TimerData> timers; + private final Iterable<WindowedValue<ElemT>> elements; + + private ComposedKeyedWorkItem( + K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) { + this.key = key; + this.timers = timers; + this.elements = elements; + } + + @Override + public K key() { + return key; + } + + @Override + public Iterable<TimerData> timersIterable() { + return timers; + } + + @Override + public Iterable<WindowedValue<ElemT>> elementsIterable() { + return elements; + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof ComposedKeyedWorkItem)) { + return false; + } + KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other; + return Objects.equals(this.key, that.key()) + && Iterables.elementsEqual(this.timersIterable(), that.timersIterable()) + && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable()); + } + + @Override + public int hashCode() { + return Objects.hash(key, timers, elements); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class) + .add("key", key) + .add("elements", elements) + .add("timers", timers) + .toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 63a80d2..b6f700f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -24,8 +24,6 @@ import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 80fd17b..a633111 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -49,8 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java new file mode 100644 index 0000000..37fabdd --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link KeyedWorkItems}. + */ +@RunWith(JUnit4.class) +public class KeyedWorkItemCoderTest { + @Test + public void testCoderProperties() throws Exception { + CoderProperties.coderSerializable( + KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE)); + } + + @Test + public void testEncodeDecodeEqual() throws Exception { + Iterable<TimerData> timers = + ImmutableList.<TimerData>of( + TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME)); + Iterable<WindowedValue<Integer>> elements = + ImmutableList.of( + WindowedValue.valueInGlobalWindow(1), + WindowedValue.valueInGlobalWindow(4), + WindowedValue.valueInGlobalWindow(8)); + + KeyedWorkItemCoder<String, Integer> coder = + KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE); + + CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements)); + CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements)); + CoderProperties.coderDecodeEncodeEqual( + coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index b13d839..cf96b66 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -46,8 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index 04becd7..1fa059c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index efee801..21776e7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -20,13 +20,13 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 929d09d..a308295 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -43,13 +43,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 9d25bc6..5c6b2c1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -28,6 +28,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; @@ -44,7 +45,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index 4d691ea..20d619f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -36,8 +38,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 0eca710..aae1149 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -20,13 +20,13 @@ package org.apache.beam.runners.direct; import java.util.Collection; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ElementAndRestriction; +import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index a726817..7ba38ce 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -25,6 +25,8 @@ import static org.mockito.Mockito.when; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multiset; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -33,8 +35,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 3e5af14..23340c6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -25,6 +25,8 @@ import static org.mockito.Mockito.when; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multiset; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -33,8 +35,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java index 6d2582b..b53658e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import java.util.Collections; -import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index 37454a3..ad30688 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -26,12 +26,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 432dc64..f2d7f1c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -39,6 +39,8 @@ import java.util.Set; import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -47,8 +49,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ExecutionContext; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; @@ -59,8 +59,6 @@ import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - - import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java index 7829163..1dff367 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import java.nio.ByteBuffer; +import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.WindowedValue; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java deleted file mode 100644 index b273466..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.util.TimerInternals.TimerData; - -/** - * Interface that contains all the timers and elements associated with a specific work item. - * - * @param <K> the key type - * @param <ElemT> the element type - */ -public interface KeyedWorkItem<K, ElemT> { - /** - * Returns the key. - */ - K key(); - - /** - * Returns an iterable containing the timers. - */ - Iterable<TimerData> timersIterable(); - - /** - * Returns an iterable containing the elements. - */ - Iterable<WindowedValue<ElemT>> elementsIterable(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java deleted file mode 100644 index a6e3d6c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; - -/** - * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. - */ -public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> { - /** - * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window - * coder. - */ - public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of( - Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { - return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); - } - - @JsonCreator - public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) { - checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size()); - @SuppressWarnings("unchecked") - Coder<K> keyCoder = (Coder<K>) components.get(0); - @SuppressWarnings("unchecked") - Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1); - @SuppressWarnings("unchecked") - Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2); - return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); - } - - private final Coder<K> keyCoder; - private final Coder<ElemT> elemCoder; - private final Coder<? extends BoundedWindow> windowCoder; - private final Coder<Iterable<TimerData>> timersCoder; - private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder; - - private KeyedWorkItemCoder( - Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { - this.keyCoder = keyCoder; - this.elemCoder = elemCoder; - this.windowCoder = windowCoder; - this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder)); - this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder)); - } - - public Coder<K> getKeyCoder() { - return keyCoder; - } - - public Coder<ElemT> getElementCoder() { - return elemCoder; - } - - @Override - public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - keyCoder.encode(value.key(), outStream, nestedContext); - timersCoder.encode(value.timersIterable(), outStream, nestedContext); - elemsCoder.encode(value.elementsIterable(), outStream, nestedContext); - } - - @Override - public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context) - throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - K key = keyCoder.decode(inStream, nestedContext); - Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext); - Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext); - return KeyedWorkItems.workItem(key, timers, elems); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return ImmutableList.of(keyCoder, elemCoder, windowCoder); - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - keyCoder.verifyDeterministic(); - timersCoder.verifyDeterministic(); - elemsCoder.verifyDeterministic(); - } - - /** - * {@inheritDoc}. - * - * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a - * {@link KeyedWorkItem} of a type different from the originally encoded type. - */ - @Override - public boolean consistentWithEquals() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java deleted file mode 100644 index 7434842..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.Iterables; -import java.util.Collections; -import java.util.Objects; -import org.apache.beam.sdk.util.TimerInternals.TimerData; - -/** - * Static utility methods that provide {@link KeyedWorkItem} implementations. - */ -public class KeyedWorkItems { - /** - * Returns an implementation of {@link KeyedWorkItem} that wraps around an elements iterable. - * - * @param <K> the key type - * @param <ElemT> the element type - */ - public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem( - K key, Iterable<WindowedValue<ElemT>> elementsIterable) { - return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable); - } - - /** - * Returns an implementation of {@link KeyedWorkItem} that wraps around an timers iterable. - * - * @param <K> the key type - * @param <ElemT> the element type - */ - public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem( - K key, Iterable<TimerData> timersIterable) { - return new ComposedKeyedWorkItem<>( - key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList()); - } - - /** - * Returns an implementation of {@link KeyedWorkItem} that wraps around - * an timers iterable and an elements iterable. - * - * @param <K> the key type - * @param <ElemT> the element type - */ - public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem( - K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) { - return new ComposedKeyedWorkItem<>(key, timersIterable, elementsIterable); - } - - /** - * A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element - * iterable. - */ - public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { - private final K key; - private final Iterable<TimerData> timers; - private final Iterable<WindowedValue<ElemT>> elements; - - private ComposedKeyedWorkItem( - K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) { - this.key = key; - this.timers = timers; - this.elements = elements; - } - - @Override - public K key() { - return key; - } - - @Override - public Iterable<TimerData> timersIterable() { - return timers; - } - - @Override - public Iterable<WindowedValue<ElemT>> elementsIterable() { - return elements; - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof ComposedKeyedWorkItem)) { - return false; - } - KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other; - return Objects.equals(this.key, that.key()) - && Iterables.elementsEqual(this.timersIterable(), that.timersIterable()) - && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable()); - } - - @Override - public int hashCode() { - return Objects.hash(key, timers, elements); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class) - .add("key", key) - .add("elements", elements) - .add("timers", timers) - .toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java deleted file mode 100644 index 1974d9e..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link KeyedWorkItems}. - */ -@RunWith(JUnit4.class) -public class KeyedWorkItemCoderTest { - @Test - public void testCoderProperties() throws Exception { - CoderProperties.coderSerializable( - KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE)); - } - - @Test - public void testEncodeDecodeEqual() throws Exception { - Iterable<TimerData> timers = - ImmutableList.<TimerData>of( - TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME)); - Iterable<WindowedValue<Integer>> elements = - ImmutableList.of( - WindowedValue.valueInGlobalWindow(1), - WindowedValue.valueInGlobalWindow(4), - WindowedValue.valueInGlobalWindow(8)); - - KeyedWorkItemCoder<String, Integer> coder = - KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE); - - CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements)); - CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements)); - CoderProperties.coderDecodeEncodeEqual( - coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers)); - } -}
