Rename AccumulatorCombiningState to CombiningState

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef480a37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef480a37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef480a37

Branch: refs/heads/master
Commit: ef480a37ebe039d0eaa2d4ca758ea015893e9089
Parents: 24c0495
Author: Kenneth Knowles <[email protected]>
Authored: Mon Apr 3 11:27:26 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/ApexStateInternals.java   | 28 ++++----
 .../utils/ApexStateInternalsTest.java           | 14 ++--
 .../runners/core/InMemoryStateInternals.java    | 36 +++++------
 .../apache/beam/runners/core/NonEmptyPanes.java |  4 +-
 .../beam/runners/core/SideInputHandler.java     | 18 +++---
 .../apache/beam/runners/core/StateMerging.java  | 14 ++--
 .../org/apache/beam/runners/core/StateTag.java  | 14 ++--
 .../org/apache/beam/runners/core/StateTags.java | 24 +++----
 .../beam/runners/core/SystemReduceFn.java       |  4 +-
 .../AfterDelayFromFirstElementStateMachine.java |  8 +--
 .../core/triggers/AfterPaneStateMachine.java    |  4 +-
 .../core/InMemoryStateInternalsTest.java        | 14 ++--
 .../CopyOnAccessInMemoryStateInternals.java     | 46 +++++++------
 .../CopyOnAccessInMemoryStateInternalsTest.java |  6 +-
 .../state/FlinkBroadcastStateInternals.java     | 68 ++++++++++----------
 .../state/FlinkKeyGroupStateInternals.java      | 16 ++---
 .../state/FlinkSplitStateInternals.java         | 16 ++---
 .../streaming/state/FlinkStateInternals.java    | 68 ++++++++++----------
 .../FlinkBroadcastStateInternalsTest.java       | 14 ++--
 .../streaming/FlinkStateInternalsTest.java      | 14 ++--
 .../spark/stateful/SparkStateInternals.java     | 30 ++++-----
 .../beam/sdk/transforms/GroupIntoBatches.java   | 10 +--
 .../util/state/AccumulatorCombiningState.java   | 53 ---------------
 .../beam/sdk/util/state/CombiningState.java     | 53 +++++++++++++++
 .../apache/beam/sdk/util/state/StateBinder.java | 12 ++--
 .../apache/beam/sdk/util/state/StateSpecs.java  | 30 ++++-----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 39 +++++------
 27 files changed, 328 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 7634366..c59afc5 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -43,8 +43,8 @@ import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -139,12 +139,12 @@ public class ApexStateInternals<K> implements 
StateInternals<K>, Serializable {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<>(
+      return new ApexCombiningState<>(
           namespace,
           address,
           accumCoder,
@@ -161,12 +161,12 @@ public class ApexStateInternals<K> implements 
StateInternals<K>, Serializable {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<>(
+      return new ApexCombiningState<>(
           namespace,
           address,
           accumCoder,
@@ -174,9 +174,9 @@ public class ApexStateInternals<K> implements 
StateInternals<K>, Serializable {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
       return bindKeyedCombiningValue(address, accumCoder, 
CombineFnUtil.bindContext(combineFn, c));
@@ -323,14 +323,14 @@ public class ApexStateInternals<K> implements 
StateInternals<K>, Serializable {
 
   }
 
-  private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private final class ApexCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
     private final K key;
     private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
 
-    private ApexAccumulatorCombiningState(StateNamespace namespace,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+    private ApexCombiningState(StateNamespace namespace,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
@@ -339,7 +339,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K>, Serializable {
     }
 
     @Override
-    public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> 
readLater() {
+    public ApexCombiningState<K, InputT, AccumT, OutputT> readLater() {
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index a1494ad..4f4ecfb 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -58,7 +58,7 @@ public class ApexStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], 
Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -181,9 +181,9 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -202,11 +202,11 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index b4b2b38..0d5b058 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -38,8 +38,8 @@ import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -148,12 +148,12 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, 
combineFn.<K>asKeyedFn());
+      return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, 
combineFn.<K>asKeyedFn());
     }
 
     @Override
@@ -164,18 +164,18 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, 
combineFn);
+      return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, 
combineFn);
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
       return bindKeyedCombiningValue(address, accumCoder, 
CombineFnUtil.bindContext(combineFn, c));
@@ -307,17 +307,17 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
   }
 
   /**
-   * An {@link InMemoryState} implementation of {@link 
AccumulatorCombiningState}.
+   * An {@link InMemoryState} implementation of {@link CombiningState}.
    */
-  public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
-          InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
+  public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT>,
+          InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> {
     private final K key;
     private boolean isCleared = true;
     private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
     private AccumT accum;
 
-    public InMemoryCombiningValue(
+    public InMemoryCombiningState(
         K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
       this.key = key;
       this.combineFn = combineFn;
@@ -325,7 +325,7 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
+    public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -384,9 +384,9 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
-      InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
-          new InMemoryCombiningValue<>(key, combineFn);
+    public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() {
+      InMemoryCombiningState<K, InputT, AccumT, OutputT> that =
+          new InMemoryCombiningState<>(key, combineFn);
       if (!this.isCleared) {
         that.isCleared = this.isCleared;
         that.addAccum(accum);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index aa033ce..3e875c2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
 
 /**
@@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends 
BoundedWindow> {
   private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
       extends NonEmptyPanes<K, W> {
 
-    private static final StateTag<Object, AccumulatorCombiningState<Long, 
long[], Long>>
+    private static final StateTag<Object, CombiningState<Long, long[], Long>>
         PANE_ADDITIONS_TAG =
         
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
             "count", VarLongCoder.of(), Sum.ofLongs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 24f326d..26e920a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -71,10 +71,10 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
       PCollectionView<?>,
       StateTag<
           Object,
-          AccumulatorCombiningState<
-              BoundedWindow,
-              Set<BoundedWindow>,
-              Set<BoundedWindow>>>> availableWindowsTags;
+          CombiningState<
+                        BoundedWindow,
+                        Set<BoundedWindow>,
+                        Set<BoundedWindow>>>> availableWindowsTags;
 
   /**
    * State tag for the actual contents of each side input per window.
@@ -106,10 +106,10 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
 
       StateTag<
           Object,
-          AccumulatorCombiningState<
-              BoundedWindow,
-              Set<BoundedWindow>,
-              Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
+          CombiningState<
+                        BoundedWindow,
+                        Set<BoundedWindow>,
+                        Set<BoundedWindow>>> availableTag = 
StateTags.combiningValue(
           "side-input-available-windows-" + sideInput.getTagInternal().getId(),
           SetCoder.of(windowCoder),
           new WindowSetCombineFn());

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 593d697..3410850 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -172,7 +172,7 @@ public class StateMerging {
    */
   public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> 
address) {
+      StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) {
     mergeCombiningValues(
         context.accessInEachMergingWindow(address).values(), 
context.access(address));
   }
@@ -182,8 +182,8 @@ public class StateMerging {
    * {@code result}.
    */
   public static <InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
-      Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
-      AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
+      Collection<CombiningState<InputT, AccumT, OutputT>> sources,
+      CombiningState<InputT, AccumT, OutputT> result) {
     if (sources.isEmpty()) {
       // Nothing to merge.
       return;
@@ -194,18 +194,18 @@ public class StateMerging {
     }
     // Prefetch.
     List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+    for (CombiningState<InputT, AccumT, OutputT> source : sources) {
       prefetchRead(source);
     }
     // Read.
     List<AccumT> accumulators = new ArrayList<>(futures.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+    for (CombiningState<InputT, AccumT, OutputT> source : sources) {
       accumulators.add(source.getAccum());
     }
     // Merge (possibly update and return one of the existing accumulators).
     AccumT merged = result.mergeAccumulators(accumulators);
     // Clear sources.
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+    for (CombiningState<InputT, AccumT, OutputT> source : sources) {
       source.clear();
     }
     // Update result.

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index 802aede..12c59ad 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -28,8 +28,8 @@ import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -94,20 +94,20 @@ public interface StateTag<K, StateT extends State> extends 
Serializable {
         StateTag<? super K, MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT> bindCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+    <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
-    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+    CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
-    AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+    CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
             combineFn);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 1c70dff..4893919 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -30,8 +30,8 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -84,9 +84,9 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
               String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> 
spec,
               Coder<AccumT> accumCoder,
               CombineFn<InputT, AccumT, OutputT> combineFn) {
         return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, 
combineFn);
@@ -94,9 +94,9 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
               String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> 
spec,
               Coder<AccumT> accumCoder,
               KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
         return binder.bindKeyedCombiningValue(tagForSpec(id, spec), 
accumCoder, combineFn);
@@ -104,9 +104,9 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+      CombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
               String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> 
spec,
               Coder<AccumT> accumCoder,
               KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
         return binder.bindKeyedCombiningValueWithContext(
@@ -158,7 +158,7 @@ public class StateTags {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-    StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+    StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> 
combineFn) {
     return new SimpleStateTag<>(
@@ -170,7 +170,7 @@ public class StateTags {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT,
-      OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>>
       keyedCombiningValue(String id, Coder<AccumT> accumCoder,
           KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -182,7 +182,7 @@ public class StateTags {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateTag<K, CombiningState<InputT, AccumT, OutputT>>
       keyedCombiningValueWithContext(
           String id,
           Coder<AccumT> accumCoder,
@@ -199,7 +199,7 @@ public class StateTags {
    * should only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, 
OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -255,7 +255,7 @@ public class StateTags {
 
   public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
       convertToBagTagInternal(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> combiningTag) {
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
combiningTag) {
     return new SimpleStateTag<>(
         new StructuredId(combiningTag.getId()),
         StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 0f2f790..f618d88 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -25,8 +25,8 @@ import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 
@@ -71,7 +71,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, 
OutputT, W extends Bound
       AccumT, OutputT, W>
       combining(
           final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, 
OutputT> combineFn) {
-    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> 
bufferTag;
+    final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag;
     if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
       bufferTag = StateTags.makeSystemTagInternal(
           StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 29c29a7..b416788 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -30,11 +30,11 @@ import org.apache.beam.runners.core.StateTags;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.Holder;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -55,8 +55,8 @@ public abstract class AfterDelayFromFirstElementStateMachine 
extends OnceTrigger
   protected static final List<SerializableFunction<Instant, Instant>> IDENTITY 
=
       ImmutableList.<SerializableFunction<Instant, Instant>>of();
 
-  protected static final StateTag<Object, AccumulatorCombiningState<Instant,
-                                              Combine.Holder<Instant>, 
Instant>> DELAYED_UNTIL_TAG =
+  protected static final StateTag<Object, CombiningState<Instant,
+                                                Holder<Instant>, Instant>> 
DELAYED_UNTIL_TAG =
       
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 1dd5b65..11323cc 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -27,7 +27,7 @@ import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStat
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 
 /**
  * {@link TriggerStateMachine}s that fire based on properties of the elements 
in the current pane.
@@ -35,7 +35,7 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 @Experimental(Experimental.Kind.TRIGGER)
 public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 
-private static final StateTag<Object, AccumulatorCombiningState<Long, long[], 
Long>>
+private static final StateTag<Object, CombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "count", VarLongCoder.of(), Sum.ofLongs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 5f90084..e4fb5c1 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
@@ -61,7 +61,7 @@ public class InMemoryStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], 
Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -411,9 +411,9 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -432,11 +432,11 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index ff5c23c..0665812 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag;
-import 
org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningValue;
+import 
org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningState;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryMap;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemorySet;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState;
@@ -45,8 +45,8 @@ import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -306,19 +306,18 @@ public class CopyOnAccessInMemoryStateInternals<K> 
implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, 
AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, 
OutputT>
               bindCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
                   Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> 
combineFn) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
-              InMemoryState<? extends AccumulatorCombiningState<InputT, 
AccumT, OutputT>>
-                  existingState = (
-                  InMemoryState<? extends AccumulatorCombiningState<InputT, 
AccumT,
-                                            OutputT>>) 
underlying.get().get(namespace, address, c);
+              InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> 
existingState =
+                  (InMemoryState<? extends CombiningState<InputT, AccumT, 
OutputT>>)
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
-              return new InMemoryCombiningValue<>(
+              return new InMemoryCombiningState<>(
                   key, combineFn.asKeyedFn());
             }
           }
@@ -367,27 +366,26 @@ public class CopyOnAccessInMemoryStateInternals<K> 
implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, 
AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, 
OutputT>
               bindKeyedCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
-              InMemoryState<? extends AccumulatorCombiningState<InputT, 
AccumT, OutputT>>
-                  existingState = (
-                  InMemoryState<? extends AccumulatorCombiningState<InputT, 
AccumT,
-                                            OutputT>>) 
underlying.get().get(namespace, address, c);
+              InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> 
existingState =
+                  (InMemoryState<? extends CombiningState<InputT, AccumT, 
OutputT>>)
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
-              return new InMemoryCombiningValue<>(key, combineFn);
+              return new InMemoryCombiningState<>(key, combineFn);
             }
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, 
AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, 
OutputT>
           bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFnWithContext<? super K, InputT, AccumT, 
OutputT> combineFn) {
             return bindKeyedCombiningValue(
@@ -449,9 +447,9 @@ public class CopyOnAccessInMemoryStateInternals<K> 
implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, 
AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, 
OutputT>
               bindCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
                   Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> 
combineFn) {
             return underlying.get(namespace, address, c);
           }
@@ -476,18 +474,18 @@ public class CopyOnAccessInMemoryStateInternals<K> 
implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, 
AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, 
OutputT>
               bindKeyedCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, 
AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, 
OutputT>
           bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFnWithContext<? super K, InputT, AccumT, 
OutputT> combineFn) {
             return bindKeyedCombiningValue(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 59c0a37..142af32 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
+    StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, 
reg.getDefaultCoder(Long.class)), sumLongFn);
     GroupingState<Long, Long> underlyingValue = underlying.state(namespace, 
stateTag);
@@ -259,7 +259,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag =
+    StateTag<String, CombiningState<Long, long[], Long>> stateTag =
         StateTags.keyedCombiningValue(
             "summer",
             sumLongFn.getAccumulatorCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index bcc3660..3203446 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -133,23 +133,23 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
-        return new FlinkAccumulatorCombiningState<>(
+        return new FlinkCombiningState<>(
             stateBackend, address, combineFn, namespace, accumCoder);
       }
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn) {
-        return new FlinkKeyedAccumulatorCombiningState<>(
+        return new FlinkKeyedCombiningState<>(
             stateBackend,
             address,
             combineFn,
@@ -160,12 +160,12 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkAccumulatorCombiningStateWithContext<>(
+        return new FlinkCombiningStateWithContext<>(
             stateBackend,
             address,
             combineFn,
@@ -464,17 +464,17 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
     }
   }
 
-  private class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private class FlinkCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
-    FlinkAccumulatorCombiningState(
+    FlinkCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -486,7 +486,7 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -566,8 +566,8 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkCombiningState<?, ?, ?, ?> that =
+          (FlinkCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -581,18 +581,18 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
     }
   }
 
-  private class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address;
     private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
 
-    FlinkKeyedAccumulatorCombiningState(
+    FlinkKeyedCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -607,7 +607,7 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -706,8 +706,8 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
         return false;
       }
 
-      FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkKeyedCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -721,20 +721,20 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
     }
   }
 
-  private class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, 
OutputT>
+  private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address;
     private final CombineWithContext.KeyedCombineFnWithContext<
         ? super K, InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
-    FlinkAccumulatorCombiningStateWithContext(
+    FlinkCombiningStateWithContext(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.KeyedCombineFnWithContext<
             ? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
@@ -752,7 +752,7 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -847,8 +847,8 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index a29b1b2..24b340e 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -156,9 +156,9 @@ public class FlinkKeyGroupStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
         throw new UnsupportedOperationException("bindCombiningValue is not 
supported.");
@@ -166,8 +166,8 @@ public class FlinkKeyGroupStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn) {
         throw new UnsupportedOperationException("bindKeyedCombiningValue is 
not supported.");
@@ -176,8 +176,8 @@ public class FlinkKeyGroupStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
@@ -190,7 +190,7 @@ public class FlinkKeyGroupStateInternals<K> implements 
StateInternals<K> {
           StateTag<? super K, WatermarkHoldState<W>> address,
           OutputTimeFn<? super W> outputTimeFn) {
         throw new UnsupportedOperationException(
-            String.format("%s is not supported", 
AccumulatorCombiningState.class.getSimpleName()));
+            String.format("%s is not supported", 
CombiningState.class.getSimpleName()));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index d9e87d1..2bf0bf1 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -116,9 +116,9 @@ public class FlinkSplitStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
         throw new UnsupportedOperationException("bindCombiningValue is not 
supported.");
@@ -126,8 +126,8 @@ public class FlinkSplitStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn) {
         throw new UnsupportedOperationException("bindKeyedCombiningValue is 
not supported.");
@@ -136,8 +136,8 @@ public class FlinkSplitStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
@@ -150,7 +150,7 @@ public class FlinkSplitStateInternals<K> implements 
StateInternals<K> {
           StateTag<? super K, WatermarkHoldState<W>> address,
           OutputTimeFn<? super W> outputTimeFn) {
         throw new UnsupportedOperationException(
-            String.format("%s is not supported", 
AccumulatorCombiningState.class.getSimpleName()));
+            String.format("%s is not supported", 
CombiningState.class.getSimpleName()));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 9033ba7..4f961e5 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -142,23 +142,23 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
-        return new FlinkAccumulatorCombiningState<>(
+        return new FlinkCombiningState<>(
             flinkStateBackend, address, combineFn, namespace, accumCoder);
       }
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn) {
-        return new FlinkKeyedAccumulatorCombiningState<>(
+        return new FlinkKeyedCombiningState<>(
             flinkStateBackend,
             address,
             combineFn,
@@ -169,12 +169,12 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkAccumulatorCombiningStateWithContext<>(
+        return new FlinkCombiningStateWithContext<>(
             flinkStateBackend,
             address,
             combineFn,
@@ -393,18 +393,18 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     }
   }
 
-  private static class FlinkAccumulatorCombiningState<K, InputT, AccumT, 
OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+  private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
-    FlinkAccumulatorCombiningState(
+    FlinkCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -420,7 +420,7 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -546,8 +546,8 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkCombiningState<?, ?, ?, ?> that =
+          (FlinkCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -561,19 +561,19 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     }
   }
 
-  private static class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, 
OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address;
     private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> 
combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
     private final FlinkStateInternals<K> flinkStateInternals;
 
-    FlinkKeyedAccumulatorCombiningState(
+    FlinkKeyedCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -591,7 +591,7 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -721,8 +721,8 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
         return false;
       }
 
-      FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkKeyedCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -736,11 +736,11 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     }
   }
 
-  private static class FlinkAccumulatorCombiningStateWithContext<K, InputT, 
AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+  private static class FlinkCombiningStateWithContext<K, InputT, AccumT, 
OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, 
AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address;
     private final CombineWithContext.KeyedCombineFnWithContext<
         ? super K, InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
@@ -748,9 +748,9 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     private final FlinkStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
-    FlinkAccumulatorCombiningStateWithContext(
+    FlinkCombiningStateWithContext(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.KeyedCombineFnWithContext<
             ? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
@@ -771,7 +771,7 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -896,8 +896,8 @@ public class FlinkStateInternals<K> implements 
StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index f4e3ea8..7e7d1e1 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -32,8 +32,8 @@ import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkB
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -58,7 +58,7 @@ public class FlinkBroadcastStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], 
Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -202,9 +202,9 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -223,11 +223,11 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 27747dd..d140271 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -72,7 +72,7 @@ public class FlinkStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], 
Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -232,9 +232,9 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -253,11 +253,11 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 43fb383..725e9d3 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -36,8 +36,8 @@ import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -137,31 +137,31 @@ class SparkStateInternals<K> implements StateInternals<K> 
{
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new SparkAccumulatorCombiningState<>(namespace, address, 
accumCoder, key,
+      return new SparkCombiningState<>(namespace, address, accumCoder, key,
           combineFn.<K>asKeyedFn());
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new SparkAccumulatorCombiningState<>(namespace, address, 
accumCoder, key, combineFn);
+      return new SparkCombiningState<>(namespace, address, accumCoder, key, 
combineFn);
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
             Coder<AccumT> accumCoder,
             KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
-      return new SparkAccumulatorCombiningState<>(namespace, address, 
accumCoder, key,
+      return new SparkCombiningState<>(namespace, address, accumCoder, key,
           CombineFnUtil.bindContext(combineFn, c));
     }
 
@@ -300,16 +300,16 @@ class SparkStateInternals<K> implements StateInternals<K> 
{
     }
   }
 
-  private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private class SparkCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractState<AccumT>
-          implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+          implements CombiningState<InputT, AccumT, OutputT> {
 
     private final K key;
     private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
 
-    private SparkAccumulatorCombiningState(
+    private SparkCombiningState(
         StateNamespace namespace,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         K key,
         KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
@@ -319,7 +319,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> 
readLater() {
+    public SparkCombiningState<K, InputT, AccumT, OutputT> readLater() {
       return this;
     }
 

Reply via email to