Rename AccumulatorCombiningState to CombiningState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef480a37 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef480a37 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef480a37 Branch: refs/heads/master Commit: ef480a37ebe039d0eaa2d4ca758ea015893e9089 Parents: 24c0495 Author: Kenneth Knowles <[email protected]> Authored: Mon Apr 3 11:27:26 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Apr 6 11:57:21 2017 -0700 ---------------------------------------------------------------------- .../translation/utils/ApexStateInternals.java | 28 ++++---- .../utils/ApexStateInternalsTest.java | 14 ++-- .../runners/core/InMemoryStateInternals.java | 36 +++++------ .../apache/beam/runners/core/NonEmptyPanes.java | 4 +- .../beam/runners/core/SideInputHandler.java | 18 +++--- .../apache/beam/runners/core/StateMerging.java | 14 ++-- .../org/apache/beam/runners/core/StateTag.java | 14 ++-- .../org/apache/beam/runners/core/StateTags.java | 24 +++---- .../beam/runners/core/SystemReduceFn.java | 4 +- .../AfterDelayFromFirstElementStateMachine.java | 8 +-- .../core/triggers/AfterPaneStateMachine.java | 4 +- .../core/InMemoryStateInternalsTest.java | 14 ++-- .../CopyOnAccessInMemoryStateInternals.java | 46 +++++++------ .../CopyOnAccessInMemoryStateInternalsTest.java | 6 +- .../state/FlinkBroadcastStateInternals.java | 68 ++++++++++---------- .../state/FlinkKeyGroupStateInternals.java | 16 ++--- .../state/FlinkSplitStateInternals.java | 16 ++--- .../streaming/state/FlinkStateInternals.java | 68 ++++++++++---------- .../FlinkBroadcastStateInternalsTest.java | 14 ++-- .../streaming/FlinkStateInternalsTest.java | 14 ++-- .../spark/stateful/SparkStateInternals.java | 30 ++++----- .../beam/sdk/transforms/GroupIntoBatches.java | 10 +-- .../util/state/AccumulatorCombiningState.java | 53 --------------- .../beam/sdk/util/state/CombiningState.java | 53 +++++++++++++++ .../apache/beam/sdk/util/state/StateBinder.java | 12 ++-- .../apache/beam/sdk/util/state/StateSpecs.java | 30 ++++----- .../apache/beam/sdk/transforms/ParDoTest.java | 39 +++++------ 27 files changed, 328 insertions(+), 329 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index 7634366..c59afc5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -139,12 +139,12 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable { } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final CombineFn<InputT, AccumT, OutputT> combineFn) { - return new ApexAccumulatorCombiningState<>( + return new ApexCombiningState<>( namespace, address, accumCoder, @@ -161,12 +161,12 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable { } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new ApexAccumulatorCombiningState<>( + return new ApexCombiningState<>( namespace, address, accumCoder, @@ -174,9 +174,9 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable { } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); @@ -323,14 +323,14 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable { } - private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> + private final class ApexCombiningState<K, InputT, AccumT, OutputT> extends AbstractState<AccumT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + implements CombiningState<InputT, AccumT, OutputT> { private final K key; private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; - private ApexAccumulatorCombiningState(StateNamespace namespace, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + private ApexCombiningState(StateNamespace namespace, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { super(namespace, address, coder); @@ -339,7 +339,7 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable { } @Override - public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() { + public ApexCombiningState<K, InputT, AccumT, OutputT> readLater() { return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index a1494ad..4f4ecfb 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.ValueState; @@ -58,7 +58,7 @@ public class ApexStateInternalsTest { private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private static final StateTag<Object, CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = @@ -181,9 +181,9 @@ public class ApexStateInternalsTest { @Test public void testMergeCombiningValueIntoSource() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); value1.add(5); @@ -202,11 +202,11 @@ public class ApexStateInternalsTest { @Test public void testMergeCombiningValueIntoNewNamespace() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value3 = + CombiningState<Integer, int[], Integer> value3 = underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); value1.add(5); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index b4b2b38..0d5b058 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -38,8 +38,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -148,12 +148,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final CombineFn<InputT, AccumT, OutputT> combineFn) { - return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn()); + return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn()); } @Override @@ -164,18 +164,18 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn); + return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn); } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); @@ -307,17 +307,17 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } /** - * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}. + * An {@link InMemoryState} implementation of {@link CombiningState}. */ - public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT>, - InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> { + public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT> + implements CombiningState<InputT, AccumT, OutputT>, + InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> { private final K key; private boolean isCleared = true; private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; private AccumT accum; - public InMemoryCombiningValue( + public InMemoryCombiningState( K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { this.key = key; this.combineFn = combineFn; @@ -325,7 +325,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() { + public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() { return this; } @@ -384,9 +384,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() { - InMemoryCombiningValue<K, InputT, AccumT, OutputT> that = - new InMemoryCombiningValue<>(key, combineFn); + public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() { + InMemoryCombiningState<K, InputT, AccumT, OutputT> that = + new InMemoryCombiningState<>(key, combineFn); if (!this.isCleared) { that.isCleared = this.isCleared; that.addAccum(accum); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java index aa033ce..3e875c2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; /** @@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> { private static class GeneralNonEmptyPanes<K, W extends BoundedWindow> extends NonEmptyPanes<K, W> { - private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> + private static final StateTag<Object, CombiningState<Long, long[], Long>> PANE_ADDITIONS_TAG = StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "count", VarLongCoder.of(), Sum.ofLongs())); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 24f326d..26e920a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.PCollectionView; @@ -71,10 +71,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { PCollectionView<?>, StateTag< Object, - AccumulatorCombiningState< - BoundedWindow, - Set<BoundedWindow>, - Set<BoundedWindow>>>> availableWindowsTags; + CombiningState< + BoundedWindow, + Set<BoundedWindow>, + Set<BoundedWindow>>>> availableWindowsTags; /** * State tag for the actual contents of each side input per window. @@ -106,10 +106,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { StateTag< Object, - AccumulatorCombiningState< - BoundedWindow, - Set<BoundedWindow>, - Set<BoundedWindow>>> availableTag = StateTags.combiningValue( + CombiningState< + BoundedWindow, + Set<BoundedWindow>, + Set<BoundedWindow>>> availableTag = StateTags.combiningValue( "side-input-available-windows-" + sideInput.getTagInternal().getId(), SetCoder.of(windowCoder), new WindowSetCombineFn()); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java index 593d697..3410850 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -24,8 +24,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -172,7 +172,7 @@ public class StateMerging { */ public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( MergingStateAccessor<K, W> context, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) { + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) { mergeCombiningValues( context.accessInEachMergingWindow(address).values(), context.access(address)); } @@ -182,8 +182,8 @@ public class StateMerging { * {@code result}. */ public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( - Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources, - AccumulatorCombiningState<InputT, AccumT, OutputT> result) { + Collection<CombiningState<InputT, AccumT, OutputT>> sources, + CombiningState<InputT, AccumT, OutputT> result) { if (sources.isEmpty()) { // Nothing to merge. return; @@ -194,18 +194,18 @@ public class StateMerging { } // Prefetch. List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size()); - for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { + for (CombiningState<InputT, AccumT, OutputT> source : sources) { prefetchRead(source); } // Read. List<AccumT> accumulators = new ArrayList<>(futures.size()); - for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { + for (CombiningState<InputT, AccumT, OutputT> source : sources) { accumulators.add(source.getAccum()); } // Merge (possibly update and return one of the existing accumulators). AccumT merged = result.mergeAccumulators(accumulators); // Clear sources. - for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { + for (CombiningState<InputT, AccumT, OutputT> source : sources) { source.clear(); } // Update result. http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java index 802aede..12c59ad 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.State; @@ -94,20 +94,20 @@ public interface StateTag<K, StateT extends State> extends Serializable { StateTag<? super K, MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder); - <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index 1c70dff..4893919 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -30,8 +30,8 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.State; @@ -84,9 +84,9 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + CombiningState<InputT, AccumT, OutputT> bindCombiningValue( String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); @@ -94,9 +94,9 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); @@ -104,9 +104,9 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { return binder.bindKeyedCombiningValueWithContext( @@ -158,7 +158,7 @@ public class StateTags { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> + StateTag<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue( String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( @@ -170,7 +170,7 @@ public class StateTags { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, - OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(String id, Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( @@ -182,7 +182,7 @@ public class StateTags { * merge multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + StateTag<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueWithContext( String id, Coder<AccumT> accumCoder, @@ -199,7 +199,7 @@ public class StateTags { * should only be used to initialize static values. */ public static <InputT, AccumT, OutputT> - StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> + StateTag<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueFromInputInternal( String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( @@ -255,7 +255,7 @@ public class StateTags { public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>> convertToBagTagInternal( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) { + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> combiningTag) { return new SimpleStateTag<>( new StructuredId(combiningTag.getId()), StateSpecs.convertToBagSpecInternal(combiningTag.getSpec())); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index 0f2f790..f618d88 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -25,8 +25,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; @@ -71,7 +71,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound AccumT, OutputT, W> combining( final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag; + final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag; if (combineFn.getFn() instanceof KeyedCombineFnWithContext) { bufferTag = StateTags.makeSystemTagInternal( StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext( http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 29c29a7..b416788 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -30,11 +30,11 @@ import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.Holder; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.joda.time.Duration; import org.joda.time.Instant; @@ -55,8 +55,8 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = ImmutableList.<SerializableFunction<Instant, Instant>>of(); - protected static final StateTag<Object, AccumulatorCombiningState<Instant, - Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG = + protected static final StateTag<Object, CombiningState<Instant, + Holder<Instant>, Instant>> DELAYED_UNTIL_TAG = StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "delayed", InstantCoder.of(), Min.<Instant>naturalOrder())); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index 1dd5b65..11323cc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -27,7 +27,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStat import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.CombiningState; /** * {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane. @@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; @Experimental(Experimental.Kind.TRIGGER) public class AfterPaneStateMachine extends OnceTriggerStateMachine { -private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> +private static final StateTag<Object, CombiningState<Long, long[], Long>> ELEMENTS_IN_PANE_TAG = StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "count", VarLongCoder.of(), Sum.ofLongs())); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 5f90084..e4fb5c1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; @@ -61,7 +61,7 @@ public class InMemoryStateInternalsTest { private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private static final StateTag<Object, CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = @@ -411,9 +411,9 @@ public class InMemoryStateInternalsTest { @Test public void testMergeCombiningValueIntoSource() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); value1.add(5); @@ -432,11 +432,11 @@ public class InMemoryStateInternalsTest { @Test public void testMergeCombiningValueIntoNewNamespace() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value3 = + CombiningState<Integer, int[], Integer> value3 = underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); value1.add(5); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index ff5c23c..0665812 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -26,7 +26,7 @@ import java.util.HashSet; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag; -import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningValue; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningState; import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryMap; import org.apache.beam.runners.core.InMemoryStateInternals.InMemorySet; import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState; @@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.State; @@ -306,19 +306,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") - InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>> - existingState = ( - InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, - OutputT>>) underlying.get().get(namespace, address, c); + InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState = + (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>) + underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryCombiningValue<>( + return new InMemoryCombiningState<>( key, combineFn.asKeyedFn()); } } @@ -367,27 +366,26 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") - InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>> - existingState = ( - InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, - OutputT>>) underlying.get().get(namespace, address, c); + InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState = + (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>) + underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryCombiningValue<>(key, combineFn); + return new InMemoryCombiningState<>(key, combineFn); } } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { return bindKeyedCombiningValue( @@ -449,9 +447,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return underlying.get(namespace, address, c); } @@ -476,18 +474,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { return underlying.get(namespace, address, c); } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { return bindKeyedCombiningValue( http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 59c0a37..142af32 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.SetState; @@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { StateNamespace namespace = new StateNamespaceForTest("foo"); CoderRegistry reg = pipeline.getCoderRegistry(); - StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag = + StateTag<Object, CombiningState<Long, long[], Long>> stateTag = StateTags.combiningValue("summer", sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn); GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); @@ -259,7 +259,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { StateNamespace namespace = new StateNamespaceForTest("foo"); CoderRegistry reg = pipeline.getCoderRegistry(); - StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag = + StateTag<String, CombiningState<Long, long[], Long>> stateTag = StateTags.keyedCombiningValue( "summer", sumLongFn.getAccumulatorCoder( http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index bcc3660..3203446 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -133,23 +133,23 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { - return new FlinkAccumulatorCombiningState<>( + return new FlinkCombiningState<>( stateBackend, address, combineFn, namespace, accumCoder); } @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkKeyedAccumulatorCombiningState<>( + return new FlinkKeyedCombiningState<>( stateBackend, address, combineFn, @@ -160,12 +160,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkAccumulatorCombiningStateWithContext<>( + return new FlinkCombiningStateWithContext<>( stateBackend, address, combineFn, @@ -464,17 +464,17 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } } - private class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT> + private class FlinkCombiningState<K, InputT, AccumT, OutputT> extends AbstractBroadcastState<AccumT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; - FlinkAccumulatorCombiningState( + FlinkCombiningState( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder) { @@ -486,7 +486,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + public CombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -566,8 +566,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { return false; } - FlinkAccumulatorCombiningState<?, ?, ?, ?> that = - (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o; + FlinkCombiningState<?, ?, ?, ?> that = + (FlinkCombiningState<?, ?, ?, ?>) o; return namespace.equals(that.namespace) && address.equals(that.address); @@ -581,18 +581,18 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } } - private class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT> + private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT> extends AbstractBroadcastState<AccumT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; private final FlinkBroadcastStateInternals<K> flinkStateInternals; - FlinkKeyedAccumulatorCombiningState( + FlinkKeyedCombiningState( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, @@ -607,7 +607,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + public CombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -706,8 +706,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { return false; } - FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that = - (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o; + FlinkKeyedCombiningState<?, ?, ?, ?> that = + (FlinkKeyedCombiningState<?, ?, ?, ?>) o; return namespace.equals(that.namespace) && address.equals(that.address); @@ -721,20 +721,20 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } } - private class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT> + private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT> extends AbstractBroadcastState<AccumT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; private final CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn; private final FlinkBroadcastStateInternals<K> flinkStateInternals; private final CombineWithContext.Context context; - FlinkAccumulatorCombiningStateWithContext( + FlinkCombiningStateWithContext( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn, StateNamespace namespace, @@ -752,7 +752,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + public CombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -847,8 +847,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { return false; } - FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that = - (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o; + FlinkCombiningStateWithContext<?, ?, ?, ?> that = + (FlinkCombiningStateWithContext<?, ?, ?, ?>) o; return namespace.equals(that.namespace) && address.equals(that.address); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java index a29b1b2..24b340e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java @@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -156,9 +156,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException("bindCombiningValue is not supported."); @@ -166,8 +166,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported."); @@ -176,8 +176,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn) { @@ -190,7 +190,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) { throw new UnsupportedOperationException( - String.format("%s is not supported", AccumulatorCombiningState.class.getSimpleName())); + String.format("%s is not supported", CombiningState.class.getSimpleName())); } }); } http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java index d9e87d1..2bf0bf1 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java @@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -116,9 +116,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException("bindCombiningValue is not supported."); @@ -126,8 +126,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported."); @@ -136,8 +136,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn) { @@ -150,7 +150,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) { throw new UnsupportedOperationException( - String.format("%s is not supported", AccumulatorCombiningState.class.getSimpleName())); + String.format("%s is not supported", CombiningState.class.getSimpleName())); } }); } http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 9033ba7..4f961e5 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -142,23 +142,23 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { - return new FlinkAccumulatorCombiningState<>( + return new FlinkCombiningState<>( flinkStateBackend, address, combineFn, namespace, accumCoder); } @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkKeyedAccumulatorCombiningState<>( + return new FlinkKeyedCombiningState<>( flinkStateBackend, address, combineFn, @@ -169,12 +169,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkAccumulatorCombiningStateWithContext<>( + return new FlinkCombiningStateWithContext<>( flinkStateBackend, address, combineFn, @@ -393,18 +393,18 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } } - private static class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + private static class FlinkCombiningState<K, InputT, AccumT, OutputT> + implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - FlinkAccumulatorCombiningState( + FlinkCombiningState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder) { @@ -420,7 +420,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + public CombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -546,8 +546,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> { return false; } - FlinkAccumulatorCombiningState<?, ?, ?, ?> that = - (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o; + FlinkCombiningState<?, ?, ?, ?> that = + (FlinkCombiningState<?, ?, ?, ?>) o; return namespace.equals(that.namespace) && address.equals(that.address); @@ -561,19 +561,19 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } } - private static class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT> + implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; private final FlinkStateInternals<K> flinkStateInternals; - FlinkKeyedAccumulatorCombiningState( + FlinkKeyedCombiningState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, @@ -591,7 +591,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + public CombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -721,8 +721,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> { return false; } - FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that = - (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o; + FlinkKeyedCombiningState<?, ?, ?, ?> that = + (FlinkKeyedCombiningState<?, ?, ?, ?>) o; return namespace.equals(that.namespace) && address.equals(that.address); @@ -736,11 +736,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } } - private static class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT> + implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; private final CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; @@ -748,9 +748,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> { private final FlinkStateInternals<K> flinkStateInternals; private final CombineWithContext.Context context; - FlinkAccumulatorCombiningStateWithContext( + FlinkCombiningStateWithContext( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, CombineWithContext.KeyedCombineFnWithContext< ? super K, InputT, AccumT, OutputT> combineFn, StateNamespace namespace, @@ -771,7 +771,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + public CombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -896,8 +896,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> { return false; } - FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that = - (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o; + FlinkCombiningStateWithContext<?, ?, ?, ?> that = + (FlinkCombiningStateWithContext<?, ?, ?, ?>) o; return namespace.equals(that.namespace) && address.equals(that.address); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java index f4e3ea8..7e7d1e1 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java @@ -32,8 +32,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkB import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.ValueState; @@ -58,7 +58,7 @@ public class FlinkBroadcastStateInternalsTest { private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private static final StateTag<Object, CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = @@ -202,9 +202,9 @@ public class FlinkBroadcastStateInternalsTest { @Test public void testMergeCombiningValueIntoSource() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); value1.add(5); @@ -223,11 +223,11 @@ public class FlinkBroadcastStateInternalsTest { @Test public void testMergeCombiningValueIntoNewNamespace() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value3 = + CombiningState<Integer, int[], Integer> value3 = underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); value1.add(5); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 27747dd..d140271 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.ValueState; @@ -72,7 +72,7 @@ public class FlinkStateInternalsTest { private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private static final StateTag<Object, CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = @@ -232,9 +232,9 @@ public class FlinkStateInternalsTest { @Test public void testMergeCombiningValueIntoSource() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); value1.add(5); @@ -253,11 +253,11 @@ public class FlinkStateInternalsTest { @Test public void testMergeCombiningValueIntoNewNamespace() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = + CombiningState<Integer, int[], Integer> value1 = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = + CombiningState<Integer, int[], Integer> value2 = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value3 = + CombiningState<Integer, int[], Integer> value3 = underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); value1.add(5); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java index 43fb383..725e9d3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -137,31 +137,31 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, + return new SparkCombiningState<>(namespace, address, accumCoder, key, combineFn.<K>asKeyedFn()); } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, combineFn); + return new SparkCombiningState<>(namespace, address, accumCoder, key, combineFn); } @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, + return new SparkCombiningState<>(namespace, address, accumCoder, key, CombineFnUtil.bindContext(combineFn, c)); } @@ -300,16 +300,16 @@ class SparkStateInternals<K> implements StateInternals<K> { } } - private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> + private class SparkCombiningState<K, InputT, AccumT, OutputT> extends AbstractState<AccumT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + implements CombiningState<InputT, AccumT, OutputT> { private final K key; private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; - private SparkAccumulatorCombiningState( + private SparkCombiningState( StateNamespace namespace, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { @@ -319,7 +319,7 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() { + public SparkCombiningState<K, InputT, AccumT, OutputT> readLater() { return this; }
