Repository: flink Updated Branches: refs/heads/master 89866a5ad -> cd5527417
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index b1c94cb..8aa76a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.ExceptionUtils; @@ -30,8 +30,8 @@ import java.util.concurrent.RunnableFuture; */ public class OperatorSnapshotResult { - private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture; - private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture; + private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture; + private RunnableFuture<KeyedStateHandle> keyedStateRawFuture; private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture; private RunnableFuture<OperatorStateHandle> operatorStateRawFuture; @@ -40,8 +40,8 @@ public class OperatorSnapshotResult { } public OperatorSnapshotResult( - RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture, - RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture, + RunnableFuture<KeyedStateHandle> keyedStateManagedFuture, + RunnableFuture<KeyedStateHandle> keyedStateRawFuture, RunnableFuture<OperatorStateHandle> operatorStateManagedFuture, RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { this.keyedStateManagedFuture = keyedStateManagedFuture; @@ -50,19 +50,19 @@ public class OperatorSnapshotResult { this.operatorStateRawFuture = operatorStateRawFuture; } - public RunnableFuture<KeyGroupsStateHandle> getKeyedStateManagedFuture() { + public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() { return keyedStateManagedFuture; } - public void setKeyedStateManagedFuture(RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture) { + public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) { this.keyedStateManagedFuture = keyedStateManagedFuture; } - public RunnableFuture<KeyGroupsStateHandle> getKeyedStateRawFuture() { + public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() { return keyedStateRawFuture; } - public void setKeyedStateRawFuture(RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture) { + public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) { this.keyedStateRawFuture = keyedStateRawFuture; } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java index 7abf8d9..30d07b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java @@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -42,16 +42,16 @@ public class OperatorStateHandles { private final StreamStateHandle legacyOperatorState; - private final Collection<KeyGroupsStateHandle> managedKeyedState; - private final Collection<KeyGroupsStateHandle> rawKeyedState; + private final Collection<KeyedStateHandle> managedKeyedState; + private final Collection<KeyedStateHandle> rawKeyedState; private final Collection<OperatorStateHandle> managedOperatorState; private final Collection<OperatorStateHandle> rawOperatorState; public OperatorStateHandles( int operatorChainIndex, StreamStateHandle legacyOperatorState, - Collection<KeyGroupsStateHandle> managedKeyedState, - Collection<KeyGroupsStateHandle> rawKeyedState, + Collection<KeyedStateHandle> managedKeyedState, + Collection<KeyedStateHandle> rawKeyedState, Collection<OperatorStateHandle> managedOperatorState, Collection<OperatorStateHandle> rawOperatorState) { @@ -83,11 +83,11 @@ public class OperatorStateHandles { return legacyOperatorState; } - public Collection<KeyGroupsStateHandle> getManagedKeyedState() { + public Collection<KeyedStateHandle> getManagedKeyedState() { return managedKeyedState; } - public Collection<KeyGroupsStateHandle> getRawKeyedState() { + public Collection<KeyedStateHandle> getRawKeyedState() { return rawKeyedState; } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 76b2b98..11e8e0d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -37,7 +37,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; @@ -849,8 +849,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final List<OperatorSnapshotResult> snapshotInProgressList; - private RunnableFuture<KeyGroupsStateHandle> futureKeyedBackendStateHandles; - private RunnableFuture<KeyGroupsStateHandle> futureKeyedStreamStateHandles; + private RunnableFuture<KeyedStateHandle> futureKeyedBackendStateHandles; + private RunnableFuture<KeyedStateHandle> futureKeyedStreamStateHandles; private List<StreamStateHandle> nonPartitionedStateHandles; @@ -892,8 +892,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> public void run() { try { // Keyed state handle future, currently only one (the head) operator can have this - KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); - KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); + KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); + KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size()); List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size()); @@ -987,8 +987,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState, ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend, ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream, - KeyGroupsStateHandle keyedStateHandleBackend, - KeyGroupsStateHandle keyedStateHandleStream) { + KeyedStateHandle keyedStateHandleBackend, + KeyedStateHandle keyedStateHandleStream) { boolean hasAnyState = keyedStateHandleBackend != null || keyedStateHandleStream != null http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index eeee8dc..8f42c1a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -51,7 +51,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; @@ -559,11 +559,11 @@ public class AbstractStreamOperatorTest { final CloseableRegistry closeableRegistry = new CloseableRegistry(); - RunnableFuture<KeyGroupsStateHandle> futureKeyGroupStateHandle = mock(RunnableFuture.class); + RunnableFuture<KeyedStateHandle> futureKeyedStateHandle = mock(RunnableFuture.class); RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = mock(RunnableFuture.class); StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); - when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle); + when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle); when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle); OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult()); @@ -609,9 +609,9 @@ public class AbstractStreamOperatorTest { verify(context).close(); verify(operatorSnapshotResult).cancel(); - verify(futureKeyGroupStateHandle).cancel(anyBoolean()); + verify(futureKeyedStateHandle).cancel(anyBoolean()); verify(futureOperatorStateHandle).cancel(anyBoolean()); - verify(futureKeyGroupStateHandle).cancel(anyBoolean()); + verify(futureKeyedStateHandle).cancel(anyBoolean()); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java index 490df52..f57eed1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -41,12 +41,12 @@ public class OperatorSnapshotResultTest extends TestLogger { operatorSnapshotResult.cancel(); - KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class); - RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class); + KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); + RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class); when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); - KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class); - RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = mock(RunnableFuture.class); + KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class); + RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class); when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle); OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index 963c42c..8e0edfc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StatePartitionStreamProvider; @@ -75,7 +76,7 @@ public class StateInitializationContextImplTest { ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64); - List<KeyGroupsStateHandle> keyGroupsStateHandles = new ArrayList<>(NUM_HANDLES); + List<KeyedStateHandle> keyedStateHandles = new ArrayList<>(NUM_HANDLES); int prev = 0; for (int i = 0; i < NUM_HANDLES; ++i) { out.reset(); @@ -91,10 +92,10 @@ public class StateInitializationContextImplTest { ++writtenKeyGroups; } - KeyGroupsStateHandle handle = + KeyedStateHandle handle = new KeyGroupsStateHandle(offsets, new ByteStateHandleCloseChecking("kg-" + i, out.toByteArray())); - keyGroupsStateHandles.add(handle); + keyedStateHandles.add(handle); } List<OperatorStateHandle> operatorStateHandles = new ArrayList<>(NUM_HANDLES); @@ -125,7 +126,7 @@ public class StateInitializationContextImplTest { true, stateStore, mock(KeyedStateStore.class), - keyGroupsStateHandles, + keyedStateHandles, operatorStateHandles, closableRegistry); } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 58cfefd..4435247 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StreamStateHandle; @@ -186,8 +187,8 @@ public class InterruptSensitiveRestoreTest { ChainedStateHandle<StreamStateHandle> operatorState = null; - List<KeyGroupsStateHandle> keyGroupStateFromBackend = Collections.emptyList(); - List<KeyGroupsStateHandle> keyGroupStateFromStream = Collections.emptyList(); + List<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList(); + List<KeyedStateHandle> keyedStateFromStream = Collections.emptyList(); List<Collection<OperatorStateHandle>> operatorStateBackend = Collections.emptyList(); List<Collection<OperatorStateHandle>> operatorStateStream = Collections.emptyList(); @@ -201,8 +202,8 @@ public class InterruptSensitiveRestoreTest { Collection<OperatorStateHandle> operatorStateHandles = Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state)); - List<KeyGroupsStateHandle> keyGroupsStateHandles = - Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); + List<KeyedStateHandle> keyedStateHandles = + Collections.<KeyedStateHandle>singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); switch (mode) { case OPERATOR_MANAGED: @@ -212,10 +213,10 @@ public class InterruptSensitiveRestoreTest { operatorStateStream = Collections.singletonList(operatorStateHandles); break; case KEYED_MANAGED: - keyGroupStateFromBackend = keyGroupsStateHandles; + keyedStateFromBackend = keyedStateHandles; break; case KEYED_RAW: - keyGroupStateFromStream = keyGroupsStateHandles; + keyedStateFromStream = keyedStateHandles; break; case LEGACY: operatorState = new ChainedStateHandle<>(Collections.singletonList(state)); @@ -228,8 +229,8 @@ public class InterruptSensitiveRestoreTest { operatorState, operatorStateBackend, operatorStateStream, - keyGroupStateFromBackend, - keyGroupStateFromStream); + keyedStateFromBackend, + keyedStateFromStream); JobInformation jobInformation = new JobInformation( new JobID(), http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d7e3d6c..f34522b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -61,7 +61,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackendFactory; @@ -458,8 +458,8 @@ public class StreamTaskTest extends TestLogger { StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class); - KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class); + KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); + KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); @@ -563,8 +563,8 @@ public class StreamTaskTest extends TestLogger { (ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0], (ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1], (ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2], - (KeyGroupsStateHandle)invocation.getArguments()[3], - (KeyGroupsStateHandle)invocation.getArguments()[4]); + (KeyedStateHandle)invocation.getArguments()[3], + (KeyedStateHandle)invocation.getArguments()[4]); } }); @@ -574,8 +574,8 @@ public class StreamTaskTest extends TestLogger { StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class); - KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class); + KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); + KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 945103c..912d579 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; @@ -318,7 +319,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state); - List<KeyGroupsStateHandle> keyGroupStatesList = new ArrayList<>(); + List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>(); if (state.getKvStates() != null) { KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState( state.getKvStates(), @@ -331,7 +332,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { initializeState(new OperatorStateHandles(0, stateHandle, keyGroupStatesList, - Collections.<KeyGroupsStateHandle>emptyList(), + Collections.<KeyedStateHandle>emptyList(), Collections.<OperatorStateHandle>emptyList(), Collections.<OperatorStateHandle>emptyList())); } @@ -364,16 +365,16 @@ public class AbstractStreamOperatorTestHarness<OUT> { KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(subtaskIndex); - List<KeyGroupsStateHandle> localManagedKeyGroupState = null; + List<KeyedStateHandle> localManagedKeyGroupState = null; if (operatorStateHandles.getManagedKeyedState() != null) { - localManagedKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles( + localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( operatorStateHandles.getManagedKeyedState(), localKeyGroupRange); } - List<KeyGroupsStateHandle> localRawKeyGroupState = null; + List<KeyedStateHandle> localRawKeyGroupState = null; if (operatorStateHandles.getRawKeyedState() != null) { - localRawKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles( + localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( operatorStateHandles.getRawKeyedState(), localKeyGroupRange); } @@ -442,15 +443,15 @@ public class AbstractStreamOperatorTestHarness<OUT> { List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length); List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length); - List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length); - List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length); + List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length); + List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length); for (OperatorStateHandles handle: handles) { Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState(); Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState(); - Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState(); - Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState(); + Collection<KeyedStateHandle> managedKeyedState = handle.getManagedKeyedState(); + Collection<KeyedStateHandle> rawKeyedState = handle.getRawKeyedState(); if (managedOperatorState != null) { mergedManagedOperatorState.addAll(managedOperatorState); @@ -502,8 +503,8 @@ public class AbstractStreamOperatorTestHarness<OUT> { timestamp, CheckpointOptions.forFullCheckpoint()); - KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture()); - KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture()); + KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture()); + KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture()); OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture()); OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture()); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index d45ae21..d9c7387 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -65,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> // when we restore we keep the state here so that we can call restore // when the operator requests the keyed state backend - private List<KeyGroupsStateHandle> restoredKeyedState = null; + private List<KeyedStateHandle> restoredKeyedState = null; public KeyedOneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, @@ -144,7 +145,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } if (keyedStateBackend != null) { - RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot( + RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot( checkpointId, timestamp, streamFactory, @@ -177,14 +178,14 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> byte keyedStatePresent = (byte) inStream.read(); if (keyedStatePresent == 1) { ObjectInputStream ois = new ObjectInputStream(inStream); - this.restoredKeyedState = Collections.singletonList((KeyGroupsStateHandle) ois.readObject()); + this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject()); } } } - private static boolean hasMigrationHandles(Collection<KeyGroupsStateHandle> allKeyGroupsHandles) { - for (KeyGroupsStateHandle handle : allKeyGroupsHandles) { + private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) { + for (KeyedStateHandle handle : allKeyGroupsHandles) { if (handle instanceof Migration) { return true; } @@ -225,17 +226,17 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> keyGroupPartitions.get(subtaskIndex); restoredKeyedState = null; - Collection<KeyGroupsStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState(); + Collection<KeyedStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState(); if (managedKeyedState != null) { // if we have migration handles, don't reshuffle state and preserve // the migration tag if (hasMigrationHandles(managedKeyedState)) { - List<KeyGroupsStateHandle> result = new ArrayList<>(managedKeyedState.size()); + List<KeyedStateHandle> result = new ArrayList<>(managedKeyedState.size()); result.addAll(managedKeyedState); restoredKeyedState = result; } else { - restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles( + restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles( managedKeyedState, localKeyGroupRange); } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 8e76f70..41a083a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.mockito.invocation.InvocationOnMock; @@ -50,7 +51,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> // when we restore we keep the state here so that we can call restore // when the operator requests the keyed state backend - private Collection<KeyGroupsStateHandle> restoredKeyedState = null; + private Collection<KeyedStateHandle> restoredKeyedState = null; public KeyedTwoInputStreamOperatorTestHarness( TwoInputStreamOperator<IN1, IN2, OUT> operator,
