[FLINK-8360][checkpointing] Implement state storage for local recovery and integrate with task lifecycle
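For illustration only, a minimal sketch (not part of this commit's diff) of how the two options introduced by this change, state.backend.local-recovery (CheckpointingOptions.LOCAL_RECOVERY, default "DISABLED") and taskmanager.state.local.root-dirs (CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS), could be set and then read back with the new ConfigurationUtils.parseLocalStateDirectories helper. The value "ENABLE_FILE_BASED" and the example paths are assumptions for the sketch and do not appear in the excerpt below.

----------------------------------------------------------------------
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;

public class LocalRecoveryConfigExample {

    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        // Switch local recovery on for the state backend. The default is "DISABLED";
        // the enabling value "ENABLE_FILE_BASED" is assumed here, it is not shown in this excerpt.
        configuration.setString(CheckpointingOptions.LOCAL_RECOVERY, "ENABLE_FILE_BASED");

        // Root directories under which the task manager keeps file-based local state
        // (comma- or path-separator-separated, as split by ConfigurationUtils).
        configuration.setString(
            CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
            "/tmp/localState1,/tmp/localState2");

        // Helper added by this commit: splits the configured value into individual paths.
        String[] localStateRootDirs = ConfigurationUtils.parseLocalStateDirectories(configuration);
        for (String dir : localStateRootDirs) {
            System.out.println("local state root dir: " + dir);
        }
    }
}
----------------------------------------------------------------------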
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df3e6bb7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df3e6bb7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df3e6bb7 Branch: refs/heads/master Commit: df3e6bb7627db03635febd48eff4c10032b668ef Parents: ea0d16d Author: Stefan Richter <[email protected]> Authored: Tue Feb 6 14:44:01 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:14:21 2018 +0100 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducer011ITCase.java | 31 +- .../configuration/CheckpointingOptions.java | 14 + .../flink/configuration/ConfigurationUtils.java | 28 +- .../flink/core/fs/local/LocalFileSystem.java | 10 +- .../java/org/apache/flink/util/Disposable.java | 36 +++ .../flink/util/MethodForwardingTestUtil.java | 179 ++++++++++ .../flink/docs/rest/RestAPIDocGenerator.java | 4 +- .../clusterframework/MesosTaskManager.scala | 3 + .../KVStateRequestSerializerRocksDBTest.java | 7 +- .../network/KvStateRequestSerializerTest.java | 19 +- .../history/HistoryServerArchiveFetcher.java | 4 +- .../checkpoint/JobManagerTaskRestore.java | 10 +- .../checkpoint/OperatorSubtaskState.java | 97 ++---- .../PrioritizedOperatorSubtaskState.java | 297 +++++++++++++++++ .../RoundRobinOperatorStateRepartitioner.java | 22 +- .../checkpoint/StateAssignmentOperation.java | 10 +- .../checkpoint/StateObjectCollection.java | 205 ++++++++++++ .../flink/runtime/checkpoint/SubtaskState.java | 2 +- .../runtime/checkpoint/TaskStateSnapshot.java | 21 +- .../runtime/checkpoint/savepoint/Savepoint.java | 8 +- .../savepoint/SavepointV1Serializer.java | 7 +- .../checkpoint/savepoint/SavepointV2.java | 2 +- .../savepoint/SavepointV2Serializer.java | 7 +- .../flink/runtime/executiongraph/Execution.java | 2 +- .../runtime/operators/sort/InMemorySorter.java | 10 +- .../state/AbstractKeyedStateBackend.java | 10 +- .../runtime/state/CheckpointStreamFactory.java | 7 +- .../CheckpointStreamWithResultProvider.java | 221 +++++++++++++ .../state/DefaultOperatorStateBackend.java | 31 +- .../state/DirectoryKeyedStateHandle.java | 109 +++++++ .../runtime/state/DirectoryStateHandle.java | 87 +++++ .../apache/flink/runtime/state/DoneFuture.java | 18 +- .../DuplicatingCheckpointOutputStream.java | 283 ++++++++++++++++ .../state/IncrementalLocalKeyedStateHandle.java | 145 +++++++++ .../flink/runtime/state/KeyedStateBackend.java | 7 +- .../runtime/state/LocalRecoveryConfig.java | 99 ++++++ .../state/LocalRecoveryDirectoryProvider.java | 81 +++++ .../LocalRecoveryDirectoryProviderImpl.java | 138 ++++++++ .../runtime/state/MultiStreamStateHandle.java | 104 ------ .../runtime/state/OperatorStateBackend.java | 13 +- .../OperatorStateCheckpointOutputStream.java | 6 +- .../runtime/state/OperatorStateHandle.java | 116 ++----- .../state/OperatorStreamStateHandle.java | 120 +++++++ .../flink/runtime/state/SnapshotDirectory.java | 200 ++++++++++++ .../flink/runtime/state/SnapshotResult.java | 120 +++++++ .../flink/runtime/state/SnapshotStrategy.java | 51 +++ .../flink/runtime/state/Snapshotable.java | 6 +- .../StateSnapshotContextSynchronousImpl.java | 37 ++- .../TaskExecutorLocalStateStoresManager.java | 223 +++++++++++-- .../runtime/state/TaskLocalStateStore.java | 72 +++-- .../runtime/state/TaskLocalStateStoreImpl.java | 288 +++++++++++++++++ .../flink/runtime/state/TaskStateManager.java | 24 +- .../runtime/state/TaskStateManagerImpl.java | 63 +++- 
.../filesystem/FileBasedStateOutputStream.java | 157 +++++++++ .../filesystem/FsCheckpointStreamFactory.java | 9 +- .../state/filesystem/FsStateBackend.java | 14 +- .../state/heap/HeapKeyedStateBackend.java | 33 +- .../memory/MemCheckpointStreamFactory.java | 6 +- .../state/memory/MemoryStateBackend.java | 9 +- .../runtime/taskexecutor/TaskExecutor.java | 10 +- .../runtime/taskexecutor/TaskManagerRunner.java | 1 + .../taskexecutor/TaskManagerServices.java | 39 ++- .../TaskManagerServicesConfiguration.java | 28 ++ .../apache/flink/runtime/taskmanager/Task.java | 33 +- .../minicluster/LocalFlinkMiniCluster.scala | 5 + .../flink/runtime/taskmanager/TaskManager.scala | 40 ++- .../CheckpointCoordinatorFailureTest.java | 7 +- .../checkpoint/CheckpointCoordinatorTest.java | 23 +- .../CheckpointMetadataLoadingTest.java | 4 +- .../checkpoint/CheckpointStateRestoreTest.java | 9 +- .../CompletedCheckpointStoreTest.java | 10 +- .../PrioritizedOperatorSubtaskStateTest.java | 292 +++++++++++++++++ .../checkpoint/StateHandleDummyUtil.java | 139 ++++++++ .../checkpoint/StateObjectCollectionTest.java | 70 ++++ .../checkpoint/TaskStateSnapshotTest.java | 129 ++++++++ .../savepoint/CheckpointTestUtils.java | 26 +- .../jobmanager/JobManagerHARecoveryTest.java | 32 +- .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 6 +- .../scheduler/SchedulerTestUtils.java | 5 +- .../runtime/metrics/TaskManagerMetricsTest.java | 3 + .../operators/testutils/DummyEnvironment.java | 6 + .../operators/testutils/MockEnvironment.java | 48 ++- .../CheckpointStreamWithResultProviderTest.java | 210 ++++++++++++ .../DuplicatingCheckpointOutputStreamTest.java | 310 ++++++++++++++++++ .../LocalRecoveryDirectoryProviderImplTest.java | 122 +++++++ .../runtime/state/MemoryStateBackendTest.java | 28 +- .../state/MultiStreamStateHandleTest.java | 135 -------- .../runtime/state/OperatorStateBackendTest.java | 65 ++-- .../runtime/state/OperatorStateHandleTest.java | 40 --- .../state/OperatorStreamStateHandleTest.java | 40 +++ .../runtime/state/SnapshotDirectoryTest.java | 206 ++++++++++++ .../flink/runtime/state/SnapshotResultTest.java | 68 ++++ .../runtime/state/StateBackendTestBase.java | 104 +++--- .../state/StateSnapshotCompressionTest.java | 23 +- ...TaskExecutorLocalStateStoresManagerTest.java | 224 +++++++++++++ .../state/TaskLocalStateStoreImplTest.java | 174 ++++++++++ .../runtime/state/TaskStateManagerImplTest.java | 190 +++++++++-- .../runtime/state/TestLocalRecoveryConfig.java | 69 ++++ .../state/TestMemoryCheckpointOutputStream.java | 5 +- .../runtime/state/TestTaskLocalStateStore.java | 113 +++++++ .../runtime/state/TestTaskStateManager.java | 125 +++++-- ...ractCheckpointStateOutputStreamTestBase.java | 324 +++++++++++++++++++ .../FileBasedStateOutputStreamTest.java | 41 +++ .../FsCheckpointMetadataOutputStreamTest.java | 282 +--------------- ...pKeyedStateBackendSnapshotMigrationTest.java | 5 +- .../state/heap/HeapStateBackendTestBase.java | 5 +- .../state/testutils/BackendForTestStream.java | 4 +- .../NetworkBufferCalculationTest.java | 3 + .../taskexecutor/TaskExecutorITCase.java | 12 + .../runtime/taskexecutor/TaskExecutorTest.java | 80 +++++ .../TaskManagerServicesBuilder.java | 3 +- ...askManagerComponentsStartupShutdownTest.java | 9 + .../taskmanager/TaskManagerStartupTest.java | 19 +- .../util/BlockerCheckpointStreamFactory.java | 85 +---- .../util/BlockingCheckpointOutputStream.java | 202 ++++++++++++ .../runtime/util/BlockingFSDataInputStream.java | 196 +++++++++++ .../runtime/util/JvmExitOnFatalErrorTest.java | 21 +- 
.../TestByteStreamStateHandleDeepCompare.java | 54 ---- .../testingUtils/TestingTaskManager.scala | 5 + .../state/RocksDBKeyedStateBackend.java | 39 ++- .../streaming/state/RocksDBStateBackend.java | 41 +-- .../state/RocksDBAsyncSnapshotTest.java | 21 +- .../state/RocksDBStateBackendConfigTest.java | 4 + .../state/RocksDBStateBackendTest.java | 28 +- .../api/operators/AbstractStreamOperator.java | 5 +- .../api/operators/BackendRestorerProcedure.java | 146 +++++++++ .../operators/OperatorSnapshotFinalizer.java | 80 +++++ .../api/operators/OperatorSnapshotFutures.java | 69 ++-- .../streaming/api/operators/StreamOperator.java | 4 +- .../operators/StreamOperatorStateContext.java | 1 - .../StreamTaskStateInitializerImpl.java | 204 +++++------- .../streaming/runtime/tasks/StreamTask.java | 129 +++++--- .../operators/AbstractStreamOperatorTest.java | 7 +- .../operators/BackendRestorerProcedureTest.java | 206 ++++++++++++ .../OperatorSnapshotFinalizerTest.java | 130 ++++++++ .../operators/OperatorSnapshotFuturesTest.java | 37 ++- .../StateInitializationContextImplTest.java | 17 +- .../StreamTaskStateInitializerImplTest.java | 49 +-- .../operators/async/AsyncWaitOperatorTest.java | 2 +- .../operators/windowing/WindowOperatorTest.java | 3 - .../tasks/InterruptSensitiveRestoreTest.java | 14 +- .../runtime/tasks/LocalStateForwardingTest.java | 238 ++++++++++++++ .../runtime/tasks/OneInputStreamTaskTest.java | 6 +- .../runtime/tasks/RestoreStreamTaskTest.java | 2 +- .../runtime/tasks/StreamMockEnvironment.java | 8 +- .../tasks/StreamTaskTerminationTest.java | 5 +- .../streaming/runtime/tasks/StreamTaskTest.java | 116 +++---- .../runtime/tasks/StreamTaskTestHarness.java | 2 +- .../tasks/TaskCheckpointingBehaviourTest.java | 16 +- .../tasks/TestSpyWrapperStateBackend.java | 82 +++++ .../util/AbstractStreamOperatorTestHarness.java | 138 ++++---- .../streaming/util/OperatorSnapshotUtil.java | 25 +- ...tractEventTimeWindowCheckpointingITCase.java | 41 +-- .../test/checkpointing/LocalRecoveryITCase.java | 120 +++++++ .../test/checkpointing/RescalingITCase.java | 1 - .../ResumeCheckpointManuallyITCase.java | 117 ++++++- .../test/state/ManualWindowSpeedITCase.java | 2 +- .../PojoSerializerUpgradeTest.java | 8 +- .../flink/yarn/TestingYarnTaskManager.scala | 4 + .../org/apache/flink/yarn/YarnTaskManager.scala | 3 + 160 files changed, 8806 insertions(+), 1872 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index e76cf13..81bf0bf 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.TypeInformationSerializationSch import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -389,34 +390,34 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { final int parallelism3 = 3; final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3)); - List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute( + List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute( topic, Collections.emptyList(), parallelism1, maxParallelism, IntStream.range(0, parallelism1).boxed().iterator()); - operatorStateHandles = repartitionAndExecute( + operatorSubtaskState = repartitionAndExecute( topic, - operatorStateHandles, + operatorSubtaskState, parallelism2, maxParallelism, IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator()); - operatorStateHandles = repartitionAndExecute( + operatorSubtaskState = repartitionAndExecute( topic, - operatorStateHandles, + operatorSubtaskState, parallelism3, maxParallelism, IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator()); // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would // not allow us to read all committed messages from the topic. Thus we initialize operators from - // operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions. + // OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions. - operatorStateHandles = repartitionAndExecute( + operatorSubtaskState = repartitionAndExecute( topic, - operatorStateHandles, + operatorSubtaskState, 1, maxParallelism, Collections.emptyIterator()); @@ -448,10 +449,10 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness.setup(); testHarness.initializeState(new OperatorSubtaskState( - inputStates, - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList())); + new StateObjectCollection<>(inputStates), + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty())); testHarness.open(); if (inputData.hasNext()) { @@ -460,9 +461,9 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); outputStates.addAll(snapshot.getManagedOperatorState()); - checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state"); - checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state"); - checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state"); + checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator state"); + checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed state"); + checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state"); for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) { testHarness.processElement(-nextValue, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index c89aff2..c6af7dd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -66,6 +66,20 @@ public class CheckpointingOptions { " complete checkpoint state. Some state backends may not support incremental checkpoints and ignore" + " this option."); + /** + * This option configures local recovery for this state backend. + */ + public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions + .key("state.backend.local-recovery") + .defaultValue("DISABLED"); + + /** + * The config parameter defining the root directories for storing file-based state for local recovery. + */ + public static final ConfigOption<String> LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS = ConfigOptions + .key("taskmanager.state.local.root-dirs") + .noDefaultValue(); + // ------------------------------------------------------------------------ // Options specific to the file-system-based state backends // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index e697e6f..8566a43 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -18,6 +18,8 @@ package org.apache.flink.configuration; +import javax.annotation.Nonnull; + import java.io.File; /** @@ -25,6 +27,8 @@ import java.io.File; */ public class ConfigurationUtils { + private static final String[] EMPTY = new String[0]; + /** * Extracts the task manager directories for temporary files as defined by * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}. @@ -32,10 +36,30 @@ public class ConfigurationUtils { * @param configuration configuration object * @return array of configured directories (in order) */ + @Nonnull public static String[] parseTempDirectories(Configuration configuration) { - return configuration.getString(CoreOptions.TMP_DIRS).split(",|" + File.pathSeparator); + return splitPaths(configuration.getString(CoreOptions.TMP_DIRS)); + } + + /** + * Extracts the local state directories as defined by + * {@link CheckpointingOptions#LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS}. + * + * @param configuration configuration object + * @return array of configured directories (in order) + */ + @Nonnull + public static String[] parseLocalStateDirectories(Configuration configuration) { + String configValue = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, ""); + return splitPaths(configValue); + } + + @Nonnull + private static String[] splitPaths(@Nonnull String separatedPaths) { + return separatedPaths.length() > 0 ? 
separatedPaths.split(",|" + File.pathSeparator) : EMPTY; } // Make sure that we cannot instantiate this class - private ConfigurationUtils() {} + private ConfigurationUtils() { + } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java index ee2ecbe..2d7fbd5 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java @@ -211,10 +211,12 @@ public class LocalFileSystem extends FileSystem { if (f.isDirectory()) { final File[] files = f.listFiles(); - for (File file : files) { - final boolean del = delete(file); - if (!del) { - return false; + if (files != null) { + for (File file : files) { + final boolean del = delete(file); + if (!del) { + return false; + } } } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/util/Disposable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/Disposable.java b/flink-core/src/main/java/org/apache/flink/util/Disposable.java new file mode 100644 index 0000000..12ef763 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/Disposable.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +/** + * Interface for classes that can be disposed, i.e. that have a dedicated lifecycle step to "destroy" the object. On + * reason for this is for example to release native resources. From this point, the interface fulfills a similar purpose + * as the {@link java.io.Closeable} interface, but sometimes both should be represented as isolated, independent + * lifecycle steps. + */ +public interface Disposable { + + /** + * Disposes the object and releases all resources. After calling this method, calling any methods on the + * object may result in undefined behavior. + * + * @throws Exception if something goes wrong during disposal. 
+ */ + void dispose() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java b/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java new file mode 100644 index 0000000..8a9e8ca --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; +import org.mockito.internal.util.MockUtil; + +import java.lang.reflect.Array; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempts to automatically test method forwarding between a delegate and a wrapper. + */ +public class MethodForwardingTestUtil { + + /** + * This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper + * class is a subtype of the delegate. This ignores methods that are inherited from Object. + * + * @param delegateClass the class for the delegate. + * @param wrapperFactory factory that produces a wrapper from a delegate. + * @param <D> type of the delegate + * @param <W> type of the wrapper + */ + public static <D, W> void testMethodForwarding( + Class<D> delegateClass, + Function<D, W> wrapperFactory) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + testMethodForwarding(delegateClass, wrapperFactory, () -> spy(delegateClass), Collections.emptySet()); + } + + /** + * This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper + * class is a subtype of the delegate. This ignores methods that are inherited from Object. + * + * @param delegateClass the class for the delegate. + * @param wrapperFactory factory that produces a wrapper from a delegate. + * @param delegateObjectSupplier supplier for the delegate object passed to the wrapper factory. + * @param <D> type of the delegate + * @param <W> type of the wrapper + * @param <I> type of the object created as delegate, is a subtype of D. 
+ */ + public static <D, W, I extends D> void testMethodForwarding( + Class<D> delegateClass, + Function<I, W> wrapperFactory, + Supplier<I> delegateObjectSupplier) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + testMethodForwarding(delegateClass, wrapperFactory, delegateObjectSupplier, Collections.emptySet()); + } + + /** + * This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper + * class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the + * original method. Remapping to null skips the method. This ignores methods that are inherited from Object. + * + * @param delegateClass the class for the delegate. + * @param wrapperFactory factory that produces a wrapper from a delegate. + * @param delegateObjectSupplier supplier for the delegate object passed to the wrapper factory. + * @param skipMethodSet set of methods to ignore. + * @param <D> type of the delegate + * @param <W> type of the wrapper + * @param <I> type of the object created as delegate, is a subtype of D. + */ + public static <D, W, I extends D> void testMethodForwarding( + Class<D> delegateClass, + Function<I, W> wrapperFactory, + Supplier<I> delegateObjectSupplier, + Set<Method> skipMethodSet) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + + Preconditions.checkNotNull(delegateClass); + Preconditions.checkNotNull(wrapperFactory); + Preconditions.checkNotNull(skipMethodSet); + + I delegate = delegateObjectSupplier.get(); + + //check if we need to wrap the delegate object as a spy, or if it is already testable with Mockito. + MockUtil mockUtil = new MockUtil(); + if (!mockUtil.isSpy(delegate) || !mockUtil.isMock(delegate)) { + delegate = spy(delegate); + } + + W wrapper = wrapperFactory.apply(delegate); + + // ensure that wrapper is a subtype of delegate + Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass())); + + for (Method delegateMethod : delegateClass.getMethods()) { + + if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) { + continue; + } + + // find the correct method to substitute the bridge for erased generic types. + // if this doesn't work, the user need to exclude the method and write an additional test. + Method wrapperMethod = wrapper.getClass().getMethod( + delegateMethod.getName(), + delegateMethod.getParameterTypes()); + + // things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method. 
+ if (wrapperMethod.isBridge()) { + for (Method method : wrapper.getClass().getMethods()) { + if (!method.isBridge() + && method.getName().equals(wrapperMethod.getName()) + && method.getParameterCount() == wrapperMethod.getParameterCount()) { + wrapperMethod = method; + break; + } + } + } + + Class<?>[] parameterTypes = wrapperMethod.getParameterTypes(); + Object[] arguments = new Object[parameterTypes.length]; + for (int j = 0; j < arguments.length; j++) { + Class<?> parameterType = parameterTypes[j]; + if (parameterType.isArray()) { + arguments[j] = Array.newInstance(parameterType.getComponentType(), 0); + } else if (parameterType.isPrimitive()) { + if (boolean.class.equals(parameterType)) { + arguments[j] = false; + } else if (char.class.equals(parameterType)) { + arguments[j] = 'a'; + } else { + arguments[j] = (byte) 0; + } + } else { + arguments[j] = Mockito.mock(parameterType); + } + } + + wrapperMethod.invoke(wrapper, arguments); + delegateMethod.invoke(Mockito.verify(delegate, Mockito.times(1)), arguments); + reset(delegate); + } + } + + /** + * Test if this method should be skipped in our check for proper forwarding, e.g. because it is just a bridge. + */ + private static boolean checkSkipMethodForwardCheck(Method delegateMethod, Set<Method> skipMethods) { + + if (delegateMethod.isBridge() + || delegateMethod.isDefault() + || skipMethods.contains(delegateMethod)) { + return true; + } + + // skip methods declared in Object (Mockito doesn't like them) + try { + Object.class.getMethod(delegateMethod.getName(), delegateMethod.getParameterTypes()); + return true; + } catch (Exception ignore) { + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---------------------------------------------------------------------- diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java index b3a5def..be5f677 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java @@ -120,9 +120,7 @@ public class RestAPIDocGenerator { List<MessageHeaders> specs = restEndpoint.getSpecs(); specs.forEach(spec -> html.append(createHtmlEntry(spec))); - if (Files.exists(outputFile)) { - Files.delete(outputFile); - } + Files.deleteIfExists(outputFile); Files.write(outputFile, html.toString().getBytes(StandardCharsets.UTF_8)); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala index e69472e..12cc375 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup +import 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} @@ -37,6 +38,7 @@ class MesosTaskManager( memoryManager: MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup : TaskManagerMetricGroup) @@ -47,6 +49,7 @@ class MesosTaskManager( memoryManager, ioManager, network, + taskManagerLocalStateStoresManager, numberOfSlots, highAvailabilityServices, taskManagerMetricGroup) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index 07517ab..dd75dd6 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -30,6 +30,7 @@ import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; @@ -76,7 +77,8 @@ public final class KVStateRequestSerializerRocksDBTest { super(operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, - numberOfKeyGroups, keyGroupRange, executionConfig, false); + numberOfKeyGroups, keyGroupRange, executionConfig, false, + TestLocalRecoveryConfig.disabled()); } @Override @@ -152,7 +154,8 @@ public final class KVStateRequestSerializerRocksDBTest { LongSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new ExecutionConfig(), - false); + false, + TestLocalRecoveryConfig.disabled()); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index d3314ab..8d10141 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; @@ -194,7 +195,8 @@ public class KvStateRequestSerializerTest { 1, new KeyGroupRange(0, 0), async, - new ExecutionConfig() + new ExecutionConfig(), + TestLocalRecoveryConfig.disabled() ); longHeapKeyedStateBackend.setCurrentKey(key); @@ -290,13 +292,14 @@ public class KvStateRequestSerializerTest { // objects for heap state list serialisation final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - ClassLoader.getSystemClassLoader(), - 1, - new KeyGroupRange(0, 0), - async, - new ExecutionConfig() + mock(TaskKvStateRegistry.class), + LongSerializer.INSTANCE, + ClassLoader.getSystemClassLoader(), + 1, + new KeyGroupRange(0, 0), + async, + new ExecutionConfig(), + TestLocalRecoveryConfig.disabled() ); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 58d532b..450436f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -180,9 +180,7 @@ class HistoryServerArchiveFetcher { // We overwrite existing files since this may be another attempt at fetching this archive. // Existing files may be incomplete/corrupt. 
- if (Files.exists(targetPath)) { - Files.delete(targetPath); - } + Files.deleteIfExists(targetPath); Files.createFile(target.toPath()); try (FileWriter fw = new FileWriter(target)) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java index d5ac3e0..2537db1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.checkpoint; +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + import java.io.Serializable; /** @@ -27,11 +30,13 @@ public class JobManagerTaskRestore implements Serializable { private static final long serialVersionUID = 1L; + /** The id of the checkpoint from which we restore. */ private final long restoreCheckpointId; + /** The state for this task to restore. */ private final TaskStateSnapshot taskStateSnapshot; - public JobManagerTaskRestore(long restoreCheckpointId, TaskStateSnapshot taskStateSnapshot) { + public JobManagerTaskRestore(@Nonnegative long restoreCheckpointId, @Nonnull TaskStateSnapshot taskStateSnapshot) { this.restoreCheckpointId = restoreCheckpointId; this.taskStateSnapshot = taskStateSnapshot; } @@ -40,13 +45,14 @@ public class JobManagerTaskRestore implements Serializable { return restoreCheckpointId; } + @Nonnull public TaskStateSnapshot getTaskStateSnapshot() { return taskStateSnapshot; } @Override public String toString() { - return "TaskRestore{" + + return "JobManagerTaskRestore{" + "restoreCheckpointId=" + restoreCheckpointId + ", taskStateSnapshot=" + taskStateSnapshot + '}'; http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index 3df9c4f..c2b8e9a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.CompositeStateHandle; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -30,10 +30,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; /** @@ -50,8 +49,6 @@ import java.util.List; * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. 
In * scale-down, one operator subtask can become responsible for the state of multiple previous subtasks. The collections * can then store all the state handles that are relevant to build up the new subtask state. - * - * <p>There is no collection for legacy state because it is not rescalable. */ public class OperatorSubtaskState implements CompositeStateHandle { @@ -63,25 +60,25 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */ @Nonnull - private final Collection<OperatorStateHandle> managedOperatorState; + private final StateObjectCollection<OperatorStateHandle> managedOperatorState; /** * Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}. */ @Nonnull - private final Collection<OperatorStateHandle> rawOperatorState; + private final StateObjectCollection<OperatorStateHandle> rawOperatorState; /** * Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */ @Nonnull - private final Collection<KeyedStateHandle> managedKeyedState; + private final StateObjectCollection<KeyedStateHandle> managedKeyedState; /** * Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */ @Nonnull - private final Collection<KeyedStateHandle> rawKeyedState; + private final StateObjectCollection<KeyedStateHandle> rawKeyedState; /** * The state size. This is also part of the deserialized state handle. @@ -95,43 +92,39 @@ public class OperatorSubtaskState implements CompositeStateHandle { */ public OperatorSubtaskState() { this( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList()); + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty()); } public OperatorSubtaskState( - Collection<OperatorStateHandle> managedOperatorState, - Collection<OperatorStateHandle> rawOperatorState, - Collection<KeyedStateHandle> managedKeyedState, - Collection<KeyedStateHandle> rawKeyedState) { + @Nonnull StateObjectCollection<OperatorStateHandle> managedOperatorState, + @Nonnull StateObjectCollection<OperatorStateHandle> rawOperatorState, + @Nonnull StateObjectCollection<KeyedStateHandle> managedKeyedState, + @Nonnull StateObjectCollection<KeyedStateHandle> rawKeyedState) { this.managedOperatorState = Preconditions.checkNotNull(managedOperatorState); this.rawOperatorState = Preconditions.checkNotNull(rawOperatorState); this.managedKeyedState = Preconditions.checkNotNull(managedKeyedState); this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState); - try { - long calculateStateSize = sumAllSizes(managedOperatorState); - calculateStateSize += sumAllSizes(rawOperatorState); - calculateStateSize += sumAllSizes(managedKeyedState); - calculateStateSize += sumAllSizes(rawKeyedState); - stateSize = calculateStateSize; - } catch (Exception e) { - throw new RuntimeException("Failed to get state size.", e); - } + long calculateStateSize = managedOperatorState.getStateSize(); + calculateStateSize += rawOperatorState.getStateSize(); + calculateStateSize += managedKeyedState.getStateSize(); + calculateStateSize += rawKeyedState.getStateSize(); + stateSize = calculateStateSize; } /** * For convenience because the size of the collections is typically 0 or 1. Null values are translated into empty - * Collections (except for legacy state). + * Collections. 
*/ public OperatorSubtaskState( - OperatorStateHandle managedOperatorState, - OperatorStateHandle rawOperatorState, - KeyedStateHandle managedKeyedState, - KeyedStateHandle rawKeyedState) { + @Nullable OperatorStateHandle managedOperatorState, + @Nullable OperatorStateHandle rawOperatorState, + @Nullable KeyedStateHandle managedKeyedState, + @Nullable KeyedStateHandle rawKeyedState) { this( singletonOrEmptyOnNull(managedOperatorState), @@ -140,21 +133,8 @@ public class OperatorSubtaskState implements CompositeStateHandle { singletonOrEmptyOnNull(rawKeyedState)); } - private static <T> Collection<T> singletonOrEmptyOnNull(T element) { - return element != null ? Collections.singletonList(element) : Collections.<T>emptyList(); - } - - private static long sumAllSizes(Collection<? extends StateObject> stateObject) throws Exception { - long size = 0L; - for (StateObject object : stateObject) { - size += getSizeNullSafe(object); - } - - return size; - } - - private static long getSizeNullSafe(StateObject stateObject) throws Exception { - return stateObject != null ? stateObject.getStateSize() : 0L; + private static <T extends StateObject> StateObjectCollection<T> singletonOrEmptyOnNull(T element) { + return element != null ? StateObjectCollection.singleton(element) : StateObjectCollection.empty(); } // -------------------------------------------------------------------------------------------- @@ -163,7 +143,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Returns a handle to the managed operator state. */ @Nonnull - public Collection<OperatorStateHandle> getManagedOperatorState() { + public StateObjectCollection<OperatorStateHandle> getManagedOperatorState() { return managedOperatorState; } @@ -171,7 +151,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Returns a handle to the raw operator state. */ @Nonnull - public Collection<OperatorStateHandle> getRawOperatorState() { + public StateObjectCollection<OperatorStateHandle> getRawOperatorState() { return rawOperatorState; } @@ -179,7 +159,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Returns a handle to the managed keyed state. */ @Nonnull - public Collection<KeyedStateHandle> getManagedKeyedState() { + public StateObjectCollection<KeyedStateHandle> getManagedKeyedState() { return managedKeyedState; } @@ -187,7 +167,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Returns a handle to the raw keyed state. */ @Nonnull - public Collection<KeyedStateHandle> getRawKeyedState() { + public StateObjectCollection<KeyedStateHandle> getRawKeyedState() { return rawKeyedState; } @@ -281,18 +261,9 @@ public class OperatorSubtaskState implements CompositeStateHandle { } public boolean hasState() { - return hasState(managedOperatorState) - || hasState(rawOperatorState) - || hasState(managedKeyedState) - || hasState(rawKeyedState); - } - - private boolean hasState(Iterable<? 
extends StateObject> states) { - for (StateObject state : states) { - if (state != null) { - return true; - } - } - return false; + return managedOperatorState.hasState() + || rawOperatorState.hasState() + || managedKeyedState.hasState() + || rawKeyedState.hasState(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java new file mode 100644 index 0000000..f48d311 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateObject; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; + +/** + * This class is a wrapper over multiple alternative {@link OperatorSubtaskState} that are (partial) substitutes for + * each other and imposes a priority ordering over all alternatives for the different states which define an order in + * which the operator should attempt to restore the state from them. One OperatorSubtaskState is considered as the + * "ground truth" about which state should be represented. Alternatives may be complete or partial substitutes for + * the "ground truth" with a higher priority (if they had a lower alternative, they would not really be alternatives). + * Substitution is determined on a per-sub-state basis. + */ +@Internal +public class PrioritizedOperatorSubtaskState { + + /** Singleton instance for an empty, non-restored operator state. */ + private static final PrioritizedOperatorSubtaskState EMPTY_NON_RESTORED_INSTANCE = + new PrioritizedOperatorSubtaskState.Builder(new OperatorSubtaskState(), Collections.emptyList(), false) + .build(); + + /** List of prioritized snapshot alternatives for managed operator state. */ + private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState; + + /** List of prioritized snapshot alternatives for raw operator state. 
*/ + private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState; + + /** List of prioritized snapshot alternatives for managed keyed state. */ + private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState; + + /** List of prioritized snapshot alternatives for raw keyed state. */ + private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState; + + /** Signal flag if this represents state for a restored operator. */ + private final boolean restored; + + PrioritizedOperatorSubtaskState( + @Nonnull List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState, + @Nonnull List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState, + @Nonnull List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState, + @Nonnull List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState, + boolean restored) { + + this.prioritizedManagedOperatorState = prioritizedManagedOperatorState; + this.prioritizedRawOperatorState = prioritizedRawOperatorState; + this.prioritizedManagedKeyedState = prioritizedManagedKeyedState; + this.prioritizedRawKeyedState = prioritizedRawKeyedState; + this.restored = restored; + } + + // ----------------------------------------------------------------------------------------------------------------- + + /** + * Returns an iterator over all alternative snapshots to restore the managed operator state, in the order in which + * we should attempt to restore. + */ + @Nonnull + public Iterator<StateObjectCollection<OperatorStateHandle>> getPrioritizedManagedOperatorState() { + return prioritizedManagedOperatorState.iterator(); + } + + /** + * Returns an iterator over all alternative snapshots to restore the raw operator state, in the order in which we + * should attempt to restore. + */ + @Nonnull + public Iterator<StateObjectCollection<OperatorStateHandle>> getPrioritizedRawOperatorState() { + return prioritizedRawOperatorState.iterator(); + } + + /** + * Returns an iterator over all alternative snapshots to restore the managed keyed state, in the order in which we + * should attempt to restore. + */ + @Nonnull + public Iterator<StateObjectCollection<KeyedStateHandle>> getPrioritizedManagedKeyedState() { + return prioritizedManagedKeyedState.iterator(); + } + + /** + * Returns an iterator over all alternative snapshots to restore the raw keyed state, in the order in which we + * should attempt to restore. + */ + @Nonnull + public Iterator<StateObjectCollection<KeyedStateHandle>> getPrioritizedRawKeyedState() { + return prioritizedRawKeyedState.iterator(); + } + + // ----------------------------------------------------------------------------------------------------------------- + + /** + * Returns the managed operator state from the job manager, which represents the ground truth about what this state + * should represent. This is the alternative with lowest priority. + */ + @Nonnull + public StateObjectCollection<OperatorStateHandle> getJobManagerManagedOperatorState() { + return lastElement(prioritizedManagedOperatorState); + } + + /** + * Returns the raw operator state from the job manager, which represents the ground truth about what this state + * should represent. This is the alternative with lowest priority. 
+ */ + @Nonnull + public StateObjectCollection<OperatorStateHandle> getJobManagerRawOperatorState() { + return lastElement(prioritizedRawOperatorState); + } + + /** + * Returns the managed keyed state from the job manager, which represents the ground truth about what this state + * should represent. This is the alternative with lowest priority. + */ + @Nonnull + public StateObjectCollection<KeyedStateHandle> getJobManagerManagedKeyedState() { + return lastElement(prioritizedManagedKeyedState); + } + + /** + * Returns the raw keyed state from the job manager, which represents the ground truth about what this state + * should represent. This is the alternative with lowest priority. + */ + @Nonnull + public StateObjectCollection<KeyedStateHandle> getJobManagerRawKeyedState() { + return lastElement(prioritizedRawKeyedState); + } + + // ----------------------------------------------------------------------------------------------------------------- + + /** + * Returns true if this was created for a restored operator, false otherwise. Restored operators are operators that + * participated in a previous checkpoint, even if they did not emit any state snapshots. + */ + public boolean isRestored() { + return restored; + } + + + private static <T extends StateObject> StateObjectCollection<T> lastElement(List<StateObjectCollection<T>> list) { + return list.get(list.size() - 1); + } + + /** + * Returns an empty {@link PrioritizedOperatorSubtaskState} singleton for an empty, not-restored operator state. + */ + public static PrioritizedOperatorSubtaskState emptyNotRestored() { + return EMPTY_NON_RESTORED_INSTANCE; + } + + @Internal + public static class Builder { + + /** Ground truth of state, provided by job manager. */ + @Nonnull + private final OperatorSubtaskState jobManagerState; + + /** (Local) alternatives to the job manager state. */ + @Nonnull + private final List<OperatorSubtaskState> alternativesByPriority; + + /** Flag if the states have been restored. */ + private final boolean restored; + + public Builder( + @Nonnull OperatorSubtaskState jobManagerState, + @Nonnull List<OperatorSubtaskState> alternativesByPriority) { + this(jobManagerState, alternativesByPriority, true); + } + + public Builder( + @Nonnull OperatorSubtaskState jobManagerState, + @Nonnull List<OperatorSubtaskState> alternativesByPriority, + boolean restored) { + + this.jobManagerState = jobManagerState; + this.alternativesByPriority = alternativesByPriority; + this.restored = restored; + } + + public PrioritizedOperatorSubtaskState build() { + int size = alternativesByPriority.size(); + List<StateObjectCollection<OperatorStateHandle>> managedOperatorAlternatives = new ArrayList<>(size); + List<StateObjectCollection<KeyedStateHandle>> managedKeyedAlternatives = new ArrayList<>(size); + List<StateObjectCollection<OperatorStateHandle>> rawOperatorAlternatives = new ArrayList<>(size); + List<StateObjectCollection<KeyedStateHandle>> rawKeyedAlternatives = new ArrayList<>(size); + + for (OperatorSubtaskState subtaskState : alternativesByPriority) { + + if (subtaskState != null) { + managedKeyedAlternatives.add(subtaskState.getManagedKeyedState()); + rawKeyedAlternatives.add(subtaskState.getRawKeyedState()); + managedOperatorAlternatives.add(subtaskState.getManagedOperatorState()); + rawOperatorAlternatives.add(subtaskState.getRawOperatorState()); + } + } + + // Key-groups should match. 
+ BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean> keyedStateApprover = + (ref, alt) -> ref.getKeyGroupRange().equals(alt.getKeyGroupRange()); + + // State meta data should match. + BiFunction<OperatorStateHandle, OperatorStateHandle, Boolean> operatorStateApprover = + (ref, alt) -> ref.getStateNameToPartitionOffsets().equals(alt.getStateNameToPartitionOffsets()); + + return new PrioritizedOperatorSubtaskState( + resolvePrioritizedAlternatives( + jobManagerState.getManagedKeyedState(), + managedKeyedAlternatives, + keyedStateApprover), + resolvePrioritizedAlternatives( + jobManagerState.getRawKeyedState(), + rawKeyedAlternatives, + keyedStateApprover), + resolvePrioritizedAlternatives( + jobManagerState.getManagedOperatorState(), + managedOperatorAlternatives, + operatorStateApprover), + resolvePrioritizedAlternatives( + jobManagerState.getRawOperatorState(), + rawOperatorAlternatives, + operatorStateApprover), + restored); + } + + /** + * This helper method resolves the dependencies between the ground truth of the operator state obtained from the + * job manager and potential alternatives for recovery, e.g. from a task-local source. + */ + protected <T extends StateObject> List<StateObjectCollection<T>> resolvePrioritizedAlternatives( + StateObjectCollection<T> jobManagerState, + List<StateObjectCollection<T>> alternativesByPriority, + BiFunction<T, T, Boolean> approveFun) { + + // Nothing to resolve if there are no alternatives, or the ground truth has already no state, or if we can + // assume that a rescaling happened because we find more than one handle in the JM state (this is more a sanity + // check). + if (alternativesByPriority == null + || alternativesByPriority.isEmpty() + || !jobManagerState.hasState() + || jobManagerState.size() != 1) { + + return Collections.singletonList(jobManagerState); + } + + // As we know size is == 1 + T reference = jobManagerState.iterator().next(); + + // This will contain the end result, we initialize it with the potential max. size. + List<StateObjectCollection<T>> approved = + new ArrayList<>(1 + alternativesByPriority.size()); + + for (StateObjectCollection<T> alternative : alternativesByPriority) { + + // We found an alternative to the JM state if it has state, we have a 1:1 relationship, and the + // approve-function signaled true. + if (alternative != null + && alternative.hasState() + && alternative.size() == 1 + && approveFun.apply(reference, alternative.iterator().next())) { + + approved.add(alternative); + } + } + + // Of course we include the ground truth as last alternative. 
+ approved.add(jobManagerState); + return Collections.unmodifiableList(approved); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java index e09b677..e6fa687 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; @@ -80,7 +81,10 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart new EnumMap<>(OperatorStateHandle.Mode.class); for (OperatorStateHandle.Mode mode : OperatorStateHandle.Mode.values()) { - nameToStateByMode.put(mode, new HashMap<>()); + + nameToStateByMode.put( + mode, + new HashMap<>()); } for (OperatorStateHandle psh : previousParallelSubtaskStates) { @@ -171,8 +175,6 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart } // Now start collection the partitions for the parallel instance into this list - List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>> parallelOperatorState = - new ArrayList<>(); while (numberOfPartitionsToAssign > 0) { Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithOffsets = @@ -194,10 +196,6 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart ++lstIdx; } - parallelOperatorState.add(new Tuple2<>( - handleWithOffsets.f0, - new OperatorStateHandle.StateMetaInfo(offs, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE))); - numberOfPartitionsToAssign -= remaining; // As a last step we merge partitions that use the same StreamStateHandle in a single @@ -205,7 +203,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx); OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithOffsets.f0); if (operatorStateHandle == null) { - operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithOffsets.f0); + operatorStateHandle = new OperatorStreamStateHandle(new HashMap<>(), handleWithOffsets.f0); mergeMap.put(handleWithOffsets.f0, operatorStateHandle); } operatorStateHandle.getStateNameToPartitionOffsets().put( @@ -231,7 +229,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) { OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0); if (operatorStateHandle == null) { - operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0); + operatorStateHandle = new OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0); mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle); } 
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1); @@ -252,13 +250,13 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart uniformBroadcastNameToState.entrySet()) { int oldParallelism = e.getValue().size(); - + Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo = e.getValue().get(i % oldParallelism); OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0); if (operatorStateHandle == null) { - operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0); + operatorStateHandle = new OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0); mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle); } operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1); @@ -282,4 +280,4 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart return byMode.get(mode); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 43a4b01..592489f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -24,11 +24,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorInstanceID; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -242,10 +242,10 @@ public class StateAssignmentOperation { checkState(!subRawKeyedState.containsKey(instanceID)); } return new OperatorSubtaskState( - subManagedOperatorState.getOrDefault(instanceID, Collections.emptyList()), - subRawOperatorState.getOrDefault(instanceID, Collections.emptyList()), - subManagedKeyedState.getOrDefault(instanceID, Collections.emptyList()), - subRawKeyedState.getOrDefault(instanceID, Collections.emptyList())); + new StateObjectCollection<>(subManagedOperatorState.getOrDefault(instanceID, Collections.emptyList())), + new StateObjectCollection<>(subRawOperatorState.getOrDefault(instanceID, Collections.emptyList())), + new StateObjectCollection<>(subManagedKeyedState.getOrDefault(instanceID, Collections.emptyList())), + new StateObjectCollection<>(subRawKeyedState.getOrDefault(instanceID, Collections.emptyList()))); } private static boolean isHeadOperator(int opIdx, List<OperatorID> operatorIDs) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java new file mode 100644 index 0000000..38e3d15 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StateUtil; + +import org.apache.commons.collections.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.Predicate; + +/** + * This class represents a generic collection for {@link StateObject}s. Being a state object itself, it delegates + * {@link #discardState()} to all contained state objects and computes {@link #getStateSize()} as the sum of the state + * sizes of all contained objects. + * + * @param <T> type of the contained state objects. + */ +public class StateObjectCollection<T extends StateObject> implements Collection<T>, StateObject { + + private static final long serialVersionUID = 1L; + + /** The empty StateObjectCollection. */ + private static final StateObjectCollection<?> EMPTY = new StateObjectCollection<>(Collections.emptyList()); + + /** Wrapped collection that contains the state objects. */ + private final Collection<T> stateObjects; + + /** + * Creates a new StateObjectCollection that is backed by an {@link ArrayList}. + */ + public StateObjectCollection() { + this.stateObjects = new ArrayList<>(); + } + + /** + * Creates a new StateObjectCollection that wraps the given collection and delegates to it. + * @param stateObjects collection of state objects to wrap. + */ + public StateObjectCollection(Collection<T> stateObjects) { + this.stateObjects = stateObjects != null ?
stateObjects : Collections.emptyList(); + } + + @Override + public int size() { + return stateObjects.size(); + } + + @Override + public boolean isEmpty() { + return stateObjects.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return stateObjects.contains(o); + } + + @Override + public Iterator<T> iterator() { + return stateObjects.iterator(); + } + + @Override + public Object[] toArray() { + return stateObjects.toArray(); + } + + @Override + public <T1> T1[] toArray(T1[] a) { + return stateObjects.toArray(a); + } + + @Override + public boolean add(T t) { + return stateObjects.add(t); + } + + @Override + public boolean remove(Object o) { + return stateObjects.remove(o); + } + + @Override + public boolean containsAll(Collection<?> c) { + return stateObjects.containsAll(c); + } + + @Override + public boolean addAll(Collection<? extends T> c) { + return stateObjects.addAll(c); + } + + @Override + public boolean removeAll(Collection<?> c) { + return stateObjects.removeAll(c); + } + + @Override + public boolean removeIf(Predicate<? super T> filter) { + return stateObjects.removeIf(filter); + } + + @Override + public boolean retainAll(Collection<?> c) { + return stateObjects.retainAll(c); + } + + @Override + public void clear() { + stateObjects.clear(); + } + + @Override + public void discardState() throws Exception { + StateUtil.bestEffortDiscardAllStateObjects(stateObjects); + } + + @Override + public long getStateSize() { + return sumAllSizes(stateObjects); + } + + /** + * Returns true if this contains at least one {@link StateObject}. + */ + public boolean hasState() { + for (StateObject state : stateObjects) { + if (state != null) { + return true; + } + } + return false; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StateObjectCollection<?> that = (StateObjectCollection<?>) o; + + // simple equals can cause troubles here because of how equals works e.g. between lists and sets. + return CollectionUtils.isEqualCollection(stateObjects, that.stateObjects); + } + + @Override + public int hashCode() { + return stateObjects.hashCode(); + } + + @Override + public String toString() { + return "StateObjectCollection{" + stateObjects + '}'; + } + + // ------------------------------------------------------------------------ + // Helper methods. + // ------------------------------------------------------------------------ + + public static <T extends StateObject> StateObjectCollection<T> empty() { + return (StateObjectCollection<T>) EMPTY; + } + + public static <T extends StateObject> StateObjectCollection<T> singleton(T stateObject) { + return new StateObjectCollection<>(Collections.singleton(stateObject)); + } + + private static long sumAllSizes(Collection<? extends StateObject> stateObject) { + long size = 0L; + for (StateObject object : stateObject) { + size += getSizeNullSafe(object); + } + + return size; + } + + private static long getSizeNullSafe(StateObject stateObject) { + return stateObject != null ? 
stateObject.getStateSize() : 0L; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 281693b..5aab33a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CompositeStateHandle; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java index 28edc63..abc96a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java @@ -24,6 +24,9 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -67,6 +70,7 @@ public class TaskStateSnapshot implements CompositeStateHandle { /** * Returns the subtask state for the given operator id (or null if not contained). */ + @Nullable public OperatorSubtaskState getSubtaskStateByOperatorID(OperatorID operatorID) { return subtaskStatesByOperatorID.get(operatorID); } @@ -75,7 +79,10 @@ public class TaskStateSnapshot implements CompositeStateHandle { * Maps the given operator id to the given subtask state. Returns the subtask state of a previous mapping, if such * a mapping existed or null otherwise. */ - public OperatorSubtaskState putSubtaskStateByOperatorID(OperatorID operatorID, OperatorSubtaskState state) { + public OperatorSubtaskState putSubtaskStateByOperatorID( + @Nonnull OperatorID operatorID, + @Nonnull OperatorSubtaskState state) { + return subtaskStatesByOperatorID.put(operatorID, Preconditions.checkNotNull(state)); } @@ -86,6 +93,18 @@ public class TaskStateSnapshot implements CompositeStateHandle { return subtaskStatesByOperatorID.entrySet(); } + /** + * Returns true if at least one {@link OperatorSubtaskState} in subtaskStatesByOperatorID has state. 
+ */ + public boolean hasState() { + for (OperatorSubtaskState operatorSubtaskState : subtaskStatesByOperatorID.values()) { + if (operatorSubtaskState != null && operatorSubtaskState.hasState()) { + return true; + } + } + return false; + } + @Override public void discardState() throws Exception { StateUtil.bestEffortDiscardAllStateObjects(subtaskStatesByOperatorID.values()); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java index d7966e6..468b12f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.util.Disposable; import java.util.Collection; @@ -36,7 +37,7 @@ import java.util.Collection; * * <p>Savepoints are serialized via a {@link SavepointSerializer}. */ -public interface Savepoint extends Versioned { +public interface Savepoint extends Disposable, Versioned { /** * Returns the checkpoint ID of the savepoint. @@ -74,9 +75,4 @@ public interface Savepoint extends Versioned { */ Collection<OperatorState> getOperatorStates(); - /** - * Disposes the savepoint. - */ - void dispose() throws Exception; - } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index c26c983..b3e6e89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -25,11 +25,12 @@ import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -256,7 +257,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { @VisibleForTesting public static void serializeOperatorStateHandle( - OperatorStateHandle stateHandle, 
DataOutputStream dos) throws IOException { + OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle != null) { dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE); @@ -309,7 +310,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { offsetsMap.put(key, metaInfo); } StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); - return new OperatorStateHandle(offsetsMap, stateHandle); + return new OperatorStreamStateHandle(offsetsMap, stateHandle); } else { throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java index 9e406df..bd3bfae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -27,8 +27,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.util.Preconditions; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index 5636a52..faee588 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -23,12 +23,13 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -433,7 +434,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { } private static void serializeOperatorStateHandle( - OperatorStateHandle stateHandle, 
DataOutputStream dos) throws IOException { + OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle != null) { dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE); @@ -485,7 +486,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { offsetsMap.put(key, metaInfo); } StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); - return new OperatorStateHandle(offsetsMap, stateHandle); + return new OperatorStreamStateHandle(offsetsMap, stateHandle); } else { throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index a504799..946f6e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -1240,7 +1240,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution */ @VisibleForTesting public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) { - final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs(); + final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocations(); final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture; switch(locationPreferenceConstraint) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java index a47041b..6c3577d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java @@ -18,15 +18,16 @@ package org.apache.flink.runtime.operators.sort; -import java.io.IOException; - import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.util.Disposable; import org.apache.flink.util.MutableObjectIterator; +import java.io.IOException; + /** * */ -public interface InMemorySorter<T> extends IndexedSortable { +public interface InMemorySorter<T> extends IndexedSortable, Disposable { /** * Resets the sort buffer back to the state where it is empty. All contained data is discarded. @@ -39,11 +40,12 @@ public interface InMemorySorter<T> extends IndexedSortable { * @return True, if no record is contained, false otherwise. */ boolean isEmpty(); - + /** * Disposes the sorter. * This method does not release the memory segments used by the sorter. */ + @Override void dispose(); /**

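Note on PrioritizedOperatorSubtaskState: the lists built by the Builder above are ordered from the most preferred alternative (e.g. task-local state) to the least preferred one, with the job manager ground truth always appended last. The following minimal, self-contained sketch shows how such a prioritized list is meant to be consumed; it is not part of the commit and does not use the Flink classes, and RestoreSketch, RestoreAttempt, and restoreWithFallback are hypothetical names used only for illustration.

import java.util.List;

final class RestoreSketch {

	// Hypothetical callback that attempts a restore from one alternative and reports success.
	interface RestoreAttempt<T> {
		boolean tryRestore(List<T> alternative) throws Exception;
	}

	// Walks the alternatives in priority order; the last entry is the job manager ground truth,
	// so a failed task-local restore simply falls through to the remote state.
	static <T> void restoreWithFallback(
			List<List<T>> prioritizedAlternatives,
			RestoreAttempt<T> attempt) throws Exception {

		for (List<T> alternative : prioritizedAlternatives) {
			if (attempt.tryRestore(alternative)) {
				return;
			}
		}
		throw new IllegalStateException("Could not restore from any prioritized alternative.");
	}
}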
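Note on StateObjectCollection: its contract can be summarized with a short example. The sketch below assumes only what the diff shows (discardState() is forwarded to every element, getStateSize() sums the element sizes, hasState() reports whether any non-null element exists); InMemoryState is a made-up StateObject implementation used purely for illustration.

import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.StateObject;

import java.util.Arrays;

public class StateObjectCollectionExample {

	// Made-up state object that only reports a size and has nothing to release.
	static final class InMemoryState implements StateObject {

		private static final long serialVersionUID = 1L;

		private final long size;

		InMemoryState(long size) {
			this.size = size;
		}

		@Override
		public void discardState() {
			// nothing to release in this toy example
		}

		@Override
		public long getStateSize() {
			return size;
		}
	}

	public static void main(String[] args) throws Exception {
		StateObjectCollection<InMemoryState> states =
			new StateObjectCollection<>(Arrays.asList(new InMemoryState(100L), new InMemoryState(42L)));

		System.out.println(states.hasState());     // true
		System.out.println(states.getStateSize()); // 142, the sum of the contained sizes

		// Best-effort discard, forwarded to every contained state object.
		states.discardState();
	}
}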
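Note on Disposable: the commit routes Savepoint#dispose() and InMemorySorter#dispose() through the new org.apache.flink.util.Disposable interface (the new file itself is not shown in this excerpt). Assuming it declares a single dispose() method that may throw, different resources can share one cleanup path, as in the standalone sketch below; the nested Disposable interface here is a local stand-in rather than the real Flink class, and disposeAll is a hypothetical helper.

import java.util.Arrays;
import java.util.List;

final class DisposalSketch {

	// Local stand-in that mirrors what the new Disposable interface presumably looks like.
	interface Disposable {
		void dispose() throws Exception;
	}

	// Best-effort disposal: keep going if one resource fails and rethrow the first failure at the end.
	static void disposeAll(List<? extends Disposable> resources) throws Exception {
		Exception firstFailure = null;
		for (Disposable resource : resources) {
			try {
				resource.dispose();
			} catch (Exception e) {
				if (firstFailure == null) {
					firstFailure = e;
				} else {
					firstFailure.addSuppressed(e);
				}
			}
		}
		if (firstFailure != null) {
			throw firstFailure;
		}
	}

	public static void main(String[] args) throws Exception {
		List<Disposable> resources = Arrays.asList(
			() -> System.out.println("disposing in-memory sorter"),
			() -> System.out.println("disposing savepoint"));
		disposeAll(resources);
	}
}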