Simplify type parameters of StateSpec and related

Before this change, almost all uses of state had a type variable that
existing only to support the esoteric use of a KeyedCombineFn in
a state cell. KeyedCombineFn is now gone, so the key is no longer
required.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fe59c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fe59c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fe59c35

Branch: refs/heads/master
Commit: 8fe59c3555e65c21c0a0bbaa5a0ba9eac39316dd
Parents: eec903f
Author: Kenneth Knowles <[email protected]>
Authored: Thu Apr 20 20:46:37 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Tue May 2 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   4 +-
 .../translation/utils/ApexStateInternals.java   |  42 +++--
 .../apex/translation/utils/NoOpStepContext.java |   2 +-
 .../translation/utils/StateInternalsProxy.java  |   6 +-
 .../utils/ApexStateInternalsTest.java           |  12 +-
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../beam/runners/core/BaseExecutionContext.java |   2 +-
 .../beam/runners/core/ExecutionContext.java     |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   2 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   2 +-
 .../runners/core/InMemoryStateInternals.java    |  36 ++--
 .../runners/core/MergingActiveWindowSet.java    |   4 +-
 .../beam/runners/core/MergingStateAccessor.java |   2 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |   2 +-
 .../beam/runners/core/PaneInfoTracker.java      |   2 +-
 .../runners/core/ReduceFnContextFactory.java    |  37 ++--
 .../beam/runners/core/ReduceFnRunner.java       |   4 +-
 .../beam/runners/core/SideInputHandler.java     |  14 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |   8 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SplittableParDo.java      |   8 +-
 .../apache/beam/runners/core/StateAccessor.java |   2 +-
 .../beam/runners/core/StateInternals.java       |   8 +-
 .../runners/core/StateInternalsFactory.java     |   2 +-
 .../apache/beam/runners/core/StateMerging.java  |  16 +-
 .../apache/beam/runners/core/StateTable.java    |  10 +-
 .../org/apache/beam/runners/core/StateTag.java  |  28 ++-
 .../org/apache/beam/runners/core/StateTags.java |  70 ++++----
 .../beam/runners/core/StatefulDoFnRunner.java   |   6 +-
 .../beam/runners/core/SystemReduceFn.java       |   8 +-
 .../core/TestInMemoryStateInternals.java        |   6 +-
 .../apache/beam/runners/core/WatermarkHold.java |   8 +-
 .../beam/runners/core/WindowingInternals.java   |   2 +-
 .../AfterDelayFromFirstElementStateMachine.java |   2 +-
 .../core/triggers/AfterPaneStateMachine.java    |   2 +-
 .../TriggerStateMachineContextFactory.java      |  12 +-
 .../triggers/TriggerStateMachineRunner.java     |   2 +-
 .../core/GroupAlsoByWindowsProperties.java      |  10 +-
 .../core/InMemoryStateInternalsTest.java        |  16 +-
 .../core/MergingActiveWindowSetTest.java        |   2 +-
 .../beam/runners/core/ReduceFnTester.java       |  18 +-
 .../beam/runners/core/SplittableParDoTest.java  |   2 +-
 .../apache/beam/runners/core/StateTagTest.java  |  62 +++----
 .../runners/core/StatefulDoFnRunnerTest.java    |   4 +-
 .../CopyOnAccessInMemoryStateInternals.java     | 118 ++++++-------
 .../runners/direct/DirectExecutionContext.java  |  15 +-
 .../beam/runners/direct/EvaluationContext.java  |   8 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../beam/runners/direct/ParDoEvaluator.java     |   2 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   5 +-
 .../runners/direct/StepTransformResult.java     |   4 +-
 .../beam/runners/direct/TransformResult.java    |   2 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java | 106 +++++------
 .../beam/runners/direct/DirectRunnerTest.java   |   1 +
 .../runners/direct/EvaluationContextTest.java   |  12 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  10 +-
 .../functions/FlinkNoOpStepContext.java         |   2 +-
 .../functions/FlinkStatefulDoFnFunction.java    |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   6 +-
 .../streaming/SplittableDoFnOperator.java       |   4 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../state/FlinkBroadcastStateInternals.java     |  45 +++--
 .../state/FlinkKeyGroupStateInternals.java      |  29 ++-
 .../state/FlinkSplitStateInternals.java         |  29 ++-
 .../streaming/state/FlinkStateInternals.java    |  48 ++---
 .../flink/streaming/DoFnOperatorTest.java       |   4 +-
 .../FlinkBroadcastStateInternalsTest.java       |   6 +-
 .../FlinkKeyGroupStateInternalsTest.java        |   2 +-
 .../streaming/FlinkSplitStateInternalsTest.java |   2 +-
 .../streaming/FlinkStateInternalsTest.java      |  12 +-
 .../BatchStatefulParDoOverridesTest.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../spark/stateful/SparkStateInternals.java     |  44 +++--
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../spark/translation/TranslationUtils.java     |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   8 +-
 .../beam/sdk/transforms/GroupIntoBatches.java   |   6 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  28 +--
 .../apache/beam/sdk/util/state/StateSpec.java   |  15 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  92 +++++-----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 177 +++++++++----------
 .../transforms/reflect/DoFnInvokersTest.java    |   2 +-
 .../transforms/reflect/DoFnSignaturesTest.java  |  26 +--
 .../beam/fn/harness/fake/FakeStepContext.java   |   2 +-
 88 files changed, 692 insertions(+), 701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index b66d818..d5dd0dd 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -94,7 +94,7 @@ public class ApexParDoOperator<InputT, OutputT> extends 
BaseOperator implements
   private StateInternalsProxy<?> currentKeyStateInternals;
   private final ApexTimerInternals<Object> currentKeyTimerInternals;
 
-  private final StateInternals<Void> sideInputStateInternals;
+  private final StateInternals sideInputStateInternals;
   private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> 
pushedBack;
   private LongMin pushedBackWatermark = new LongMin();
   private long currentInputWatermark = Long.MIN_VALUE;
@@ -327,7 +327,7 @@ public class ApexParDoOperator<InputT, OutputT> extends 
BaseOperator implements
     NoOpStepContext stepContext = new NoOpStepContext() {
 
       @Override
-      public StateInternals<?> stateInternals() {
+      public StateInternals stateInternals() {
         return currentKeyStateInternals;
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index e682894..4300567 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -65,7 +65,7 @@ import org.joda.time.Instant;
  * <p>For fields that need to be serialized, use {@link 
ApexStateInternalsFactory}
  * or {@link StateInternalsProxy}
  */
-public class ApexStateInternals<K> implements StateInternals<K> {
+public class ApexStateInternals<K> implements StateInternals {
   private final K key;
   private final Table<String, String, byte[]> stateTable;
 
@@ -80,46 +80,44 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super 
K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> 
address) {
     return state(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final 
StateContext<?> c) {
-    return address.bind(new ApexStateBinder(key, namespace, address, c));
+      StateNamespace namespace, StateTag<T> address, final StateContext<?> c) {
+    return address.bind(new ApexStateBinder(namespace, address, c));
   }
 
   /**
    * A {@link StateBinder} that returns {@link State} wrappers for serialized 
state.
    */
-  private class ApexStateBinder implements StateBinder<K> {
-    private final K key;
+  private class ApexStateBinder implements StateBinder {
     private final StateNamespace namespace;
     private final StateContext<?> c;
 
-    private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super 
K, ?> address,
+    private ApexStateBinder(StateNamespace namespace, StateTag<?> address,
         StateContext<?> c) {
-      this.key = key;
       this.namespace = namespace;
       this.c = c;
     }
 
     @Override
     public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+        StateTag<ValueState<T>> address, Coder<T> coder) {
       return new ApexValueState<>(namespace, address, coder);
     }
 
     @Override
     public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+        final StateTag<BagState<T>> address, Coder<T> elemCoder) {
       return new ApexBagState<>(namespace, address, elemCoder);
     }
 
     @Override
     public <T> SetState<T> bindSet(
-        StateTag<? super K, SetState<T>> address,
+        StateTag<SetState<T>> address,
         Coder<T> elemCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", 
SetState.class.getSimpleName()));
@@ -127,7 +125,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
 
     @Override
     public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", 
MapState.class.getSimpleName()));
@@ -136,7 +134,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
       return new ApexCombiningState<>(
@@ -149,8 +147,8 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
+    public WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       return new ApexWatermarkHoldState<>(namespace, address, 
timestampCombiner);
     }
@@ -158,7 +156,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
       return bindCombiningValue(address, accumCoder, 
CombineFnUtil.bindContext(combineFn, c));
@@ -167,12 +165,12 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
 
   private class AbstractState<T> {
     protected final StateNamespace namespace;
-    protected final StateTag<?, ? extends State> address;
+    protected final StateTag<? extends State> address;
     protected final Coder<T> coder;
 
     private AbstractState(
         StateNamespace namespace,
-        StateTag<?, ? extends State> address,
+        StateTag<? extends State> address,
         Coder<T> coder) {
       this.namespace = namespace;
       this.address = address;
@@ -233,7 +231,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
 
     private ApexValueState(
         StateNamespace namespace,
-        StateTag<?, ValueState<T>> address,
+        StateTag<ValueState<T>> address,
         Coder<T> coder) {
       super(namespace, address, coder);
     }
@@ -261,7 +259,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
 
     public ApexWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState> address,
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       super(namespace, address, InstantCoder.of());
       this.timestampCombiner = timestampCombiner;
@@ -312,7 +310,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
     private final CombineFn<InputT, AccumT, OutputT> combineFn;
 
     private ApexCombiningState(StateNamespace namespace,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
@@ -376,7 +374,7 @@ public class ApexStateInternals<K> implements 
StateInternals<K> {
   private final class ApexBagState<T> extends AbstractState<List<T>> 
implements BagState<T> {
     private ApexBagState(
         StateNamespace namespace,
-        StateTag<?, BagState<T>> address,
+        StateTag<BagState<T>> address,
         Coder<T> coder) {
       super(namespace, address, ListCoder.of(coder));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index cc64c7c..721eecd 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -60,7 +60,7 @@ public class NoOpStepContext implements 
ExecutionContext.StepContext, Serializab
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
index 1f28364..746be2f 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.util.state.StateContext;
  * @param <K>
  */
 @DefaultSerializer(JavaSerializer.class)
-public class StateInternalsProxy<K> implements StateInternals<K>, Serializable 
{
+public class StateInternalsProxy<K> implements StateInternals, Serializable {
 
   private final StateInternalsFactory<K> factory;
   private transient K currentKey;
@@ -55,12 +55,12 @@ public class StateInternalsProxy<K> implements 
StateInternals<K>, Serializable {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super 
K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> 
address) {
     return factory.stateInternalsForKey(currentKey).state(namespace, address);
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super 
K, T> address,
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> 
address,
       StateContext<?> c) {
     return factory.stateInternalsForKey(currentKey).state(namespace, address, 
c);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 225b654..8b48a74 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -58,19 +58,19 @@ public class ApexStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], 
Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState>
+  private static final StateTag<WatermarkHoldState>
       WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> 
WATERMARK_LATEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR 
=
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
       StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.END_OF_WINDOW);
 
   private ApexStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 9754bb3..33ba80c 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -169,7 +169,7 @@ public class PTransformMatchersTest implements Serializable 
{
         private final String stateId = "mystate";
 
         @StateId(stateId)
-        private final StateSpec<Object, ValueState<Integer>> intState =
+        private final StateSpec<ValueState<Integer>> intState =
             StateSpecs.value(VarIntCoder.of());
 
         @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index cc7b574..aa1474c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -165,7 +165,7 @@ public abstract class BaseExecutionContext<T extends 
ExecutionContext.StepContex
     }
 
     @Override
-    public abstract StateInternals<?> stateInternals();
+    public abstract StateInternals stateInternals();
 
     @Override
     public abstract TimerInternals timerInternals();

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index ecd30c0..c0d01ae 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -92,7 +92,7 @@ public interface ExecutionContext {
         Coder<W> windowCoder)
             throws IOException;
 
-    StateInternals<?> stateInternals();
+    StateInternals stateInternals();
 
     TimerInternals timerInternals();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 9e66f07..311ed1c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -57,7 +57,7 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, 
OutputT, W extends
     InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
         new ReduceFnRunner<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 651458f..34afa5d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -63,7 +63,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
     K key = keyedWorkItem.key();
     TimerInternals timerInternals = c.windowingInternals().timerInternals();
-    StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
         new ReduceFnRunner<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index de4ac29..c553dbf 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -114,7 +114,7 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
     KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
 
     K key = keyedWorkItem.key();
-    StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
     TimerInternals timerInternals = 
timerInternalsFactory.timerInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 2c02282..199ce41 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -53,7 +53,7 @@ import org.joda.time.Instant;
  * and for running tests that need state.
  */
 @Experimental(Kind.STATE)
-public class InMemoryStateInternals<K> implements StateInternals<K> {
+public class InMemoryStateInternals<K> implements StateInternals {
 
   public static <K> InMemoryStateInternals<K> forKey(K key) {
     return new InMemoryStateInternals<>(key);
@@ -79,10 +79,10 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
     T copy();
   }
 
-  protected final StateTable<K> inMemoryState = new StateTable<K>() {
+  protected final StateTable inMemoryState = new StateTable() {
     @Override
-    protected StateBinder<K> binderForNamespace(StateNamespace namespace, 
StateContext<?> c) {
-      return new InMemoryStateBinder<K>(key, c);
+    protected StateBinder binderForNamespace(StateNamespace namespace, 
StateContext<?> c) {
+      return new InMemoryStateBinder(c);
     }
   };
 
@@ -99,48 +99,46 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super 
K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> 
address) {
     return inMemoryState.get(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final 
StateContext<?> c) {
+      StateNamespace namespace, StateTag<T> address, final StateContext<?> c) {
     return inMemoryState.get(namespace, address, c);
   }
 
   /**
    * A {@link StateBinder} that returns In Memory {@link State} objects.
    */
-  public static class InMemoryStateBinder<K> implements StateBinder<K> {
-    private final K key;
+  public static class InMemoryStateBinder implements StateBinder {
     private final StateContext<?> c;
 
-    public InMemoryStateBinder(K key, StateContext<?> c) {
-      this.key = key;
+    public InMemoryStateBinder(StateContext<?> c) {
       this.c = c;
     }
 
     @Override
     public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+        StateTag<ValueState<T>> address, Coder<T> coder) {
       return new InMemoryValue<T>();
     }
 
     @Override
     public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+        final StateTag<BagState<T>> address, Coder<T> elemCoder) {
       return new InMemoryBag<T>();
     }
 
     @Override
-    public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, 
Coder<T> elemCoder) {
+    public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> 
elemCoder) {
       return new InMemorySet<>();
     }
 
     @Override
     public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
       return new InMemoryMap<>();
     }
@@ -148,23 +146,23 @@ public class InMemoryStateInternals<K> implements 
StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
       return new InMemoryCombiningState<>(combineFn);
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
+    public WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
-      return new InMemoryWatermarkHold<W>(timestampCombiner);
+      return new InMemoryWatermarkHold(timestampCombiner);
     }
 
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
       return bindCombiningValue(address, accumCoder, 
CombineFnUtil.bindContext(combineFn, c));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
index b4e864c..2faedbb 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -67,10 +67,10 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
    */
   private final ValueState<Map<W, Set<W>>> valueState;
 
-  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, 
StateInternals<?> state) {
+  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals 
state) {
     this.windowFn = windowFn;
 
-    StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
+    StateTag<ValueState<Map<W, Set<W>>>> tag =
         StateTags.makeSystemTagInternal(StateTags.value(
             "tree", MapCoder.of(windowFn.windowCoder(), 
SetCoder.of(windowFn.windowCoder()))));
     valueState = state.state(StateNamespaces.global(), tag);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
index e948650..5ffb9a2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
@@ -37,5 +37,5 @@ public interface MergingStateAccessor<K, W extends 
BoundedWindow>
    * are known to have state.
    */
   <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-      StateTag<? super K, StateT> address);
+      StateTag<StateT> address);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e875c2..06dcc9c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends 
BoundedWindow> {
   private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
       extends NonEmptyPanes<K, W> {
 
-    private static final StateTag<Object, CombiningState<Long, long[], Long>>
+    private static final StateTag<CombiningState<Long, long[], Long>>
         PANE_ADDITIONS_TAG =
         
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
             "count", VarLongCoder.of(), Sum.ofLongs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 4cf4d67..66b3960 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -43,7 +43,7 @@ public class PaneInfoTracker {
   }
 
   @VisibleForTesting
-  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+  static final StateTag<ValueState<PaneInfo>> PANE_INFO_TAG =
       StateTags.makeSystemTagInternal(StateTags.value("pane", 
PaneInfoCoder.INSTANCE));
 
   public void clear(StateAccessor<?> state) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 8493474..3031ebf 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -51,7 +51,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
   private final K key;
   private final ReduceFn<K, InputT, OutputT, W> reduceFn;
   private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateInternals<K> stateInternals;
+  private final StateInternals stateInternals;
   private final ActiveWindowSet<W> activeWindows;
   private final TimerInternals timerInternals;
   private final SideInputReader sideInputReader;
@@ -61,7 +61,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
       K key,
       ReduceFn<K, InputT, OutputT, W> reduceFn,
       WindowingStrategy<?, W> windowingStrategy,
-      StateInternals<K> stateInternals,
+      StateInternals stateInternals,
       ActiveWindowSet<W> activeWindows,
       TimerInternals timerInternals,
       SideInputReader sideInputReader,
@@ -165,11 +165,15 @@ class ReduceFnContextFactory<K, InputT, OutputT, W 
extends BoundedWindow> {
     protected final StateContext<W> context;
     protected final StateNamespace windowNamespace;
     protected final Coder<W> windowCoder;
-    protected final StateInternals<K> stateInternals;
+    protected final StateInternals stateInternals;
     protected final StateStyle style;
 
-    public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> 
windowCoder,
-        StateInternals<K> stateInternals, StateContext<W> context, StateStyle 
style) {
+    public StateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals stateInternals,
+        StateContext<W> context,
+        StateStyle style) {
 
       this.activeWindows = activeWindows;
       this.windowCoder = windowCoder;
@@ -196,7 +200,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
     }
 
     @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> 
address) {
+    public <StateT extends State> StateT access(StateTag<StateT> address) {
       switch (style) {
         case DIRECT:
           return stateInternals.state(windowNamespace(), address, context);
@@ -212,8 +216,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
       extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
     private final Collection<W> activeToBeMerged;
 
-    public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> 
windowCoder,
-        StateInternals<K> stateInternals, StateStyle style, Collection<W> 
activeToBeMerged,
+    public MergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals stateInternals,
+        StateStyle style,
+        Collection<W> activeToBeMerged,
         W mergeResult) {
       super(activeWindows, windowCoder, stateInternals,
           stateContextForWindowOnly(mergeResult), style);
@@ -221,7 +229,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
     }
 
     @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> 
address) {
+    public <StateT extends State> StateT access(StateTag<StateT> address) {
       switch (style) {
         case DIRECT:
           return stateInternals.state(windowNamespace(), address, context);
@@ -237,7 +245,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
 
     @Override
     public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
+        StateTag<StateT> address) {
       ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
       for (W mergingWindow : activeToBeMerged) {
         StateNamespace namespace = null;
@@ -258,8 +266,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
 
   static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
       extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
-    public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, 
Coder<W> windowCoder,
-        StateInternals<K> stateInternals, W window) {
+    public PremergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals stateInternals,
+        W window) {
       super(activeWindows, windowCoder, stateInternals,
           stateContextForWindowOnly(window), StateStyle.RENAMED);
     }
@@ -270,7 +281,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
 
     @Override
     public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
+        StateTag<StateT> address) {
       ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
       for (W stateAddressWindow : 
activeWindows.readStateAddresses(context.window())) {
         StateT stateForWindow =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 08014ec..d3dc067 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -108,7 +108,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
   private final OutputWindowedValue<KV<K, OutputT>> outputter;
 
-  private final StateInternals<K> stateInternals;
+  private final StateInternals stateInternals;
 
   private final Counter droppedDueToClosedWindow;
 
@@ -214,7 +214,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       K key,
       WindowingStrategy<?, W> windowingStrategy,
       ExecutableTriggerStateMachine triggerStateMachine,
-      StateInternals<K> stateInternals,
+      StateInternals stateInternals,
       TimerInternals timerInternals,
       OutputWindowedValue<KV<K, OutputT>> outputter,
       SideInputReader sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 26e920a..5c67148 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -61,7 +61,7 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
    * be keep locally but if side inputs are broadcast to all parallel 
operators then all will
    * have the same view of the state.
    */
-  private final StateInternals<Void> stateInternals;
+  private final StateInternals stateInternals;
 
   /**
    * A state tag for each side input that we handle. The state is used to track
@@ -70,7 +70,6 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
   private final Map<
       PCollectionView<?>,
       StateTag<
-          Object,
           CombiningState<
                         BoundedWindow,
                         Set<BoundedWindow>,
@@ -81,7 +80,7 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
    */
   private final Map<
       PCollectionView<?>,
-      StateTag<Object, ValueState<Iterable<WindowedValue<?>>>>> 
sideInputContentsTags;
+      StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
 
   /**
    * Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -89,7 +88,7 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
    */
   public SideInputHandler(
       Collection<PCollectionView<?>> sideInputs,
-      StateInternals<Void> stateInternals) {
+      StateInternals stateInternals) {
     this.sideInputs = sideInputs;
     this.stateInternals = stateInternals;
     this.availableWindowsTags = new HashMap<>();
@@ -105,7 +104,6 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
               .windowCoder();
 
       StateTag<
-          Object,
           CombiningState<
                         BoundedWindow,
                         Set<BoundedWindow>,
@@ -117,7 +115,7 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
       availableWindowsTags.put(sideInput, availableTag);
 
       Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
-      StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+      StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
           StateTags.value("side-input-data-" + 
sideInput.getTagInternal().getId(), coder);
       sideInputContentsTags.put(sideInput, stateTag);
     }
@@ -145,7 +143,7 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
       inputWithReifiedWindows.add(value.withValue(e));
     }
 
-    StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
         sideInputContentsTags.get(sideInput);
 
     for (BoundedWindow window: value.getWindows()) {
@@ -175,7 +173,7 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
             .getWindowFn()
             .windowCoder();
 
-    StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
         sideInputContentsTags.get(sideInput);
 
     ValueState<Iterable<WindowedValue<?>>> state =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1865d54..edce9a2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -614,8 +614,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     @Override
     public State state(String stateId) {
       try {
-        StateSpec<?, ?> spec =
-            (StateSpec<?, ?>) 
signature.stateDeclarations().get(stateId).field().get(fn);
+        StateSpec<?> spec =
+            (StateSpec<?>) 
signature.stateDeclarations().get(stateId).field().get(fn);
         return stepContext
             .stateInternals()
             .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) 
spec));
@@ -726,8 +726,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     @Override
     public State state(String stateId) {
       try {
-        StateSpec<?, ?> spec =
-            (StateSpec<?, ?>) 
signature.stateDeclarations().get(stateId).field().get(fn);
+        StateSpec<?> spec =
+            (StateSpec<?>) 
signature.stateDeclarations().get(stateId).field().get(fn);
         return stepContext
             .stateInternals()
             .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) 
spec));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index b8db491..b5f8f45 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -489,7 +489,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT
         }
 
         @Override
-        public StateInternals<?> stateInternals() {
+        public StateInternals stateInternals() {
           return context.stepContext.stateInternals();
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0fa6f76..94f5f35 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -353,7 +353,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * the input watermark when the first {@link DoFn.ProcessElement} call for 
this element
      * completes.
      */
-    private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag 
=
+    private static final StateTag<WatermarkHoldState> watermarkHoldTag =
         StateTags.makeSystemTagInternal(
             StateTags.<GlobalWindow>watermarkStateInternal(
                 "hold", TimestampCombiner.LATEST));
@@ -363,13 +363,13 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT>
      * DoFn.ProcessElement} call and read during subsequent calls in response 
to timer firings, when
      * the original element is no longer available.
      */
-    private final StateTag<Object, ValueState<WindowedValue<InputT>>> 
elementTag;
+    private final StateTag<ValueState<WindowedValue<InputT>>> elementTag;
 
     /**
      * The state cell containing a restriction representing the unprocessed 
part of work for this
      * element.
      */
-    private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+    private StateTag<ValueState<RestrictionT>> restrictionTag;
 
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
@@ -453,7 +453,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     @ProcessElement
     public void processElement(final ProcessContext c) {
       String key = c.element().key();
-      StateInternals<String> stateInternals =
+      StateInternals stateInternals =
           stateInternalsFactory.stateInternalsForKey(key);
       TimerInternals timerInternals = 
timerInternalsFactory.timerInternalsForKey(key);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
index 87353f2..eda896b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
@@ -34,5 +34,5 @@ public interface StateAccessor<K> {
    * <p>Never accounts for merged windows. When windows are merged, any state 
accessed via
    * this method must be eagerly combined and written into the result window.
    */
-  <StateT extends State> StateT access(StateTag<? super K, StateT> address);
+  <StateT extends State> StateT access(StateTag<StateT> address);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
index e6440bf..c2e9412 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
@@ -40,20 +40,20 @@ import org.apache.beam.sdk.util.state.StateContext;
  * used directly, and is highly likely to change.
  */
 @Experimental(Kind.STATE)
-public interface StateInternals<K> {
+public interface StateInternals {
 
   /** The key for this {@link StateInternals}. */
-  K getKey();
+  Object getKey();
 
   /**
    * Return the state associated with {@code address} in the specified {@code 
namespace}.
    */
-  <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> 
address);
+  <T extends State> T state(StateNamespace namespace, StateTag<T> address);
 
   /**
    * Return the state associated with {@code address} in the specified {@code 
namespace}
    * with the {@link StateContext}.
    */
   <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, 
StateContext<?> c);
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
index ea7d742..c756364 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
@@ -31,5 +31,5 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 public interface StateInternalsFactory<K> {
 
   /** Returns {@link StateInternals} for the provided key. */
-  StateInternals<K> stateInternalsForKey(K key);
+  StateInternals stateInternalsForKey(K key);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index ce37fd3..f6b9103 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -42,7 +42,7 @@ public class StateMerging {
    * in {@code context}.
    */
   public static <K, StateT extends State, W extends BoundedWindow> void clear(
-      MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) 
{
+      MergingStateAccessor<K, W> context, StateTag<StateT> address) {
     for (StateT state : context.accessInEachMergingWindow(address).values()) {
       state.clear();
     }
@@ -54,7 +54,7 @@ public class StateMerging {
    * blindly append to.
    */
   public static <K, T, W extends BoundedWindow> void prefetchBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> 
address) {
+      MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
     Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
     if (map.isEmpty()) {
       // Nothing to prefetch.
@@ -73,7 +73,7 @@ public class StateMerging {
    * Merge all bag state in {@code address} across all windows under merge.
    */
   public static <K, T, W extends BoundedWindow> void mergeBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> 
address) {
+      MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
     mergeBags(context.accessInEachMergingWindow(address).values(), 
context.access(address));
   }
 
@@ -116,7 +116,7 @@ public class StateMerging {
    * Merge all set state in {@code address} across all windows under merge.
    */
   public static <K, T, W extends BoundedWindow> void mergeSets(
-      MergingStateAccessor<K, W> context, StateTag<? super K, SetState<T>> 
address) {
+      MergingStateAccessor<K, W> context, StateTag<SetState<T>> address) {
     mergeSets(context.accessInEachMergingWindow(address).values(), 
context.access(address));
   }
 
@@ -161,7 +161,7 @@ public class StateMerging {
    */
   public static <K, StateT extends GroupingState<?, ?>, W extends 
BoundedWindow> void
       prefetchCombiningValues(MergingStateAccessor<K, W> context,
-          StateTag<? super K, StateT> address) {
+          StateTag<StateT> address) {
     for (StateT state : context.accessInEachMergingWindow(address).values()) {
       prefetchRead(state);
     }
@@ -172,7 +172,7 @@ public class StateMerging {
    */
   public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) {
+      StateTag<CombiningState<InputT, AccumT, OutputT>> address) {
     mergeCombiningValues(
         context.accessInEachMergingWindow(address).values(), 
context.access(address));
   }
@@ -218,7 +218,7 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void prefetchWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState> address) {
+      StateTag<WatermarkHoldState> address) {
     Map<W, WatermarkHoldState> map = 
context.accessInEachMergingWindow(address);
     WatermarkHoldState result = context.access(address);
     if (map.isEmpty()) {
@@ -250,7 +250,7 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void mergeWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState> address,
+      StateTag<WatermarkHoldState> address,
       W mergeResult) {
     mergeWatermarks(
         context.accessInEachMergingWindow(address).values(), 
context.access(address), mergeResult);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
index d2511c9..1bf4ff5 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.state.StateContext;
 /**
  * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code 
State} instance.
  */
-public abstract class StateTable<K> {
+public abstract class StateTable {
 
-  private final Table<StateNamespace, StateTag<? super K, ?>, State> 
stateTable =
+  private final Table<StateNamespace, StateTag<?>, State> stateTable =
       HashBasedTable.create();
 
   /**
@@ -39,7 +39,7 @@ public abstract class StateTable<K> {
    * already present in this {@link StateTable}.
    */
   public <StateT extends State> StateT get(
-      StateNamespace namespace, StateTag<? super K, StateT> tag, 
StateContext<?> c) {
+      StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
     State storage = stateTable.get(namespace, tag);
     if (storage != null) {
       @SuppressWarnings("unchecked")
@@ -68,7 +68,7 @@ public abstract class StateTable<K> {
     return stateTable.containsRow(namespace);
   }
 
-  public Map<StateTag<? super K, ?>, State> getTagsInUse(StateNamespace 
namespace) {
+  public Map<StateTag<?>, State> getTagsInUse(StateNamespace namespace) {
     return stateTable.row(namespace);
   }
 
@@ -80,5 +80,5 @@ public abstract class StateTable<K> {
    * Provide the {@code StateBinder} to use for creating {@code Storage} 
instances
    * in the specified {@code namespace}.
    */
-  protected abstract StateBinder<K> binderForNamespace(StateNamespace 
namespace, StateContext<?> c);
+  protected abstract StateBinder binderForNamespace(StateNamespace namespace, 
StateContext<?> c);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index aaeecf0..38e9dea 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -46,12 +45,10 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
  *
  * <p>Currently, this can only be used in a step immediately following a 
{@link GroupByKey}.
  *
- * @param <K> The type of key that must be used with the state tag. 
Contravariant: methods should
- *            accept values of type {@code KeyedStateTag<? super K, StateT>}.
  * @param <StateT> The type of state being tagged.
  */
 @Experimental(Kind.STATE)
-public interface StateTag<K, StateT extends State> extends Serializable {
+public interface StateTag<StateT extends State> extends Serializable {
 
   /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
   void appendTo(Appendable sb) throws IOException;
@@ -64,7 +61,7 @@ public interface StateTag<K, StateT extends State> extends 
Serializable {
   /**
    * The specification for the state stored in the referenced cell.
    */
-  StateSpec<K, StateT> getSpec();
+  StateSpec<StateT> getSpec();
 
   /**
    * Bind this state tag. See {@link StateSpec#bind}.
@@ -72,35 +69,34 @@ public interface StateTag<K, StateT extends State> extends 
Serializable {
    * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} 
for now.
    */
   @Deprecated
-  StateT bind(StateBinder<? extends K> binder);
+  StateT bind(StateBinder binder);
 
   /**
    * Visitor for binding a {@link StateSpec} and to the associated {@link 
State}.
    *
-   * @param <K> the type of key this binder embodies.
    * @deprecated for migration only; runners should reference the top level 
{@link StateBinder}
    * and move towards {@link StateSpec} rather than {@link StateTag}.
    */
   @Deprecated
-  public interface StateBinder<K> {
-    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, 
Coder<T> coder);
+  public interface StateBinder {
+    <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder);
 
-    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> 
elemCoder);
+    <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder);
 
-    <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> 
elemCoder);
+    <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder);
 
     <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
     <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
         CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
 
@@ -110,7 +106,7 @@ public interface StateTag<K, StateT extends State> extends 
Serializable {
      * <p>This accepts the {@link TimestampCombiner} that dictates how 
watermark hold timestamps
      * added to the returned {@link WatermarkHoldState} are to be combined.
      */
-    <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner 
timestampCombiner);
+    WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> spec, TimestampCombiner 
timestampCombiner);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index fe99f27..ca8b238 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -50,29 +50,29 @@ public class StateTags {
 
   /** @deprecated for migration purposes only */
   @Deprecated
-  private static <K> StateBinder<K> adaptTagBinder(final 
StateTag.StateBinder<K> binder) {
-    return new StateBinder<K>() {
+  private static StateBinder adaptTagBinder(final StateTag.StateBinder binder) 
{
+    return new StateBinder() {
       @Override
       public <T> ValueState<T> bindValue(
-          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) 
{
+          String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
         return binder.bindValue(tagForSpec(id, spec), coder);
       }
 
       @Override
       public <T> BagState<T> bindBag(
-          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> 
elemCoder) {
+          String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
         return binder.bindBag(tagForSpec(id, spec), elemCoder);
       }
 
       @Override
       public <T> SetState<T> bindSet(
-          String id, StateSpec<? super K, SetState<T>> spec, Coder<T> 
elemCoder) {
+          String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
         return binder.bindSet(tagForSpec(id, spec), elemCoder);
       }
 
       @Override
       public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
+          String id, StateSpec<MapState<KeyT, ValueT>> spec,
           Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
         return binder.bindMap(tagForSpec(id, spec), mapKeyCoder, 
mapValueCoder);
       }
@@ -81,7 +81,7 @@ public class StateTags {
       public <InputT, AccumT, OutputT>
       CombiningState<InputT, AccumT, OutputT> bindCombining(
               String id,
-              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> 
spec,
+              StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               CombineFn<InputT, AccumT, OutputT> combineFn) {
         return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, 
combineFn);
@@ -91,7 +91,7 @@ public class StateTags {
       public <InputT, AccumT, OutputT>
       CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
               String id,
-              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> 
spec,
+              StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
         return binder.bindCombiningValueWithContext(
@@ -99,9 +99,9 @@ public class StateTags {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+      public WatermarkHoldState bindWatermark(
           String id,
-          StateSpec<? super K, WatermarkHoldState> spec,
+          StateSpec<WatermarkHoldState> spec,
           TimestampCombiner timestampCombiner) {
         return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
       }
@@ -121,20 +121,20 @@ public class StateTags {
 
   private StateTags() { }
 
-  private interface SystemStateTag<K, StateT extends State> {
-    StateTag<K, StateT> asKind(StateKind kind);
+  private interface SystemStateTag<StateT extends State> {
+    StateTag<StateT> asKind(StateKind kind);
   }
 
   /** Create a state tag for the given id and spec. */
-  public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
-      String id, StateSpec<K, StateT> spec) {
+  public static <StateT extends State> StateTag<StateT> tagForSpec(
+      String id, StateSpec<StateT> spec) {
     return new SimpleStateTag<>(new StructuredId(id), spec);
   }
 
   /**
    * Create a simple state tag for values of type {@code T}.
    */
-  public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> 
valueCoder) {
+  public static <T> StateTag<ValueState<T>> value(String id, Coder<T> 
valueCoder) {
     return new SimpleStateTag<>(new StructuredId(id), 
StateSpecs.value(valueCoder));
   }
 
@@ -143,7 +143,7 @@ public class StateTags {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-    StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+    StateTag<CombiningState<InputT, AccumT, OutputT>>
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> 
combineFn) {
     return new SimpleStateTag<>(
@@ -155,7 +155,7 @@ public class StateTags {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+      StateTag<CombiningState<InputT, AccumT, OutputT>>
       combiningValueWithContext(
           String id,
           Coder<AccumT> accumCoder,
@@ -172,7 +172,7 @@ public class StateTags {
    * should only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+      StateTag<CombiningState<InputT, AccumT, OutputT>>
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, 
OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -183,21 +183,21 @@ public class StateTags {
    * Create a state tag that is optimized for adding values frequently, and
    * occasionally retrieving all the values that have been added.
    */
-  public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> 
elemCoder) {
+  public static <T> StateTag<BagState<T>> bag(String id, Coder<T> elemCoder) {
     return new SimpleStateTag<>(new StructuredId(id), 
StateSpecs.bag(elemCoder));
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Set} like access 
patterns.
    */
-  public static <T> StateTag<Object, SetState<T>> set(String id, Coder<T> 
elemCoder) {
+  public static <T> StateTag<SetState<T>> set(String id, Coder<T> elemCoder) {
     return new SimpleStateTag<>(new StructuredId(id), 
StateSpecs.set(elemCoder));
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Map} like access 
patterns.
    */
-  public static <K, V> StateTag<Object, MapState<K, V>> map(
+  public static <K, V> StateTag<MapState<K, V>> map(
       String id, Coder<K> keyCoder, Coder<V> valueCoder) {
     return new SimpleStateTag<>(new StructuredId(id), StateSpecs.map(keyCoder, 
valueCoder));
   }
@@ -205,7 +205,7 @@ public class StateTags {
   /**
    * Create a state tag for holding the watermark.
    */
-  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
+  public static <W extends BoundedWindow> StateTag<WatermarkHoldState>
       watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
     return new SimpleStateTag<>(
         new StructuredId(id), 
StateSpecs.watermarkStateInternal(timestampCombiner));
@@ -215,20 +215,20 @@ public class StateTags {
    * Convert an arbitrary {@link StateTag} to a system-internal tag that is 
guaranteed not to
    * collide with any user tags.
    */
-  public static <K, StateT extends State> StateTag<K, StateT> 
makeSystemTagInternal(
-      StateTag<K, StateT> tag) {
+  public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
+      StateTag<StateT> tag) {
     if (!(tag instanceof SystemStateTag)) {
       throw new IllegalArgumentException("Expected subclass of SimpleStateTag, 
got " + tag);
     }
     // Checked above
     @SuppressWarnings("unchecked")
-    SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag;
+    SystemStateTag<StateT> typedTag = (SystemStateTag<StateT>) tag;
     return typedTag.asKind(StateKind.SYSTEM);
   }
 
-  public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
+  public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>>
       convertToBagTagInternal(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> 
combiningTag) {
+          StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
     return new SimpleStateTag<>(
         new StructuredId(combiningTag.getId()),
         StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
@@ -291,20 +291,20 @@ public class StateTags {
   /**
    * A basic {@link StateTag} implementation that manages the structured ids.
    */
-  private static class SimpleStateTag<K, StateT extends State>
-      implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
+  private static class SimpleStateTag<StateT extends State>
+      implements StateTag<StateT>, SystemStateTag<StateT> {
 
-    private final StateSpec<K, StateT> spec;
+    private final StateSpec<StateT> spec;
     private final StructuredId id;
 
-    public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
+    public SimpleStateTag(StructuredId id, StateSpec<StateT> spec) {
       this.id = id;
       this.spec = spec;
     }
 
     @Override
     @Deprecated
-    public StateT bind(StateTag.StateBinder<? extends K> binder) {
+    public StateT bind(StateTag.StateBinder binder) {
       return spec.bind(
           this.id.getRawId(), adaptTagBinder(binder));
     }
@@ -315,7 +315,7 @@ public class StateTags {
     }
 
     @Override
-    public StateSpec<K, StateT> getSpec() {
+    public StateSpec<StateT> getSpec() {
       return spec;
     }
 
@@ -332,7 +332,7 @@ public class StateTags {
     }
 
     @Override
-    public StateTag<K, StateT> asKind(StateKind kind) {
+    public StateTag<StateT> asKind(StateKind kind) {
       return new SimpleStateTag<>(id.asKind(kind), spec);
     }
 
@@ -342,7 +342,7 @@ public class StateTags {
         return false;
       }
 
-      SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other;
+      SimpleStateTag<?> otherTag = (SimpleStateTag<?>) other;
       return Objects.equals(this.getId(), otherTag.getId())
           && Objects.equals(this.getSpec(), otherTag.getSpec());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 7a20590..e3717a8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -236,12 +236,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W 
extends BoundedWindow>
 
     private final DoFn<?, ?> fn;
     private final DoFnSignature signature;
-    private final StateInternals<?> stateInternals;
+    private final StateInternals stateInternals;
     private final Coder<W> windowCoder;
 
     public StateInternalsStateCleaner(
         DoFn<?, ?> fn,
-        StateInternals<?> stateInternals,
+        StateInternals stateInternals,
         Coder<W> windowCoder) {
       this.fn = fn;
       this.signature = DoFnSignatures.getSignature(fn.getClass());
@@ -254,7 +254,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
       for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
           signature.stateDeclarations().entrySet()) {
         try {
-          StateSpec<?, ?> spec = (StateSpec<?, ?>) 
entry.getValue().field().get(fn);
+          StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(fn);
           State state = 
stateInternals.state(StateNamespaces.window(windowCoder, window),
               StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
           state.clear();

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 86a7fd7..f18460a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -47,7 +47,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, 
OutputT, W extends Bound
    */
   public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, 
Iterable<T>, Iterable<T>, W>
       buffering(final Coder<T> inputCoder) {
-    final StateTag<Object, BagState<T>> bufferTag =
+    final StateTag<BagState<T>> bufferTag =
         StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, 
inputCoder));
     return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
       @Override
@@ -70,7 +70,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, 
OutputT, W extends Bound
       AccumT, OutputT, W>
       combining(
           final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, 
OutputT> combineFn) {
-    final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag;
+    final StateTag<CombiningState<InputT, AccumT, OutputT>> bufferTag;
     if (combineFn.getFn() instanceof CombineFnWithContext) {
       bufferTag = StateTags.makeSystemTagInternal(
           StateTags.<InputT, AccumT, OutputT>combiningValueWithContext(
@@ -96,10 +96,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, 
OutputT, W extends Bound
     };
   }
 
-  private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> 
bufferTag;
+  private StateTag<? extends GroupingState<InputT, OutputT>> bufferTag;
 
   public SystemReduceFn(
-      StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) 
{
+      StateTag<? extends GroupingState<InputT, OutputT>> bufferTag) {
     this.bufferTag = bufferTag;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 1dfb85f..18b50db 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -32,9 +32,9 @@ public class TestInMemoryStateInternals<K> extends 
InMemoryStateInternals<K> {
     super(key);
   }
 
-  public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
-    Set<StateTag<? super K, ?>> inUse = new HashSet<>();
-    for (Map.Entry<StateTag<? super K, ?>, State> entry :
+  public Set<StateTag<?>> getTagsInUse(StateNamespace namespace) {
+    Set<StateTag<?>> inUse = new HashSet<>();
+    for (Map.Entry<StateTag<?>, State> entry :
       inMemoryState.getTagsInUse(namespace).entrySet()) {
       if (!isEmptyForTesting(entry.getValue())) {
         inUse.add(entry.getKey());

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 9bb9c62..e6e4ffb 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -54,9 +54,9 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
    * used for elements.
    */
   public static <W extends BoundedWindow>
-      StateTag<Object, WatermarkHoldState> 
watermarkHoldTagForTimestampCombiner(
+      StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
           TimestampCombiner timestampCombiner) {
-    return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
+    return StateTags.<WatermarkHoldState>makeSystemTagInternal(
         StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
   }
 
@@ -67,13 +67,13 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
    * would take the end-of-window time as its element time.)
    */
   @VisibleForTesting
-  public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
+  public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG =
       StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
           "extra", TimestampCombiner.EARLIEST));
 
   private final TimerInternals timerInternals;
   private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateTag<Object, WatermarkHoldState> elementHoldTag;
+  private final StateTag<WatermarkHoldState> elementHoldTag;
 
   public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> 
windowingStrategy) {
     this.timerInternals = timerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index 5005065..a824a7b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -40,7 +40,7 @@ public interface WindowingInternals<InputT, OutputT> {
    * Unsupported state internals. The key type is unknown. It is up to the 
user to use the
    * correct type of key.
    */
-  StateInternals<?> stateInternals();
+  StateInternals stateInternals();
 
   /**
    * Output the value at the specified timestamp in the listed windows.

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index b416788..ed2c26f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -55,7 +55,7 @@ public abstract class AfterDelayFromFirstElementStateMachine 
extends OnceTrigger
   protected static final List<SerializableFunction<Instant, Instant>> IDENTITY 
=
       ImmutableList.<SerializableFunction<Instant, Instant>>of();
 
-  protected static final StateTag<Object, CombiningState<Instant,
+  protected static final StateTag<CombiningState<Instant,
                                                 Holder<Instant>, Instant>> 
DELAYED_UNTIL_TAG =
       
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 11323cc..52fb5ff 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.state.CombiningState;
 @Experimental(Experimental.Kind.TRIGGER)
 public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 
-private static final StateTag<Object, CombiningState<Long, long[], Long>>
+private static final StateTag<CombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "count", VarLongCoder.of(), Sum.ofLongs()));

Reply via email to