http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 0eb140a..b33a69e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.TaskLocalStateStore; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImplTest; +import org.apache.flink.runtime.state.TestTaskLocalStateStore; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -56,8 +59,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Random; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -91,19 +92,6 @@ public 
class StreamTaskStateInitializerImplTest { typeSerializer, closeableRegistry); - verify(stateBackend).createKeyedStateBackend( - any(Environment.class), - any(JobID.class), - any(String.class), - eq(typeSerializer), - anyInt(), - any(KeyGroupRange.class), - any(TaskKvStateRegistry.class)); - - verify(stateBackend).createOperatorStateBackend( - any(Environment.class), - any(String.class)); - OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend(); InternalTimeServiceManager<?, ?> timeServiceManager = stateContext.internalTimerServiceManager(); @@ -124,13 +112,8 @@ public class StreamTaskStateInitializerImplTest { keyedStateInputs, operatorStateInputs); - for (KeyGroupStatePartitionStreamProvider keyedStateInput : keyedStateInputs) { - Assert.fail(); - } - - for (StatePartitionStreamProvider operatorStateInput : operatorStateInputs) { - Assert.fail(); - } + Assert.assertFalse(keyedStateInputs.iterator().hasNext()); + Assert.assertFalse(operatorStateInputs.iterator().hasNext()); } @SuppressWarnings("unchecked") @@ -172,14 +155,14 @@ public class StreamTaskStateInitializerImplTest { Random random = new Random(0x42); OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - new OperatorStateHandle( + new OperatorStreamStateHandle( Collections.singletonMap( "a", new OperatorStateHandle.StateMetaInfo( new long[]{0, 10}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)), CheckpointTestUtils.createDummyStreamStateHandle(random)), - new OperatorStateHandle( + new OperatorStreamStateHandle( Collections.singletonMap( "_default_", new OperatorStateHandle.StateMetaInfo( @@ -209,19 +192,6 @@ public class StreamTaskStateInitializerImplTest { typeSerializer, closeableRegistry); - verify(mockingBackend).createKeyedStateBackend( - any(Environment.class), - any(JobID.class), - any(String.class), - eq(typeSerializer), - anyInt(), - any(KeyGroupRange.class), - 
any(TaskKvStateRegistry.class)); - - verify(mockingBackend).createOperatorStateBackend( - any(Environment.class), - any(String.class)); - OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend(); InternalTimeServiceManager<?, ?> timeServiceManager = stateContext.internalTimerServiceManager(); @@ -276,11 +246,14 @@ public class StreamTaskStateInitializerImplTest { ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); + TaskLocalStateStore taskLocalStateStore = new TestTaskLocalStateStore(); + TaskStateManager taskStateManager = TaskStateManagerImplTest.taskStateManager( jobID, executionAttemptID, checkpointResponderMock, - jobManagerTaskRestore); + jobManagerTaskRestore, + taskLocalStateStore); DummyEnvironment dummyEnvironment = new DummyEnvironment("test-task", 1, 0); dummyEnvironment.setTaskStateManager(taskStateManager);
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 8e80f44..35e2fbd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -538,7 +538,7 @@ public class AsyncWaitOperatorTest extends TestLogger { testHarness.waitForTaskCompletion(); // set the operator state from previous attempt into the restored one - TaskStateSnapshot subtaskStates = taskStateManagerMock.getLastTaskStateSnapshot(); + TaskStateSnapshot subtaskStates = taskStateManagerMock.getLastJobManagerTaskStateSnapshot(); final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness = new OneInputStreamTaskTestHarness<>( http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 6926480..6a13f11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -405,7 +405,6 @@ public class 
WindowOperatorTest extends TestLogger { OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); - testHarness.close(); testHarness = createTestHarness(operator); @@ -483,7 +482,6 @@ public class WindowOperatorTest extends TestLogger { OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); - testHarness.close(); testHarness = createTestHarness(operator); @@ -794,7 +792,6 @@ public class WindowOperatorTest extends TestLogger { OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); - testHarness.close(); expectedOutput.clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 499dfb1..84bcf5a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; 
+import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -60,6 +61,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -197,7 +199,7 @@ public class InterruptSensitiveRestoreTest { KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(new KeyGroupRange(0, 0)); Collection<OperatorStateHandle> operatorStateHandles = - Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state)); + Collections.singletonList(new OperatorStreamStateHandle(operatorStateMetadata, state)); List<KeyedStateHandle> keyedStateHandles = Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); @@ -220,10 +222,10 @@ public class InterruptSensitiveRestoreTest { } OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - operatorStateBackend, - operatorStateStream, - keyedStateFromBackend, - keyedStateFromStream); + new StateObjectCollection<>(operatorStateBackend), + new StateObjectCollection<>(operatorStateStream), + new StateObjectCollection<>(keyedStateFromBackend), + new StateObjectCollection<>(keyedStateFromStream)); JobVertexID jobVertexID = new JobVertexID(); OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID); @@ -254,7 +256,7 @@ public class InterruptSensitiveRestoreTest { TestTaskStateManager taskStateManager = new TestTaskStateManager(); 
taskStateManager.setReportedCheckpointId(taskRestore.getRestoreCheckpointId()); - taskStateManager.setTaskStateSnapshotsByCheckpointId( + taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( Collections.singletonMap( taskRestore.getRestoreCheckpointId(), taskRestore.getTaskStateSnapshot())); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java new file mode 100644 index 0000000..e35f97c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskLocalStateStoreImpl; +import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnegative; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; 
+import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; + +/** + * Test for forwarding of state reporting to and from {@link org.apache.flink.runtime.state.TaskStateManager}. + */ +public class LocalStateForwardingTest extends TestLogger { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * This tests the forwarding of jm and tm-local state from the futures reported by the backends, through the + * async checkpointing thread to the {@link org.apache.flink.runtime.state.TaskStateManager}. + */ + @Test + public void testReportingFromSnapshotToTaskStateManager() { + + TestTaskStateManager taskStateManager = new TestTaskStateManager(); + + StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1024 * 1024, + new MockInputSplitProvider(), + 0, + taskStateManager); + + StreamTask testStreamTask = new StreamTaskTest.NoOpStreamTask(streamMockEnvironment); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(0L, 0L); + CheckpointMetrics checkpointMetrics = new CheckpointMetrics(); + + Map<OperatorID, OperatorSnapshotFutures> snapshots = new HashMap<>(1); + OperatorSnapshotFutures osFuture = new OperatorSnapshotFutures(); + + osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class)); + osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class)); + osFuture.setOperatorStateManagedFuture(createSnapshotResult(OperatorStateHandle.class)); + osFuture.setOperatorStateRawFuture(createSnapshotResult(OperatorStateHandle.class)); + + OperatorID operatorID = new OperatorID(); + snapshots.put(operatorID, osFuture); + + StreamTask.AsyncCheckpointRunnable checkpointRunnable = + new StreamTask.AsyncCheckpointRunnable( + testStreamTask, + snapshots, + 
checkpointMetaData, + checkpointMetrics, + 0L); + + checkpointRunnable.run(); + + TaskStateSnapshot lastJobManagerTaskStateSnapshot = taskStateManager.getLastJobManagerTaskStateSnapshot(); + TaskStateSnapshot lastTaskManagerTaskStateSnapshot = taskStateManager.getLastTaskManagerTaskStateSnapshot(); + + OperatorSubtaskState jmState = + lastJobManagerTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + + OperatorSubtaskState tmState = + lastTaskManagerTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + + performCheck(osFuture.getKeyedStateManagedFuture(), jmState.getManagedKeyedState(), tmState.getManagedKeyedState()); + performCheck(osFuture.getKeyedStateRawFuture(), jmState.getRawKeyedState(), tmState.getRawKeyedState()); + performCheck(osFuture.getOperatorStateManagedFuture(), jmState.getManagedOperatorState(), tmState.getManagedOperatorState()); + performCheck(osFuture.getOperatorStateRawFuture(), jmState.getRawOperatorState(), tmState.getRawOperatorState()); + } + + /** + * This tests that state that was reported to the {@link org.apache.flink.runtime.state.TaskStateManager} is also + * reported to {@link org.apache.flink.runtime.taskmanager.CheckpointResponder} and {@link TaskLocalStateStoreImpl}. 
+ */ + @Test + public void testReportingFromTaskStateManagerToResponderAndTaskLocalStateStore() throws Exception { + + final JobID jobID = new JobID(); + final AllocationID allocationID = new AllocationID(); + final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 4711L); + final CheckpointMetrics checkpointMetrics = new CheckpointMetrics(); + final int subtaskIdx = 42; + JobVertexID jobVertexID = new JobVertexID(); + + TaskStateSnapshot jmSnapshot = new TaskStateSnapshot(); + TaskStateSnapshot tmSnapshot = new TaskStateSnapshot(); + + final AtomicBoolean jmReported = new AtomicBoolean(false); + final AtomicBoolean tmReported = new AtomicBoolean(false); + + TestCheckpointResponder checkpointResponder = new TestCheckpointResponder() { + + @Override + public void acknowledgeCheckpoint( + JobID lJobID, + ExecutionAttemptID lExecutionAttemptID, + long lCheckpointId, + CheckpointMetrics lCheckpointMetrics, + TaskStateSnapshot lSubtaskState) { + + Assert.assertEquals(jobID, lJobID); + Assert.assertEquals(executionAttemptID, lExecutionAttemptID); + Assert.assertEquals(checkpointMetaData.getCheckpointId(), lCheckpointId); + Assert.assertEquals(checkpointMetrics, lCheckpointMetrics); + jmReported.set(true); + } + }; + + Executor executor = Executors.directExecutor(); + + LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl( + temporaryFolder.newFolder(), + jobID, + jobVertexID, + subtaskIdx); + + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig( + LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, + directoryProvider); + + TaskLocalStateStore taskLocalStateStore = + new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, subtaskIdx, localRecoveryConfig, executor) { + @Override + public void storeLocalState( + @Nonnegative long checkpointId, + @Nullable TaskStateSnapshot localState) { + + 
Assert.assertEquals(tmSnapshot, localState); + tmReported.set(true); + } + }; + + TaskStateManagerImpl taskStateManager = + new TaskStateManagerImpl( + jobID, + executionAttemptID, + taskLocalStateStore, + null, + checkpointResponder); + + taskStateManager.reportTaskStateSnapshots( + checkpointMetaData, + checkpointMetrics, + jmSnapshot, + tmSnapshot); + + Assert.assertTrue("Reporting for JM state was not called.", jmReported.get()); + Assert.assertTrue("Reporting for TM state was not called.", tmReported.get()); + } + + private static <T extends StateObject> void performCheck( + Future<SnapshotResult<T>> resultFuture, + StateObjectCollection<T> jmState, + StateObjectCollection<T> tmState) { + + SnapshotResult<T> snapshotResult; + try { + snapshotResult = resultFuture.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + Assert.assertEquals( + snapshotResult.getJobManagerOwnedSnapshot(), + jmState.iterator().next()); + + Assert.assertEquals( + snapshotResult.getTaskLocalSnapshot(), + tmState.iterator().next()); + } + + private static <T extends StateObject> RunnableFuture<SnapshotResult<T>> createSnapshotResult(Class<T> clazz) { + return DoneFuture.of(SnapshotResult.withLocalState(mock(clazz), mock(clazz))); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 3e0459d..81fb447 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -538,20 +538,20 @@ public class 
OneInputStreamTaskTest extends TestLogger { restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO); - restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastTaskStateSnapshot()); + restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot()); StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig(); configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks); - TaskStateSnapshot stateHandles = taskStateManager.getLastTaskStateSnapshot(); + TaskStateSnapshot stateHandles = taskStateManager.getLastJobManagerTaskStateSnapshot(); Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size()); TestingStreamOperator.numberRestoreCalls = 0; // transfer state to new harness restoredTaskHarness.taskStateManager.restoreLatestCheckpointState( - taskStateManager.getTaskStateSnapshotsByCheckpointId()); + taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId()); restoredTaskHarness.invoke(); restoredTaskHarness.endInput(); restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis()); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java index f9f4473..c66040a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java @@ -252,7 +252,7 @@ public class RestoreStreamTaskTest extends TestLogger { JobManagerTaskRestore 
jobManagerTaskRestore = new JobManagerTaskRestore( taskStateManager.getReportedCheckpointId(), - taskStateManager.getLastTaskStateSnapshot()); + taskStateManager.getLastJobManagerTaskStateSnapshot()); testHarness.endInput(); testHarness.waitForTaskCompletion(); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 0ba081e..32de8d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -46,7 +46,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.TaskLocalStateStore; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; @@ -159,8 +158,6 @@ public class StreamMockEnvironment implements Environment { KvStateRegistry registry = new KvStateRegistry(); this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId()); - - final TaskLocalStateStore localStateStore = new TaskLocalStateStore(jobID, getJobVertexId(), subtaskIndex); } public StreamMockEnvironment( @@ -304,10 +301,11 @@ public class StreamMockEnvironment implements Environment { @Override public void acknowledgeCheckpoint(long checkpointId, 
CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) { - taskStateManager.reportStateHandles( + taskStateManager.reportTaskStateSnapshots( new CheckpointMetaData(checkpointId, 0L), checkpointMetrics, - subtaskState); + subtaskState, + null); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 24b2014..62a903b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; @@ -282,10 +283,10 @@ public class StreamTaskTerminationTest extends TestLogger { } } - static class BlockingCallable implements Callable<OperatorStateHandle> { + static class BlockingCallable implements Callable<SnapshotResult<OperatorStateHandle>> { @Override - public OperatorStateHandle call() throws Exception { + public SnapshotResult<OperatorStateHandle> call() throws Exception { // notify that we have started the asynchronous checkpointed operation CHECKPOINTING_LATCH.trigger(); // wait 
until we have reached the StreamTask#cleanup --> This will already cancel this FutureTask http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 99d4e5b..caea662 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -67,15 +68,16 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DoneFuture; -import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import 
org.apache.flink.runtime.state.StateBackendFactory; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StatePartitionStreamProvider; -import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskLocalStateStoreImpl; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -94,6 +96,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -147,7 +150,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollectionOf; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -221,7 +223,7 @@ public class StreamTaskTest extends TestLogger { @Test public void testStateBackendLoadingAndClosing() throws Exception { Configuration taskManagerConfig = new Configuration(); - taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, MockStateBackend.class.getName()); + taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStateKeySerializer(mock(TypeSerializer.class)); @@ -253,7 +255,7 @@ public class StreamTaskTest extends 
TestLogger { @Test public void testStateBackendClosingOnFailure() throws Exception { Configuration taskManagerConfig = new Configuration(); - taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, MockStateBackend.class.getName()); + taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStateKeySerializer(mock(TypeSerializer.class)); @@ -414,7 +416,7 @@ public class StreamTaskTest extends TestLogger { OperatorSnapshotFutures operatorSnapshotResult2 = mock(OperatorSnapshotFutures.class); OperatorSnapshotFutures operatorSnapshotResult3 = mock(OperatorSnapshotFutures.class); - RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class); + RunnableFuture<SnapshotResult<OperatorStateHandle>> failingFuture = mock(RunnableFuture.class); when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception"))); when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture); @@ -499,7 +501,7 @@ public class StreamTaskTest extends TestLogger { TaskStateManager taskStateManager = new TaskStateManagerImpl( new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), - mock(TaskLocalStateStore.class), + mock(TaskLocalStateStoreImpl.class), null, checkpointResponder); @@ -514,17 +516,18 @@ public class StreamTaskTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); StreamOperator<?> streamOperator = mock(StreamOperator.class); + when(streamOperator.getOperatorID()).thenReturn(new OperatorID(42, 42)); KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); - OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); - OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); + OperatorStateHandle 
managedOperatorStateHandle = mock(OperatorStreamStateHandle.class); + OperatorStateHandle rawOperatorStateHandle = mock(OperatorStreamStateHandle.class); OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( - new DoneFuture<>(managedKeyedStateHandle), - new DoneFuture<>(rawKeyedStateHandle), - new DoneFuture<>(managedOperatorStateHandle), - new DoneFuture<>(rawOperatorStateHandle)); + DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)), + DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)), + DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)), + DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle))); when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult); @@ -561,10 +564,10 @@ public class StreamTaskTest extends TestLogger { OperatorSubtaskState subtaskState = subtaskStates.getSubtaskStateMappings().iterator().next().getValue(); // check that the subtask state contains the expected state handles - assertEquals(Collections.singletonList(managedKeyedStateHandle), subtaskState.getManagedKeyedState()); - assertEquals(Collections.singletonList(rawKeyedStateHandle), subtaskState.getRawKeyedState()); - assertEquals(Collections.singletonList(managedOperatorStateHandle), subtaskState.getManagedOperatorState()); - assertEquals(Collections.singletonList(rawOperatorStateHandle), subtaskState.getRawOperatorState()); + assertEquals(StateObjectCollection.singleton(managedKeyedStateHandle), subtaskState.getManagedKeyedState()); + assertEquals(StateObjectCollection.singleton(rawKeyedStateHandle), subtaskState.getRawKeyedState()); + assertEquals(StateObjectCollection.singleton(managedOperatorStateHandle), subtaskState.getManagedOperatorState()); + assertEquals(StateObjectCollection.singleton(rawOperatorStateHandle), subtaskState.getRawOperatorState()); // check that the state handles have not been discarded 
verify(managedKeyedStateHandle, never()).discardState(); @@ -602,26 +605,15 @@ public class StreamTaskTest extends TestLogger { Environment mockEnvironment = spy(new MockEnvironment()); - whenNew(OperatorSubtaskState.class). - withArguments( - anyCollectionOf(OperatorStateHandle.class), - anyCollectionOf(OperatorStateHandle.class), - anyCollectionOf(KeyedStateHandle.class), - anyCollectionOf(KeyedStateHandle.class)). - thenAnswer(new Answer<OperatorSubtaskState>() { - @Override - public OperatorSubtaskState answer(InvocationOnMock invocation) throws Throwable { - createSubtask.trigger(); - completeSubtask.await(); - Object[] arguments = invocation.getArguments(); - return new OperatorSubtaskState( - (OperatorStateHandle) arguments[0], - (OperatorStateHandle) arguments[1], - (KeyedStateHandle) arguments[2], - (KeyedStateHandle) arguments[3] - ); - } - }); + whenNew(OperatorSnapshotFinalizer.class). + withAnyArguments(). + thenAnswer((Answer<OperatorSnapshotFinalizer>) invocation -> { + createSubtask.trigger(); + completeSubtask.await(); + Object[] arguments = invocation.getArguments(); + return new OperatorSnapshotFinalizer((OperatorSnapshotFutures) arguments[0]); + } + ); StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -632,14 +624,14 @@ public class StreamTaskTest extends TestLogger { KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); - OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); - OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); + OperatorStateHandle managedOperatorStateHandle = mock(OperatorStreamStateHandle.class); + OperatorStateHandle rawOperatorStateHandle = mock(OperatorStreamStateHandle.class); OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( - new 
DoneFuture<>(managedKeyedStateHandle), - new DoneFuture<>(rawKeyedStateHandle), - new DoneFuture<>(managedOperatorStateHandle), - new DoneFuture<>(rawOperatorStateHandle)); + DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)), + DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)), + DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)), + DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle))); when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult); @@ -722,7 +714,7 @@ public class StreamTaskTest extends TestLogger { TaskStateManager taskStateManager = new TaskStateManagerImpl( new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), - mock(TaskLocalStateStore.class), + mock(TaskLocalStateStoreImpl.class), null, checkpointResponder); @@ -823,7 +815,13 @@ public class StreamTaskTest extends TestLogger { // Test Utilities // ------------------------------------------------------------------------ - private static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> { + /** + * Operator that does nothing. + * + * @param <T> + * @param <OP> + */ + public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> { public NoOpStreamTask(Environment environment) { super(environment, null); @@ -898,9 +896,17 @@ public class StreamTaskTest extends TestLogger { } public static Task createTask( + Class<? extends AbstractInvokable> invokable, + StreamConfig taskConfig, + Configuration taskManagerConfig) throws Exception { + return createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager()); + } + + public static Task createTask( Class<? 
extends AbstractInvokable> invokable, StreamConfig taskConfig, - Configuration taskManagerConfig) throws Exception { + Configuration taskManagerConfig, + TestTaskStateManager taskStateManager) throws Exception { BlobCacheService blobService = new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class)); @@ -951,7 +957,7 @@ public class StreamTaskTest extends TestLogger { mock(IOManager.class), network, mock(BroadcastVariableManager.class), - new TestTaskStateManager(), + taskStateManager, mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), @@ -1021,22 +1027,16 @@ public class StreamTaskTest extends TestLogger { /** * Mocked state backend factory which returns mocks for the operator and keyed state backends. */ - public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> { + public static final class TestMemoryStateBackendFactory implements StateBackendFactory<AbstractStateBackend> { private static final long serialVersionUID = 1L; @Override public AbstractStateBackend createFromConfig(Configuration config) { - return new MemoryStateBackend() { - @Override - public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception { - return spy(super.createOperatorStateBackend(env, operatorIdentifier)); - } + return new TestSpyWrapperStateBackend(createInnerBackend(config)); + } - @Override - public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) { - return spy(super.createKeyedStateBackend(env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry)); - } - }; + protected AbstractStateBackend createInnerBackend(Configuration config) { + return new MemoryStateBackend(); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index bcb833e..08032bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -141,7 +141,7 @@ public class StreamTaskTestHarness<OUT> { public void setTaskStateSnapshot(long checkpointId, TaskStateSnapshot taskStateSnapshot) { taskStateManager.setReportedCheckpointId(checkpointId); - taskStateManager.setTaskStateSnapshotsByCheckpointId( + taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( Collections.singletonMap(checkpointId, taskStateSnapshot)); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index 6df33b7..3d3b28e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateBackend; import 
org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StreamStateHandle; @@ -81,9 +82,10 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; -import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; @@ -303,7 +305,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { env.getExecutionConfig(), true) { @Override - public RunnableFuture<OperatorStateHandle> snapshot( + public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot( long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, @@ -332,17 +334,14 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { env.getExecutionConfig(), true) { @Override - public RunnableFuture<OperatorStateHandle> snapshot( + public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot( long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - return new FutureTask<>(new Callable<OperatorStateHandle>() { - @Override - public OperatorStateHandle call() throws Exception { - throw new Exception("Async part snapshot exception."); - } + return new FutureTask<>(() -> { + throw new Exception("Async part snapshot exception."); }); } }; @@ -364,6 +363,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { private final Object lock = new Object(); private volatile boolean closed; + @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { throw new UnsupportedOperationException(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java new file mode 100644 index 0000000..914326b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +import static org.powermock.api.mockito.PowerMockito.spy; + +/** + * This class wraps an {@link AbstractStateBackend} and enriches all the created objects as spies. + */ +public class TestSpyWrapperStateBackend extends AbstractStateBackend { + + private final AbstractStateBackend delegate; + + public TestSpyWrapperStateBackend(AbstractStateBackend delegate) { + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws IOException { + return spy(delegate.createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry)); + } + + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, String operatorIdentifier) throws Exception { + return spy(delegate.createOperatorStateBackend(env, operatorIdentifier)); + } + + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException { + return 
spy(delegate.resolveCheckpoint(externalPointer)); + } + + @Override + public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException { + return spy(delegate.createCheckpointStorage(jobId)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 28ad930..ed2da18 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -47,6 +48,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import 
org.apache.flink.streaming.api.operators.StreamOperator; @@ -60,7 +62,6 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.util.FutureUtil; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; @@ -320,11 +321,27 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { * subtask. */ public void initializeState(OperatorSubtaskState operatorStateHandles) throws Exception { + initializeState(operatorStateHandles, null); + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState()}. + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} + * if it was not called before. + * + * @param jmOperatorStateHandles the primary state (owned by JM) + * @param tmOperatorStateHandles the (optional) local state (owned by TM) or null. 
+ * @throws Exception + */ + public void initializeState( + OperatorSubtaskState jmOperatorStateHandles, + OperatorSubtaskState tmOperatorStateHandles) throws Exception { + if (!setupCalled) { setup(); } - if (operatorStateHandles != null) { + if (jmOperatorStateHandles != null) { int numKeyGroups = getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(); int numSubtasks = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(); int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); @@ -332,53 +349,54 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { // create a new OperatorStateHandles that only contains the state for our key-groups List<KeyGroupRange> keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions( - numKeyGroups, - numSubtasks); + numKeyGroups, + numSubtasks); - KeyGroupRange localKeyGroupRange = - keyGroupPartitions.get(subtaskIndex); + KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(subtaskIndex); - List<KeyedStateHandle> localManagedKeyGroupState = null; - if (operatorStateHandles.getManagedKeyedState() != null) { - localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( - operatorStateHandles.getManagedKeyedState(), - localKeyGroupRange); - } + List<KeyedStateHandle> localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( + jmOperatorStateHandles.getManagedKeyedState(), + localKeyGroupRange); - List<KeyedStateHandle> localRawKeyGroupState = null; - if (operatorStateHandles.getRawKeyedState() != null) { - localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( - operatorStateHandles.getRawKeyedState(), - localKeyGroupRange); - } + List<KeyedStateHandle> localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( + jmOperatorStateHandles.getRawKeyedState(), + localKeyGroupRange); List<OperatorStateHandle> managedOperatorState = new ArrayList<>(); - if (operatorStateHandles.getManagedOperatorState() 
!= null) { - managedOperatorState.addAll(operatorStateHandles.getManagedOperatorState()); - } + + managedOperatorState.addAll(jmOperatorStateHandles.getManagedOperatorState()); + Collection<OperatorStateHandle> localManagedOperatorState = operatorStateRepartitioner.repartitionState( - managedOperatorState, - numSubtasks).get(subtaskIndex); + managedOperatorState, + numSubtasks).get(subtaskIndex); List<OperatorStateHandle> rawOperatorState = new ArrayList<>(); - if (operatorStateHandles.getRawOperatorState() != null) { - rawOperatorState.addAll(operatorStateHandles.getRawOperatorState()); - } + + rawOperatorState.addAll(jmOperatorStateHandles.getRawOperatorState()); + Collection<OperatorStateHandle> localRawOperatorState = operatorStateRepartitioner.repartitionState( - rawOperatorState, - numSubtasks).get(subtaskIndex); + rawOperatorState, + numSubtasks).get(subtaskIndex); - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - nullToEmptyCollection(localManagedOperatorState), - nullToEmptyCollection(localRawOperatorState), - nullToEmptyCollection(localManagedKeyGroupState), - nullToEmptyCollection(localRawKeyGroupState)); + OperatorSubtaskState processedJmOpSubtaskState = new OperatorSubtaskState( + new StateObjectCollection<>(nullToEmptyCollection(localManagedOperatorState)), + new StateObjectCollection<>(nullToEmptyCollection(localRawOperatorState)), + new StateObjectCollection<>(nullToEmptyCollection(localManagedKeyGroupState)), + new StateObjectCollection<>(nullToEmptyCollection(localRawKeyGroupState))); - TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); - taskStateSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), operatorSubtaskState); + TaskStateSnapshot jmTaskStateSnapshot = new TaskStateSnapshot(); + jmTaskStateSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), processedJmOpSubtaskState); taskStateManager.setReportedCheckpointId(0); - 
taskStateManager.setTaskStateSnapshotsByCheckpointId(Collections.singletonMap(0L, taskStateSnapshot)); + taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( + Collections.singletonMap(0L, jmTaskStateSnapshot)); + + if (tmOperatorStateHandles != null) { + TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot(); + tmTaskStateSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), tmOperatorStateHandles); + taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId( + Collections.singletonMap(0L, tmTaskStateSnapshot)); + } } operator.initializeState(); @@ -422,35 +440,24 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length); List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length); - for (OperatorSubtaskState handle: handles) { + for (OperatorSubtaskState handle : handles) { Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState(); Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState(); Collection<KeyedStateHandle> managedKeyedState = handle.getManagedKeyedState(); Collection<KeyedStateHandle> rawKeyedState = handle.getRawKeyedState(); - if (managedOperatorState != null) { - mergedManagedOperatorState.addAll(managedOperatorState); - } - - if (rawOperatorState != null) { - mergedRawOperatorState.addAll(rawOperatorState); - } - - if (managedKeyedState != null) { - mergedManagedKeyedState.addAll(managedKeyedState); - } - - if (rawKeyedState != null) { - mergedRawKeyedState.addAll(rawKeyedState); - } + mergedManagedOperatorState.addAll(managedOperatorState); + mergedRawOperatorState.addAll(rawOperatorState); + mergedManagedKeyedState.addAll(managedKeyedState); + mergedRawKeyedState.addAll(rawKeyedState); } return new OperatorSubtaskState( - mergedManagedOperatorState, - mergedRawOperatorState, - mergedManagedKeyedState, - mergedRawKeyedState); + new 
StateObjectCollection<>(mergedManagedOperatorState), + new StateObjectCollection<>(mergedRawOperatorState), + new StateObjectCollection<>(mergedManagedKeyedState), + new StateObjectCollection<>(mergedRawKeyedState)); } /** @@ -469,6 +476,13 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}. */ public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { + return snapshotWithLocalState(checkpointId, timestamp).getJobManagerOwnedState(); + } + + /** + * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}. + */ + public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp) throws Exception { OperatorSnapshotFutures operatorStateResult = operator.snapshotState( checkpointId, @@ -476,17 +490,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { CheckpointOptions.forCheckpointWithDefaultLocation(), checkpointStorage.resolveCheckpointStorageLocation(checkpointId, CheckpointStorageLocationReference.getDefault())); - KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture()); - KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture()); - - OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture()); - OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture()); - - return new OperatorSubtaskState( - opManaged != null ? Collections.singletonList(opManaged) : Collections.emptyList(), - opRaw != null ? Collections.singletonList(opRaw) : Collections.emptyList(), - keyedManaged != null ? Collections.singletonList(keyedManaged) : Collections.emptyList(), - keyedRaw != null ? 
Collections.singletonList(keyedRaw) : Collections.emptyList()); + return new OperatorSnapshotFinalizer(operatorStateResult); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java index 8d37266..1b5113d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -50,7 +51,7 @@ public class OperatorSnapshotUtil { try (DataOutputStream dos = new DataOutputStream(out)) { - // must be here for compatibility + // required for backwards compatibility. dos.writeInt(0); // still required for compatibility @@ -108,15 +109,16 @@ public class OperatorSnapshotUtil { FileInputStream in = new FileInputStream(path); try (DataInputStream dis = new DataInputStream(in)) { - // ignored + // required for backwards compatibility. dis.readInt(); // still required for compatibility to consume the bytes. 
SavepointV1Serializer.deserializeStreamStateHandle(dis); - List<OperatorStateHandle> rawOperatorState = new ArrayList<>(); + List<OperatorStateHandle> rawOperatorState = null; int numRawOperatorStates = dis.readInt(); if (numRawOperatorStates >= 0) { + rawOperatorState = new ArrayList<>(); for (int i = 0; i < numRawOperatorStates; i++) { OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( dis); @@ -124,9 +126,10 @@ public class OperatorSnapshotUtil { } } - List<OperatorStateHandle> managedOperatorState = new ArrayList<>(); + List<OperatorStateHandle> managedOperatorState = null; int numManagedOperatorStates = dis.readInt(); if (numManagedOperatorStates >= 0) { + managedOperatorState = new ArrayList<>(); for (int i = 0; i < numManagedOperatorStates; i++) { OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( dis); @@ -134,9 +137,10 @@ public class OperatorSnapshotUtil { } } - List<KeyedStateHandle> rawKeyedState = new ArrayList<>(); + List<KeyedStateHandle> rawKeyedState = null; int numRawKeyedStates = dis.readInt(); if (numRawKeyedStates >= 0) { + rawKeyedState = new ArrayList<>(); for (int i = 0; i < numRawKeyedStates; i++) { KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( dis); @@ -144,9 +148,10 @@ public class OperatorSnapshotUtil { } } - List<KeyedStateHandle> managedKeyedState = new ArrayList<>(); + List<KeyedStateHandle> managedKeyedState = null; int numManagedKeyedStates = dis.readInt(); if (numManagedKeyedStates >= 0) { + managedKeyedState = new ArrayList<>(); for (int i = 0; i < numManagedKeyedStates; i++) { KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( dis); @@ -155,10 +160,10 @@ public class OperatorSnapshotUtil { } return new OperatorSubtaskState( - managedOperatorState, - rawOperatorState, - managedKeyedState, - rawKeyedState); + new StateObjectCollection<>(managedOperatorState), + new 
StateObjectCollection<>(rawOperatorState), + new StateObjectCollection<>(managedKeyedState), + new StateObjectCollection<>(rawKeyedState)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 53d329b..557c097 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -104,7 +104,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog public TestName name = new TestName(); private StateBackendEnum stateBackendEnum; - private AbstractStateBackend stateBackend; + protected AbstractStateBackend stateBackend; AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) { this.stateBackendEnum = stateBackendEnum; @@ -128,23 +128,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog zkServer.start(); } - TemporaryFolder temporaryFolder = new TemporaryFolder(); - temporaryFolder.create(); - final File haDir = temporaryFolder.newFolder(); - - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); - // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case - config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB - 
config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b"); - - if (zkServer != null) { - config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); - } + Configuration config = createClusterConfig(); // purposefully delay in the executor to tease out races final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); @@ -208,6 +192,27 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } } + protected Configuration createClusterConfig() throws IOException { + TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final File haDir = temporaryFolder.newFolder(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); + // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB + config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b"); + + if (zkServer != null) { + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); + } + return config; + } + @After public void stopTestCluster() throws IOException { if (cluster != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java new file mode 100644 index 0000000..51b3b84 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; + +import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode; +import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; + +/** + * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured + * to use local recovery. 
+ */
+public class LocalRecoveryITCase extends TestLogger {
+
+	/** Name of the running test method; assigned to the delegate's {@code name} field before it runs. */
+	@Rule
+	public TestName testName = new TestName();
+
+	@Test
+	public void testLocalRecoveryHeapBackendFileBased() throws Exception {
+		executeTest(
+			FILE_ASYNC,
+			ENABLE_FILE_BASED);
+	}
+
+	@Test
+	public void testLocalRecoveryRocksIncrementalFileBased() throws Exception {
+		executeTest(
+			ROCKSDB_INCREMENTAL_ZK,
+			ENABLE_FILE_BASED);
+	}
+
+	@Test
+	public void testLocalRecoveryRocksFullFileBased() throws Exception {
+		executeTest(
+			ROCKSDB_FULLY_ASYNC,
+			ENABLE_FILE_BASED);
+	}
+
+	/**
+	 * Creates a delegate test for the given state backend whose cluster configuration uses the
+	 * given local recovery mode, and runs it.
+	 *
+	 * @param backendEnum state backend variant passed to the delegate's constructor
+	 * @param recoveryMode local recovery mode written into the delegate's cluster configuration
+	 * @throws Exception any failure raised by the delegate
+	 */
+	private void executeTest(
+		StateBackendEnum backendEnum,
+		LocalRecoveryMode recoveryMode) throws Exception {
+
+		AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
+			new AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery(
+				backendEnum,
+				recoveryMode);
+
+		executeTest(windowChkITCase);
+	}
+
+	/**
+	 * Runs the delegate's tumbling-window and sliding-window tests, each between its own
+	 * {@code startTestCluster()} / {@code stopTestCluster()} pair.
+	 *
+	 * <p>The delegate is not driven by a JUnit runner here, so its lifecycle is invoked by hand.
+	 * {@code stopTestCluster()} is called in a {@code finally} block: otherwise a failing test
+	 * method (or a setup that fails half-way) would skip the shutdown and leave the cluster
+	 * running. The delegate's temp folder is deleted in all cases.
+	 *
+	 * @param delegate the test instance to drive
+	 * @throws Exception any failure raised by the delegate's setup, test or teardown methods
+	 */
+	private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception {
+		delegate.name = testName;
+		delegate.tempFolder.create();
+		try {
+			try {
+				delegate.startTestCluster();
+				delegate.testTumblingTimeWindow();
+			} finally {
+				// always release the cluster, also when setup or the test itself failed
+				delegate.stopTestCluster();
+			}
+
+			try {
+				delegate.startTestCluster();
+				delegate.testSlidingTimeWindow();
+			} finally {
+				delegate.stopTestCluster();
+			}
+		} finally {
+			delegate.tempFolder.delete();
+		}
+	}
+
+	/**
+	 * Variant of {@link AbstractEventTimeWindowCheckpointingITCase} that additionally sets
+	 * {@link CheckpointingOptions#LOCAL_RECOVERY} in the cluster configuration.
+	 */
+	private static class AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery
+		extends AbstractEventTimeWindowCheckpointingITCase {
+
+		/** Local recovery mode written into the cluster configuration. */
+		private final LocalRecoveryMode recoveryMode;
+
+		AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery(
+			StateBackendEnum stateBackendEnum,
+			LocalRecoveryMode recoveryMode) {
+
+			super(stateBackendEnum);
+			this.recoveryMode = recoveryMode;
+		}
+
+		/**
+		 * Returns the parent's cluster configuration with the local recovery mode added.
+		 */
+		@Override
+		protected Configuration createClusterConfig() throws IOException {
+			Configuration config = super.createClusterConfig();
+
+			config.setString(
+				CheckpointingOptions.LOCAL_RECOVERY,
+				recoveryMode.toString());
+
+			return config;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 95763bf..a23c679 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -550,7 +550,6 @@ public class RescalingITCase extends TestLogger { if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { break; } - System.out.println(savepointResponse); } assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index fba3b93..537f864 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.StateBackend; import 
org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.testingUtils.TestingCluster; @@ -44,6 +45,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; import java.util.concurrent.CountDownLatch; /** @@ -69,7 +71,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { testExternalizedCheckpoints( checkpointDir, null, - new RocksDBStateBackend(checkpointDir.toURI().toString(), true)); + createRocksDBStateBackend(checkpointDir, true), + false); } @Test @@ -78,7 +81,28 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { testExternalizedCheckpoints( checkpointDir, null, - new RocksDBStateBackend(checkpointDir.toURI().toString(), false)); + createRocksDBStateBackend(checkpointDir, false), + false); + } + + @Test + public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception { + final File checkpointDir = temporaryFolder.newFolder(); + testExternalizedCheckpoints( + checkpointDir, + null, + createRocksDBStateBackend(checkpointDir, true), + true); + } + + @Test + public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception { + final File checkpointDir = temporaryFolder.newFolder(); + testExternalizedCheckpoints( + checkpointDir, + null, + createRocksDBStateBackend(checkpointDir, false), + true); } @Test @@ -87,8 +111,18 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { testExternalizedCheckpoints( checkpointDir, null, - new FsStateBackend(checkpointDir.toURI().toString(), true)); + createFsStateBackend(checkpointDir), + false); + } + @Test + public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception { + final File checkpointDir = temporaryFolder.newFolder(); + testExternalizedCheckpoints( + checkpointDir, + null, + createFsStateBackend(checkpointDir), + true); } @Test @@ -100,7 +134,8 @@ public class 
ResumeCheckpointManuallyITCase extends TestLogger { testExternalizedCheckpoints( checkpointDir, zkServer.getConnectString(), - new RocksDBStateBackend(checkpointDir.toURI().toString(), true)); + createRocksDBStateBackend(checkpointDir, true), + false); } finally { zkServer.stop(); } @@ -115,7 +150,40 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { testExternalizedCheckpoints( checkpointDir, zkServer.getConnectString(), - new RocksDBStateBackend(checkpointDir.toURI().toString(), false)); + createRocksDBStateBackend(checkpointDir, false), + false); + } finally { + zkServer.stop(); + } + } + + @Test + public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception { + TestingServer zkServer = new TestingServer(); + zkServer.start(); + try { + final File checkpointDir = temporaryFolder.newFolder(); + testExternalizedCheckpoints( + checkpointDir, + zkServer.getConnectString(), + createRocksDBStateBackend(checkpointDir, true), + true); + } finally { + zkServer.stop(); + } + } + + @Test + public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception { + TestingServer zkServer = new TestingServer(); + zkServer.start(); + try { + final File checkpointDir = temporaryFolder.newFolder(); + testExternalizedCheckpoints( + checkpointDir, + zkServer.getConnectString(), + createRocksDBStateBackend(checkpointDir, false), + true); } finally { zkServer.stop(); } @@ -130,16 +198,45 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { testExternalizedCheckpoints( checkpointDir, zkServer.getConnectString(), - new FsStateBackend(checkpointDir.toURI().toString(), true)); + createFsStateBackend(checkpointDir), + false); } finally { zkServer.stop(); } } + @Test + public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception { + TestingServer zkServer = new TestingServer(); + zkServer.start(); + try { + final File checkpointDir = temporaryFolder.newFolder(); + 
testExternalizedCheckpoints( + checkpointDir, + zkServer.getConnectString(), + createFsStateBackend(checkpointDir), + true); + } finally { + zkServer.stop(); + } + } + + private FsStateBackend createFsStateBackend(File checkpointDir) throws IOException { + return new FsStateBackend(checkpointDir.toURI().toString(), true); + } + + private RocksDBStateBackend createRocksDBStateBackend( + File checkpointDir, + boolean incrementalCheckpointing) throws IOException { + + return new RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing); + } + private void testExternalizedCheckpoints( File checkpointDir, String zooKeeperQuorum, - StateBackend backend) throws Exception { + StateBackend backend, + boolean localRecovery) throws Exception { final Configuration config = new Configuration(); @@ -152,6 +249,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + if (localRecovery) { + config.setString( + CheckpointingOptions.LOCAL_RECOVERY, + LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString()); + } + // ZooKeeper recovery mode? 
if (zooKeeperQuorum != null) { final File haDir = temporaryFolder.newFolder(); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java index d65c323..2968f13 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java @@ -64,7 +64,7 @@ public class ManualWindowSpeedITCase extends AbstractTestBase { String checkpoints = tempFolder.newFolder().toURI().toString(); env.setStateBackend(new FsStateBackend(checkpoints)); - env.addSource(new InfiniteTupleSource(10_000)) + env.addSource(new InfiniteTupleSource(1_000)) .keyBy(0) .timeWindow(Time.seconds(3)) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java index aed6663..aaa96fb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java @@ -348,7 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger { boolean isKeyedState, StateBackend stateBackend, ClassLoader classLoader, - OperatorSubtaskState operatorStateHandles, + OperatorSubtaskState 
operatorSubtaskState, Iterable<Long> input) throws Exception { try (final MockEnvironment environment = new MockEnvironment( @@ -358,11 +358,11 @@ public class PojoSerializerUpgradeTest extends TestLogger { 256, taskConfiguration, executionConfig, + new TestTaskStateManager(), 16, 1, 0, - classLoader, - new TestTaskStateManager())) { + classLoader)) { OneInputStreamOperatorTestHarness<Long, Long> harness = null; try { @@ -379,7 +379,7 @@ public class PojoSerializerUpgradeTest extends TestLogger { harness.setStateBackend(stateBackend); harness.setup(); - harness.initializeState(operatorStateHandles); + harness.initializeState(operatorSubtaskState); harness.open(); long timestamp = 0L; http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala index e33aa49..9dd0b28 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.security.SecurityUtils +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.TaskManagerLocation import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike @@ -40,6 +41,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike * @param memoryManager MemoryManager which is responsible for Flink's managed 
memory allocation * @param ioManager IOManager responsible for I/O * @param network NetworkEnvironment for this actor + * @param taskManagerLocalStateStoresManager Task manager state store manager for this actor * @param numberOfSlots Number of slots for this TaskManager * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval * service for retrieving the leading JobManager @@ -51,6 +53,7 @@ class TestingYarnTaskManager( memoryManager: MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup : TaskManagerMetricGroup) @@ -61,6 +64,7 @@ class TestingYarnTaskManager( memoryManager, ioManager, network, + taskManagerLocalStateStoresManager, numberOfSlots, highAvailabilityServices, taskManagerMetricGroup) http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index 08d24b4..d54ae9b 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.security.SecurityUtils +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} @@ -40,6 +41,7 @@ class YarnTaskManager( memoryManager: 
MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup: TaskManagerMetricGroup) @@ -50,6 +52,7 @@ class YarnTaskManager( memoryManager, ioManager, network, + taskManagerLocalStateStoresManager, numberOfSlots, highAvailabilityServices, taskManagerMetricGroup) {
