http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index b7e4794..f60cb2c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -33,10 +33,12 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.TernaryBoolean; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; @@ -103,10 +105,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu @Nullable private OptionsFactory optionsFactory; - /** True if incremental checkpointing is enabled. - * Null if not yet set, in which case the configuration values will be used. */ - @Nullable - private Boolean enableIncrementalCheckpointing; + /** This determines if incremental checkpointing is enabled. */ + private final TernaryBoolean enableIncrementalCheckpointing; // -- runtime values, set on TaskManager when initializing / using the backend @@ -201,7 +201,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * @param checkpointStreamBackend The backend write the checkpoint streams to. */ public RocksDBStateBackend(StateBackend checkpointStreamBackend) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); + this(checkpointStreamBackend, TernaryBoolean.UNDEFINED); } /** @@ -215,7 +215,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * @param checkpointStreamBackend The backend write the checkpoint streams to. * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. */ - public RocksDBStateBackend(StateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) { + public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) { this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; } @@ -225,16 +225,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu */ @Deprecated public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); + this(checkpointStreamBackend, TernaryBoolean.UNDEFINED); } /** - * @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} instead. 
+ * @deprecated Use {@link #RocksDBStateBackend(StateBackend, TernaryBoolean)} instead. */ @Deprecated public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); - this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + this(checkpointStreamBackend, TernaryBoolean.fromBoolean(enableIncrementalCheckpointing)); } /** @@ -251,13 +250,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu originalStreamBackend; // configure incremental checkpoints - if (original.enableIncrementalCheckpointing != null) { - this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing; - } - else { - this.enableIncrementalCheckpointing = - config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS); - } + this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined( + config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS)); // configure local directories if (original.localRocksDbDirectories != null) { @@ -407,6 +401,9 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu File instanceBasePath = new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); + LocalRecoveryConfig localRecoveryConfig = + env.getTaskStateManager().createLocalRecoveryConfig(); + return new RocksDBKeyedStateBackend<>( operatorIdentifier, env.getUserClassLoader(), @@ -418,7 +415,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu numberOfKeyGroups, keyGroupRange, env.getExecutionConfig(), - isIncrementalCheckpointsEnabled()); + isIncrementalCheckpointsEnabled(), + localRecoveryConfig); } @Override @@ -511,12 +509,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * Gets whether incremental checkpoints are enabled for this state backend. */ public boolean isIncrementalCheckpointsEnabled() { - if (enableIncrementalCheckpointing != null) { - return enableIncrementalCheckpointing; - } - else { - return CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); - } + return enableIncrementalCheckpointing.getOrDefault(CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()); } // ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index bae1f81..9958577 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -43,8 +43,10 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOut import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -55,6 +57,7 @@ import org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFacto import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; +import org.apache.flink.runtime.util.BlockingCheckpointOutputStream; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -75,6 +78,8 @@ import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.modules.junit4.PowerMockRunner; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.lang.reflect.Field; @@ -186,7 +191,8 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager( jobID, executionAttemptID, - checkpointResponderMock); + checkpointResponderMock, + TestLocalRecoveryConfig.disabled()); StreamMockEnvironment mockEnv = new StreamMockEnvironment( testHarness.jobConfig, @@ -256,12 +262,16 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { int count = 1; @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception { + public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException { // we skip the first created stream, because it is used to checkpoint the timer service, which is // currently not asynchronous. 
if (count > 0) { --count; - return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize); + return new BlockingCheckpointOutputStream( + new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize), + null, + null, + Integer.MAX_VALUE); } else { return super.createCheckpointStateOutputStream(scope); } @@ -373,7 +383,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { StringSerializer.INSTANCE, new ValueStateDescriptor<>("foobar", String.class)); - RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot( + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFuture = keyedStateBackend.snapshot( checkpointId, timestamp, new TestCheckpointStreamFactory(() -> outputStream), CheckpointOptions.forCheckpointWithDefaultLocation()); @@ -459,7 +469,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { } @Override - public CheckpointStateOutputStream get() throws Exception { + public CheckpointStateOutputStream get() throws IOException { return factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); } } @@ -472,6 +482,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { this.testException = testException; } + @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 3a39ba0..2dd67f5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -411,6 +412,9 @@ public class RocksDBStateBackendConfigTest { TaskManagerRuntimeInfo tmInfo = new TestingTaskManagerRuntimeInfo(new Configuration(), tempDirStrings); when(env.getTaskManagerInfo()).thenReturn(tmInfo); + TestTaskStateManager taskStateManager = new TestTaskStateManager(); + when(env.getTaskStateManager()).thenReturn(taskStateManager); + return env; } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 54af400..9466bc3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -33,9 +33,11 @@ import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; @@ -231,7 +233,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa 1, new KeyGroupRange(0, 0), new ExecutionConfig(), - enableIncrementalCheckpointing); + enableIncrementalCheckpointing, + TestLocalRecoveryConfig.disabled()); verify(columnFamilyOptions, Mockito.times(1)) .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); @@ -249,7 +252,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa setupRocksKeyedStateBackend(); try { - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); RocksDB spyDB = keyedStateBackend.db; @@ -286,7 +289,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa public void testDismissingSnapshot() throws Exception { setupRocksKeyedStateBackend(); try { - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); snapshot.cancel(true); verifyRocksObjectsReleased(); @@ -300,7 +303,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa public void testDismissingSnapshotNotRunnable() throws Exception { setupRocksKeyedStateBackend(); try { - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); snapshot.cancel(true); Thread asyncSnapshotThread = new Thread(snapshot); @@ -323,7 +326,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa public void testCompletingSnapshot() throws Exception { setupRocksKeyedStateBackend(); try { - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); @@ -332,7 +335,9 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa runStateUpdates(); blocker.trigger(); // allow checkpointing to start 
writing waiter.await(); // wait for snapshot stream writing to run - KeyedStateHandle keyedStateHandle = snapshot.get(); + + SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get(); + KeyedStateHandle keyedStateHandle = snapshotResult.getJobManagerOwnedSnapshot(); assertNotNull(keyedStateHandle); assertTrue(keyedStateHandle.getStateSize() > 0); assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); @@ -349,7 +354,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa public void testCancelRunningSnapshot() throws Exception { setupRocksKeyedStateBackend(); try { - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run @@ -425,7 +431,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa backend.setCurrentKey(checkpointId); state.update("Hello-" + checkpointId); - RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot( + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = backend.snapshot( checkpointId, checkpointId, createStreamFactory(), @@ -433,7 +439,11 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa snapshot.run(); - IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get(); + SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get(); + + IncrementalKeyedStateHandle stateHandle = + (IncrementalKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot(); + Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>(stateHandle.getSharedState()); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 4f16259..4d3f9f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -347,10 +347,7 @@ public abstract class AbstractStreamOperator<OUT> } @Override - public final OperatorSnapshotFutures snapshotState( - long checkpointId, - long timestamp, - CheckpointOptions checkpointOptions, + public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception { KeyGroupRange keyGroupRange = null != keyedStateBackend ? 
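Before the new restore classes below, a note on the snapshot signature change that runs through the diffs above: keyed and operator state backends now return RunnableFuture<SnapshotResult<...>> instead of RunnableFuture<KeyedStateHandle>, so callers unwrap a primary (JobManager-owned) and an optional task-local handle. A minimal, hypothetical sketch of that unwrapping (the accessor names are those used by OperatorSnapshotFinalizer further below; the helper class itself is not part of the commit):

import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;

import java.util.concurrent.RunnableFuture;

class SnapshotResultSketch {

    static KeyedStateHandle completeAndGetPrimary(
            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFuture) throws Exception {

        // Execute the asynchronous part of the snapshot, then block for the result.
        snapshotFuture.run();
        SnapshotResult<KeyedStateHandle> result = snapshotFuture.get();

        // Secondary copy kept on the TaskManager for local recovery; may be null
        // when no local snapshot was written (e.g. local recovery disabled).
        KeyedStateHandle taskLocal = result.getTaskLocalSnapshot();

        // Primary copy that is acknowledged to the JobManager / checkpoint coordinator.
        return result.getJobManagerOwnedSnapshot();
    }
}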
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java new file mode 100644 index 0000000..ba27a0a --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.Disposable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +/** + * This class implements the logic that creates (and potentially restores) a state backend. The restore logic + * considers multiple, prioritized options of snapshots to restore from, where all of the options should recreate + * the same state for the backend. When we fail to restore from the snapshot with the highest priority (typically + * the "fastest" to restore), we fallback to the next snapshot with the next highest priority. We also take care + * of cleaning up from failed restore attempts. We only reattempt when the problem occurs during the restore call + * and will only stop after all snapshot alternatives are exhausted and all failed. + * + * @param <T> type of the restored backend. + * @param <S> type of the supplied snapshots from which the backend restores. + */ +public class BackendRestorerProcedure< + T extends Closeable & Disposable & Snapshotable<?, Collection<S>>, + S extends StateObject> { + + /** Logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(BackendRestorerProcedure.class); + + /** Factory for new, fresh backends without state. */ + private final SupplierWithException<T, Exception> instanceSupplier; + + /** This registry is used so that recovery can participate in the task lifecycle, i.e. can be canceled. 
*/ + private final CloseableRegistry backendCloseableRegistry; + + /** + * Creates a new backend restorer using the given backend supplier and the closeable registry. + * + * @param instanceSupplier factory function for new, empty backend instances. + * @param backendCloseableRegistry registry to allow participation in task lifecycle, e.g. react to cancel. + */ + public BackendRestorerProcedure( + @Nonnull SupplierWithException<T, Exception> instanceSupplier, + @Nonnull CloseableRegistry backendCloseableRegistry) { + + this.instanceSupplier = Preconditions.checkNotNull(instanceSupplier); + this.backendCloseableRegistry = Preconditions.checkNotNull(backendCloseableRegistry); + } + + /** + * Creates a new state backend and restores it from the provided set of state snapshot alternatives. + * + * @param restoreOptions iterator over a prioritized set of state snapshot alternatives for recovery. + * @return the created (and restored) state backend. + * @throws Exception if the backend could not be created or restored. + */ + public @Nonnull + T createAndRestore(@Nonnull Iterator<? extends Collection<S>> restoreOptions) throws Exception { + + // This ensures that we always call the restore method even if there is no previous state + // (required by some backends). + Collection<S> attemptState = restoreOptions.hasNext() ? + restoreOptions.next() : + Collections.emptyList(); + + while (true) { + try { + return attemptCreateAndRestore(attemptState); + } catch (Exception ex) { + // more attempts? + if (restoreOptions.hasNext()) { + + attemptState = restoreOptions.next(); + LOG.warn("Exception while restoring backend, will retry with another snapshot replica.", ex); + } else { + + throw new FlinkException("Could not restore from any of the provided restore options.", ex); + } + } + } + } + + private T attemptCreateAndRestore(Collection<S> restoreState) throws Exception { + + // create a new, empty backend. + final T backendInstance = instanceSupplier.get(); + + try { + // register the backend with the registry to participate in task lifecycle w.r.t. cancellation. + backendCloseableRegistry.registerCloseable(backendInstance); + + // attempt to restore from snapshot (or null if no state was checkpointed). + backendInstance.restore(restoreState); + + return backendInstance; + } catch (Exception ex) { + + // under failure, we need do close... + if (backendCloseableRegistry.unregisterCloseable(backendInstance)) { + try { + backendInstance.close(); + } catch (IOException closeEx) { + ex = ExceptionUtils.firstOrSuppressed(closeEx, ex); + } + } + + // ... and dispose, e.g. to release native resources. + try { + backendInstance.dispose(); + } catch (Exception disposeEx) { + ex = ExceptionUtils.firstOrSuppressed(disposeEx, ex); + } + + throw ex; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java new file mode 100644 index 0000000..7a93990 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.util.FutureUtil; + +import javax.annotation.Nonnull; + +import java.util.concurrent.ExecutionException; + +/** + * This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link OperatorSnapshotFutures} + * that is executed. The object can then deliver the results from the execution as {@link OperatorSubtaskState}. + */ +public class OperatorSnapshotFinalizer { + + /** Primary replica of the operator subtask state for report to JM. */ + private final OperatorSubtaskState jobManagerOwnedState; + + /** Secondary replica of the operator subtask state for faster, local recovery on TM. */ + private final OperatorSubtaskState taskLocalState; + + public OperatorSnapshotFinalizer( + @Nonnull OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException { + + SnapshotResult<KeyedStateHandle> keyedManaged = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture()); + + SnapshotResult<KeyedStateHandle> keyedRaw = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture()); + + SnapshotResult<OperatorStateHandle> operatorManaged = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture()); + + SnapshotResult<OperatorStateHandle> operatorRaw = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture()); + + jobManagerOwnedState = new OperatorSubtaskState( + operatorManaged.getJobManagerOwnedSnapshot(), + operatorRaw.getJobManagerOwnedSnapshot(), + keyedManaged.getJobManagerOwnedSnapshot(), + keyedRaw.getJobManagerOwnedSnapshot() + ); + + taskLocalState = new OperatorSubtaskState( + operatorManaged.getTaskLocalSnapshot(), + operatorRaw.getTaskLocalSnapshot(), + keyedManaged.getTaskLocalSnapshot(), + keyedRaw.getTaskLocalSnapshot() + ); + } + + public OperatorSubtaskState getTaskLocalState() { + return taskLocalState; + } + + public OperatorSubtaskState getJobManagerOwnedState() { + return jobManagerOwnedState; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java index bdaf64b..95eb213 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java @@ -18,11 +18,15 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.ExceptionUtils; +import javax.annotation.Nonnull; + import java.util.concurrent.RunnableFuture; /** @@ -30,55 +34,74 @@ import java.util.concurrent.RunnableFuture; */ public class OperatorSnapshotFutures { - private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture; - private RunnableFuture<KeyedStateHandle> keyedStateRawFuture; - private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture; - private RunnableFuture<OperatorStateHandle> operatorStateRawFuture; + @Nonnull + private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture; + + @Nonnull + private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture; + + @Nonnull + private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture; + + @Nonnull + private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture; public OperatorSnapshotFutures() { - this(null, null, null, null); + this( + DoneFuture.of(SnapshotResult.empty()), + DoneFuture.of(SnapshotResult.empty()), + DoneFuture.of(SnapshotResult.empty()), + DoneFuture.of(SnapshotResult.empty())); } public OperatorSnapshotFutures( - RunnableFuture<KeyedStateHandle> keyedStateManagedFuture, - RunnableFuture<KeyedStateHandle> keyedStateRawFuture, - RunnableFuture<OperatorStateHandle> operatorStateManagedFuture, - RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { + @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture, + @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture, + @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture, + @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture) { this.keyedStateManagedFuture = keyedStateManagedFuture; this.keyedStateRawFuture = keyedStateRawFuture; this.operatorStateManagedFuture = operatorStateManagedFuture; this.operatorStateRawFuture = operatorStateRawFuture; } - public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() { + @Nonnull + public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateManagedFuture() { return keyedStateManagedFuture; } - public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) { + public void setKeyedStateManagedFuture( + @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture) { this.keyedStateManagedFuture = keyedStateManagedFuture; } - public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() { + @Nonnull + public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateRawFuture() { return keyedStateRawFuture; } - public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) { + public void setKeyedStateRawFuture( + @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture) { this.keyedStateRawFuture = keyedStateRawFuture; } - public RunnableFuture<OperatorStateHandle> 
getOperatorStateManagedFuture() { + @Nonnull + public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateManagedFuture() { return operatorStateManagedFuture; } - public void setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> operatorStateManagedFuture) { + public void setOperatorStateManagedFuture( + @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture) { this.operatorStateManagedFuture = operatorStateManagedFuture; } - public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() { + @Nonnull + public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateRawFuture() { return operatorStateRawFuture; } - public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { + public void setOperatorStateRawFuture( + @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture) { this.operatorStateRawFuture = operatorStateRawFuture; } @@ -119,16 +142,4 @@ public class OperatorSnapshotFutures { throw exception; } } - - public boolean hasKeyedState() { - return keyedStateManagedFuture != null || keyedStateRawFuture != null; - } - - public boolean hasOperatorState() { - return operatorStateManagedFuture != null || operatorStateRawFuture != null; - } - - public boolean hasState() { - return hasKeyedState() || hasOperatorState(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index c3254f6..71f508b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Disposable; import java.io.Serializable; @@ -45,7 +46,7 @@ import java.io.Serializable; * @param <OUT> The output type of the operator */ @PublicEvolving -public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable { +public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable { // ------------------------------------------------------------------------ // life cycle @@ -85,6 +86,7 @@ public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Ser * <p>This method is expected to make a thorough effort to release all resources * that the operator has acquired. 
*/ + @Override void dispose() throws Exception; // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index 75ead44..420b6bf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -40,7 +40,6 @@ public interface StreamOperatorStateContext { */ OperatorStateBackend operatorStateBackend(); - /** * Returns the keyed state backend for the stream operator. This method returns null for non-keyed operators. */ http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 9887e45..11e2dda 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -23,7 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -116,10 +117,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize final String operatorIdentifierText = operatorSubtaskDescription.toString(); - final OperatorSubtaskState operatorSubtaskStateFromJobManager = - taskStateManager.operatorStates(operatorID); - - final boolean restoring = (operatorSubtaskStateFromJobManager != null); + final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates = + taskStateManager.prioritizedOperatorState(operatorID); AbstractKeyedStateBackend<?> keyedStatedBackend = null; OperatorStateBackend operatorStateBackend = null; @@ -133,20 +132,22 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize keyedStatedBackend = keyedStatedBackend( keySerializer, operatorIdentifierText, - operatorSubtaskStateFromJobManager, + prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry); // -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( operatorIdentifierText, - operatorSubtaskStateFromJobManager, + prioritizedOperatorSubtaskStates, 
streamTaskCloseableRegistry); // -------------- Raw State Streams -------------- - rawKeyedStateInputs = rawKeyedStateInputs(operatorSubtaskStateFromJobManager); + rawKeyedStateInputs = rawKeyedStateInputs( + prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); - rawOperatorStateInputs = rawOperatorStateInputs(operatorSubtaskStateFromJobManager); + rawOperatorStateInputs = rawOperatorStateInputs( + prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); // -------------- Internal Timer Service Manager -------------- @@ -155,7 +156,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( - restoring, + prioritizedOperatorSubtaskStates.isRestored(), operatorStateBackend, keyedStatedBackend, timeServiceManager, @@ -222,155 +223,114 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize protected OperatorStateBackend operatorStateBackend( String operatorIdentifierText, - OperatorSubtaskState operatorSubtaskStateFromJobManager, + PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry) throws Exception { - //TODO search in local state for a local recovery opportunity. + BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer = + new BackendRestorerProcedure<>( + () -> stateBackend.createOperatorStateBackend(environment, operatorIdentifierText), + backendCloseableRegistry); - return createOperatorStateBackendFromJobManagerState( - operatorIdentifierText, - operatorSubtaskStateFromJobManager, - backendCloseableRegistry); + return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState()); } protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend( TypeSerializer<K> keySerializer, String operatorIdentifierText, - OperatorSubtaskState operatorSubtaskStateFromJobManager, + PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry) throws Exception { if (keySerializer == null) { return null; } - //TODO search in local state for a local recovery opportunity. 
+ TaskInfo taskInfo = environment.getTaskInfo(); + + final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + taskInfo.getMaxNumberOfParallelSubtasks(), + taskInfo.getNumberOfParallelSubtasks(), + taskInfo.getIndexOfThisSubtask()); - return createKeyedStatedBackendFromJobManagerState( - keySerializer, - operatorIdentifierText, - operatorSubtaskStateFromJobManager, - backendCloseableRegistry); + BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer = + new BackendRestorerProcedure<>( + () -> stateBackend.createKeyedStateBackend( + environment, + environment.getJobID(), + operatorIdentifierText, + keySerializer, + taskInfo.getMaxNumberOfParallelSubtasks(), + keyGroupRange, + environment.getTaskKvStateRegistry()), + backendCloseableRegistry); + + return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState()); } protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs( - OperatorSubtaskState operatorSubtaskStateFromJobManager) { + Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) { - if (operatorSubtaskStateFromJobManager != null) { + if (restoreStateAlternatives.hasNext()) { final CloseableRegistry closeableRegistry = new CloseableRegistry(); - Collection<OperatorStateHandle> rawOperatorState = - operatorSubtaskStateFromJobManager.getRawOperatorState(); - - return new CloseableIterable<StatePartitionStreamProvider>() { - @Override - public void close() throws IOException { - closeableRegistry.close(); - } - - @Nonnull - @Override - public Iterator<StatePartitionStreamProvider> iterator() { - return new OperatorStateStreamIterator( - DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, - rawOperatorState.iterator(), closeableRegistry); - } - }; - } - - return CloseableIterable.empty(); - } - - protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs( - OperatorSubtaskState operatorSubtaskStateFromJobManager) { + Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next(); + // TODO currently this does not support local state recovery, so we expect there is only one handle. 
+ Preconditions.checkState( + !restoreStateAlternatives.hasNext(), + "Local recovery is currently not implemented for raw operator state, but found state alternative."); - if (operatorSubtaskStateFromJobManager != null) { + if (rawOperatorState != null) { - Collection<KeyedStateHandle> rawKeyedState = operatorSubtaskStateFromJobManager.getRawKeyedState(); - Collection<KeyGroupsStateHandle> keyGroupsStateHandles = transform(rawKeyedState); - final CloseableRegistry closeableRegistry = new CloseableRegistry(); - - return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() { - @Override - public void close() throws IOException { - closeableRegistry.close(); - } + return new CloseableIterable<StatePartitionStreamProvider>() { + @Override + public void close() throws IOException { + closeableRegistry.close(); + } - @Override - public Iterator<KeyGroupStatePartitionStreamProvider> iterator() { - return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry); - } - }; + @Nonnull + @Override + public Iterator<StatePartitionStreamProvider> iterator() { + return new OperatorStateStreamIterator( + DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, + rawOperatorState.iterator(), closeableRegistry); + } + }; + } } return CloseableIterable.empty(); } - // ================================================================================================================= - - private OperatorStateBackend createOperatorStateBackendFromJobManagerState( - String operatorIdentifierText, - OperatorSubtaskState operatorSubtaskStateFromJobManager, - CloseableRegistry backendCloseableRegistry) throws Exception { - - final OperatorStateBackend operatorStateBackend = - stateBackend.createOperatorStateBackend(environment, operatorIdentifierText); - - backendCloseableRegistry.registerCloseable(operatorStateBackend); - - Collection<OperatorStateHandle> managedOperatorState = null; - - if (operatorSubtaskStateFromJobManager != null) { - managedOperatorState = operatorSubtaskStateFromJobManager.getManagedOperatorState(); - } - - operatorStateBackend.restore(managedOperatorState); - - return operatorStateBackend; - } + protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs( + Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives) { - private <K> AbstractKeyedStateBackend<K> createKeyedStatedBackendFromJobManagerState( - TypeSerializer<K> keySerializer, - String operatorIdentifierText, - OperatorSubtaskState operatorSubtaskStateFromJobManager, - CloseableRegistry backendCloseableRegistry) throws Exception { + if (restoreStateAlternatives.hasNext()) { + Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next(); - final AbstractKeyedStateBackend<K> keyedStateBackend = createKeyedStateBackend( - operatorIdentifierText, - keySerializer); + // TODO currently this does not support local state recovery, so we expect there is only one handle. 
+ Preconditions.checkState( + !restoreStateAlternatives.hasNext(), + "Local recovery is currently not implemented for raw keyed state, but found state alternative."); - backendCloseableRegistry.registerCloseable(keyedStateBackend); + if (rawKeyedState != null) { + Collection<KeyGroupsStateHandle> keyGroupsStateHandles = transform(rawKeyedState); + final CloseableRegistry closeableRegistry = new CloseableRegistry(); - Collection<KeyedStateHandle> managedKeyedState = null; + return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() { + @Override + public void close() throws IOException { + closeableRegistry.close(); + } - if (operatorSubtaskStateFromJobManager != null) { - managedKeyedState = operatorSubtaskStateFromJobManager.getManagedKeyedState(); + @Override + public Iterator<KeyGroupStatePartitionStreamProvider> iterator() { + return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry); + } + }; + } } - keyedStateBackend.restore(managedKeyedState); - - return keyedStateBackend; - } - - private <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - String operatorIdentifier, - TypeSerializer<K> keySerializer) throws Exception { - - TaskInfo taskInfo = environment.getTaskInfo(); - - final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( - taskInfo.getMaxNumberOfParallelSubtasks(), - taskInfo.getNumberOfParallelSubtasks(), - taskInfo.getIndexOfThisSubtask()); - - return stateBackend.createKeyedStateBackend( - environment, - environment.getJobID(), - operatorIdentifier, - keySerializer, - taskInfo.getMaxNumberOfParallelSubtasks(), //TODO check: this is numberOfKeyGroups !!!! - keyGroupRange, - environment.getTaskKvStateRegistry()); + return CloseableIterable.empty(); } // ================================================================================================================= http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 06cb18b..dba4c87 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -27,7 +27,6 @@ import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -46,6 +45,7 @@ import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import 
org.apache.flink.streaming.api.operators.StreamOperator; @@ -58,7 +58,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -800,7 +799,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // ------------------------------------------------------------------------ - private static final class AsyncCheckpointRunnable implements Runnable, Closeable { + /** + * This runnable executes the asynchronous parts of all involved backend snapshots for the subtask. + */ + @VisibleForTesting + protected static final class AsyncCheckpointRunnable implements Runnable, Closeable { private final StreamTask<?, ?> owner; @@ -831,11 +834,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @Override public void run() { FileSystemSafetyNet.initializeSafetyNetForThread(); - final long checkpointId = checkpointMetaData.getCheckpointId(); try { - boolean hasState = false; - final TaskStateSnapshot taskOperatorSubtaskStates = + TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = + new TaskStateSnapshot(operatorSnapshotsInProgress.size()); + + TaskStateSnapshot localTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size()); for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) { @@ -843,15 +847,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> OperatorID operatorID = entry.getKey(); OperatorSnapshotFutures snapshotInProgress = entry.getValue(); - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()), - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()), - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()), - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture()) - ); + // finalize the async part of all by executing all snapshot runnables + OperatorSnapshotFinalizer finalizedSnapshots = + new OperatorSnapshotFinalizer(snapshotInProgress); - hasState |= operatorSubtaskState.hasState(); - taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); + jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID( + operatorID, + finalizedSnapshots.getJobManagerOwnedState()); + + localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID( + operatorID, + finalizedSnapshots.getTaskLocalState()); } final long asyncEndNanos = System.nanoTime(); @@ -862,23 +868,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { - TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null; - - TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); - - // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state - // to stateless tasks on restore. This enables simple job modifications that only concern - // stateless without the need to assign them uids to match their (always empty) states. 
- taskStateManager.reportStateHandles( - checkpointMetaData, - checkpointMetrics, - acknowledgedState); - - LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", - owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); - - LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", - owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState); + reportCompletedSnapshotStates( + jobManagerTaskOperatorSubtaskStates, + localTaskOperatorSubtaskStates, + asyncDurationMillis); } else { LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", @@ -886,32 +879,66 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> checkpointMetaData.getCheckpointId()); } } catch (Exception e) { - // the state is completed if an exception occurred in the acknowledgeCheckpoint call - // in order to clean up, we have to set it to RUNNING again. - asyncCheckpointState.compareAndSet( - CheckpointingOperation.AsynCheckpointState.COMPLETED, - CheckpointingOperation.AsynCheckpointState.RUNNING); - - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } - - Exception checkpointException = new Exception( - "Could not materialize checkpoint " + checkpointId + " for operator " + - owner.getName() + '.', - e); - - owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( - checkpointMetaData, - checkpointException); + handleExecutionException(e); } finally { owner.cancelables.unregisterCloseable(this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } + private void reportCompletedSnapshotStates( + TaskStateSnapshot acknowledgedTaskStateSnapshot, + TaskStateSnapshot localTaskStateSnapshot, + long asyncDurationMillis) { + + TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); + + boolean hasAckState = acknowledgedTaskStateSnapshot.hasState(); + boolean hasLocalState = localTaskStateSnapshot.hasState(); + + Preconditions.checkState(hasAckState || !hasLocalState, + "Found cached state but no corresponding primary state is reported to the job " + + "manager. This indicates a problem."); + + // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state + // to stateless tasks on restore. This enables simple job modifications that only concern + // stateless without the need to assign them uids to match their (always empty) states. + taskStateManager.reportTaskStateSnapshots( + checkpointMetaData, + checkpointMetrics, + hasAckState ? acknowledgedTaskStateSnapshot : null, + hasLocalState ? localTaskStateSnapshot : null); + + LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", + owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); + + LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", + owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot); + } + + private void handleExecutionException(Exception e) { + // the state is completed if an exception occurred in the acknowledgeCheckpoint call + // in order to clean up, we have to set it to RUNNING again. 
+ asyncCheckpointState.compareAndSet( + CheckpointingOperation.AsynCheckpointState.COMPLETED, + CheckpointingOperation.AsynCheckpointState.RUNNING); + + try { + cleanup(); + } catch (Exception cleanupException) { + e.addSuppressed(cleanupException); + } + + Exception checkpointException = new Exception( + "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + + owner.getName() + '.', + e); + + owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( + checkpointMetaData, + checkpointException); + } + @Override public void close() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 85069b5..904ff64 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -569,8 +570,8 @@ public class AbstractStreamOperatorTest { final CloseableRegistry closeableRegistry = new CloseableRegistry(); - RunnableFuture<KeyedStateHandle> futureKeyedStateHandle = mock(RunnableFuture.class); - RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = mock(RunnableFuture.class); + RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class); + RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class); StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle); @@ -593,7 +594,7 @@ public class AbstractStreamOperatorTest { doReturn(containingTask).when(operator).getContainingTask(); - RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class); + RunnableFuture<SnapshotResult<OperatorStateHandle>> futureManagedOperatorStateHandle = mock(RunnableFuture.class); OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class); when(operatorStateBackend.snapshot( http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java new file mode 100644 index 0000000..2126f70 --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.util.BlockingFSDataInputStream; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.verifyZeroInteractions; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for {@link BackendRestorerProcedure}. + */ +public class BackendRestorerProcedureTest extends TestLogger { + + private final SupplierWithException<OperatorStateBackend, Exception> backendSupplier = + () -> new DefaultOperatorStateBackend( + getClass().getClassLoader(), + new ExecutionConfig(), + true); + + /** + * Tests that the restore procedure follows the order of the iterator and will retries failed attempts if there are + * more options. 
+ */ + @Test + public void testRestoreProcedureOrderAndFailure() throws Exception { + + CloseableRegistry closeableRegistry = new CloseableRegistry(); + CheckpointStreamFactory checkpointStreamFactory = new MemCheckpointStreamFactory(1024); + + ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test-state", Integer.class); + OperatorStateBackend originalBackend = backendSupplier.get(); + SnapshotResult<OperatorStateHandle> snapshotResult; + + try { + ListState<Integer> listState = originalBackend.getListState(stateDescriptor); + + listState.add(0); + listState.add(1); + listState.add(2); + listState.add(3); + + RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot = + originalBackend.snapshot(0L, 0L, checkpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + + snapshot.run(); + snapshotResult = snapshot.get(); + + } finally { + originalBackend.close(); + originalBackend.dispose(); + } + + OperatorStateHandle firstFailHandle = mock(OperatorStateHandle.class); + OperatorStateHandle secondSuccessHandle = spy(snapshotResult.getJobManagerOwnedSnapshot()); + OperatorStateHandle thirdNotUsedHandle = mock(OperatorStateHandle.class); + + List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions = Arrays.asList( + new StateObjectCollection<>(Collections.singletonList(firstFailHandle)), + new StateObjectCollection<>(Collections.singletonList(secondSuccessHandle)), + new StateObjectCollection<>(Collections.singletonList(thirdNotUsedHandle))); + Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator(); + + BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure = + new BackendRestorerProcedure<>(backendSupplier, closeableRegistry); + + OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(iterator); + Assert.assertNotNull(restoredBackend); + + try { + Assert.assertTrue(iterator.hasNext()); + Assert.assertTrue(thirdNotUsedHandle == iterator.next().iterator().next()); + verify(firstFailHandle).openInputStream(); + verify(secondSuccessHandle).openInputStream(); + verifyZeroInteractions(thirdNotUsedHandle); + Assert.assertFalse(iterator.hasNext()); + + ListState<Integer> listState = restoredBackend.getListState(stateDescriptor); + + Iterator<Integer> stateIterator = listState.get().iterator(); + Assert.assertEquals(0, (int) stateIterator.next()); + Assert.assertEquals(1, (int) stateIterator.next()); + Assert.assertEquals(2, (int) stateIterator.next()); + Assert.assertEquals(3, (int) stateIterator.next()); + Assert.assertFalse(stateIterator.hasNext()); + + } finally { + restoredBackend.close(); + restoredBackend.dispose(); + } + } + + /** + * Tests if there is an exception if all restore attempts are exhausted and failed. 
+ */ + @Test + public void testExceptionThrownIfAllRestoresFailed() throws Exception { + + CloseableRegistry closeableRegistry = new CloseableRegistry(); + + OperatorStateHandle firstFailHandle = mock(OperatorStateHandle.class); + OperatorStateHandle secondFailHandle = mock(OperatorStateHandle.class); + OperatorStateHandle thirdFailHandle = mock(OperatorStateHandle.class); + + List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions = Arrays.asList( + new StateObjectCollection<>(Collections.singletonList(firstFailHandle)), + new StateObjectCollection<>(Collections.singletonList(secondFailHandle)), + new StateObjectCollection<>(Collections.singletonList(thirdFailHandle))); + Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator(); + + BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure = + new BackendRestorerProcedure<>(backendSupplier, closeableRegistry); + + try { + restorerProcedure.createAndRestore(iterator); + Assert.fail(); + } catch (Exception ignore) { + } + + verify(firstFailHandle).openInputStream(); + verify(secondFailHandle).openInputStream(); + verify(thirdFailHandle).openInputStream(); + Assert.assertFalse(iterator.hasNext()); + } + + /** + * Test that the restore can be stopped via the provided closeable registry. + */ + @Test + public void testCanBeCanceledViaRegistry() throws Exception { + CloseableRegistry closeableRegistry = new CloseableRegistry(); + OneShotLatch waitForBlock = new OneShotLatch(); + OneShotLatch unblock = new OneShotLatch(); + OperatorStateHandle blockingRestoreHandle = mock(OperatorStateHandle.class); + when(blockingRestoreHandle.openInputStream()).thenReturn(new BlockingFSDataInputStream(waitForBlock, unblock)); + + List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions = + Collections.singletonList(new StateObjectCollection<>(Collections.singletonList(blockingRestoreHandle))); + + BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure = + new BackendRestorerProcedure<>(backendSupplier, closeableRegistry); + + AtomicReference<Exception> exceptionReference = new AtomicReference<>(null); + Thread restoreThread = new Thread(() -> { + try { + restorerProcedure.createAndRestore(sortedRestoreOptions.iterator()); + } catch (Exception e) { + exceptionReference.set(e); + } + }); + + restoreThread.start(); + waitForBlock.await(); + closeableRegistry.close(); + unblock.trigger(); + restoreThread.join(); + + Exception exception = exceptionReference.get(); + Assert.assertTrue(exception instanceof FlinkException); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java new file mode 100644 index 0000000..173b49c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Random; + +/** + * Tests for {@link OperatorSnapshotFinalizer}. + */ +public class OperatorSnapshotFinalizerTest extends TestLogger { + + /** + * Test that the runnable futures are executed and the result is correctly extracted. + */ + @Test + public void testRunAndExtract() throws Exception{ + + Random random = new Random(0x42); + + KeyedStateHandle keyedTemplate = + StateHandleDummyUtil.createNewKeyedStateHandle(new KeyGroupRange(0, 0)); + OperatorStateHandle operatorTemplate = + StateHandleDummyUtil.createNewOperatorStateHandle(2, random); + + SnapshotResult<KeyedStateHandle> snapKeyMan = SnapshotResult.withLocalState( + StateHandleDummyUtil.deepDummyCopy(keyedTemplate), + StateHandleDummyUtil.deepDummyCopy(keyedTemplate)); + + SnapshotResult<KeyedStateHandle> snapKeyRaw = SnapshotResult.withLocalState( + StateHandleDummyUtil.deepDummyCopy(keyedTemplate), + StateHandleDummyUtil.deepDummyCopy(keyedTemplate)); + + SnapshotResult<OperatorStateHandle> snapOpMan = SnapshotResult.withLocalState( + StateHandleDummyUtil.deepDummyCopy(operatorTemplate), + StateHandleDummyUtil.deepDummyCopy(operatorTemplate)); + + SnapshotResult<OperatorStateHandle> snapOpRaw = SnapshotResult.withLocalState( + StateHandleDummyUtil.deepDummyCopy(operatorTemplate), + StateHandleDummyUtil.deepDummyCopy(operatorTemplate)); + + DoneFuture<SnapshotResult<KeyedStateHandle>> managedKeyed = new PseudoNotDoneFuture<>(snapKeyMan); + DoneFuture<SnapshotResult<KeyedStateHandle>> rawKeyed = new PseudoNotDoneFuture<>(snapKeyRaw); + DoneFuture<SnapshotResult<OperatorStateHandle>> managedOp = new PseudoNotDoneFuture<>(snapOpMan); + DoneFuture<SnapshotResult<OperatorStateHandle>> rawOp = new PseudoNotDoneFuture<>(snapOpRaw); + + Assert.assertFalse(managedKeyed.isDone()); + Assert.assertFalse(rawKeyed.isDone()); + Assert.assertFalse(managedOp.isDone()); + Assert.assertFalse(rawOp.isDone()); + + OperatorSnapshotFutures futures = new OperatorSnapshotFutures(managedKeyed, rawKeyed, managedOp, rawOp); + OperatorSnapshotFinalizer operatorSnapshotFinalizer = new OperatorSnapshotFinalizer(futures); + + Assert.assertTrue(managedKeyed.isDone()); + Assert.assertTrue(rawKeyed.isDone()); + Assert.assertTrue(managedOp.isDone()); + Assert.assertTrue(rawOp.isDone()); + + 
OperatorSubtaskState jobManagerOwnedState = operatorSnapshotFinalizer.getJobManagerOwnedState(); + Assert.assertTrue(checkResult(snapKeyMan.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getManagedKeyedState())); + Assert.assertTrue(checkResult(snapKeyRaw.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getRawKeyedState())); + Assert.assertTrue(checkResult(snapOpMan.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getManagedOperatorState())); + Assert.assertTrue(checkResult(snapOpRaw.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getRawOperatorState())); + + OperatorSubtaskState taskLocalState = operatorSnapshotFinalizer.getTaskLocalState(); + Assert.assertTrue(checkResult(snapKeyMan.getTaskLocalSnapshot(), taskLocalState.getManagedKeyedState())); + Assert.assertTrue(checkResult(snapKeyRaw.getTaskLocalSnapshot(), taskLocalState.getRawKeyedState())); + Assert.assertTrue(checkResult(snapOpMan.getTaskLocalSnapshot(), taskLocalState.getManagedOperatorState())); + Assert.assertTrue(checkResult(snapOpRaw.getTaskLocalSnapshot(), taskLocalState.getRawOperatorState())); + } + + private <T extends StateObject> boolean checkResult(T expected, StateObjectCollection<T> actual) { + if (expected == null) { + return actual.isEmpty(); + } + + return actual.size() == 1 && expected == actual.iterator().next(); + } + + static class PseudoNotDoneFuture<T> extends DoneFuture<T> { + + private boolean done; + + PseudoNotDoneFuture(T payload) { + super(payload); + this.done = false; + } + + @Override + public void run() { + super.run(); + this.done = true; + } + + @Override + public boolean isDone() { + return done; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java index 6da39af..4122a71 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java @@ -18,8 +18,11 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -28,7 +31,7 @@ import java.util.concurrent.RunnableFuture; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.spy; /** * Tests for {@link OperatorSnapshotFutures}. 
@@ -46,20 +49,28 @@ public class OperatorSnapshotFuturesTest extends TestLogger { operatorSnapshotResult.cancel(); KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); - RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class); - when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); + SnapshotResult<KeyedStateHandle> keyedStateManagedResult = + SnapshotResult.of(keyedManagedStateHandle); + RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture = + spy(DoneFuture.of(keyedStateManagedResult)); KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class); - RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class); - when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle); - - OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class); - RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class); - when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle); - - OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class); - RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class); - when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle); + SnapshotResult<KeyedStateHandle> keyedStateRawResult = + SnapshotResult.of(keyedRawStateHandle); + RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture = + spy(DoneFuture.of(keyedStateRawResult)); + + OperatorStateHandle operatorManagedStateHandle = mock(OperatorStreamStateHandle.class); + SnapshotResult<OperatorStateHandle> operatorStateManagedResult = + SnapshotResult.of(operatorManagedStateHandle); + RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture = + spy(DoneFuture.of(operatorStateManagedResult)); + + OperatorStateHandle operatorRawStateHandle = mock(OperatorStreamStateHandle.class); + SnapshotResult<OperatorStateHandle> operatorStateRawResult = + SnapshotResult.of(operatorRawStateHandle); + RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture = + spy(DoneFuture.of(operatorStateRawResult)); operatorSnapshotResult = new OperatorSnapshotFutures( keyedStateManagedFuture, http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index c5e5df8..90649f2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.OperatorID; @@ 
-42,12 +43,13 @@ import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StatePartitionStreamProvider; -import org.apache.flink.runtime.state.TaskLocalStateStore; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestTaskLocalStateStore; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.CheckpointResponder; @@ -61,7 +63,6 @@ import org.junit.Test; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -135,15 +136,15 @@ public class StateInitializationContextImplTest { DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new OperatorStateHandle.StateMetaInfo(offsets.toArray(), OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); OperatorStateHandle operatorStateHandle = - new OperatorStateHandle(offsetsMap, new ByteStateHandleCloseChecking("os-" + i, out.toByteArray())); + new OperatorStreamStateHandle(offsetsMap, new ByteStateHandleCloseChecking("os-" + i, out.toByteArray())); operatorStateHandles.add(operatorStateHandle); } OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - Collections.emptyList(), - operatorStateHandles, - Collections.emptyList(), - keyedStateHandles); + StateObjectCollection.empty(), + new StateObjectCollection<>(operatorStateHandles), + StateObjectCollection.empty(), + new StateObjectCollection<>(keyedStateHandles)); OperatorID operatorID = new OperatorID(); TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); @@ -154,7 +155,7 @@ public class StateInitializationContextImplTest { TaskStateManager manager = new TaskStateManagerImpl( new JobID(), new ExecutionAttemptID(), - mock(TaskLocalStateStore.class), + new TestTaskLocalStateStore(), jobManagerTaskRestore, mock(CheckpointResponder.class));
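The StreamTask hunk at the top of this mail is the heart of the change: the asynchronous checkpoint part now finalizes every OperatorSnapshotFutures and reports two TaskStateSnapshots, one owned by the job manager and one kept on the task manager for local recovery, with null signalling a stateless task. The following sketch condenses that flow into a free-standing helper. It is an illustration only, not the actual StreamTask code; the class and method names (AsyncSnapshotReportingSketch, reportAsyncSnapshots) and the flat parameter list are invented for the example, while the types and calls (OperatorSnapshotFinalizer, TaskStateSnapshot, TaskStateManager#reportTaskStateSnapshots) are the ones introduced by the hunks above and may differ in later Flink versions.

import java.util.Map;

import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;

// Hypothetical helper mirroring AsyncCheckpointRunnable#run + reportCompletedSnapshotStates.
final class AsyncSnapshotReportingSketch {

    static void reportAsyncSnapshots(
            Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress,
            TaskStateManager taskStateManager,
            CheckpointMetaData checkpointMetaData,
            CheckpointMetrics checkpointMetrics) throws Exception {

        TaskStateSnapshot jmOwned = new TaskStateSnapshot(snapshotsInProgress.size());
        TaskStateSnapshot taskLocal = new TaskStateSnapshot(snapshotsInProgress.size());

        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : snapshotsInProgress.entrySet()) {
            // Runs all pending snapshot futures of one operator and splits the results into the
            // job-manager-owned part and the (possibly empty) task-local duplicate.
            OperatorSnapshotFinalizer finalized = new OperatorSnapshotFinalizer(entry.getValue());

            jmOwned.putSubtaskStateByOperatorID(entry.getKey(), finalized.getJobManagerOwnedState());
            taskLocal.putSubtaskStateByOperatorID(entry.getKey(), finalized.getTaskLocalState());
        }

        // Stateless tasks are signalled with null so that no empty state is assigned on restore.
        taskStateManager.reportTaskStateSnapshots(
            checkpointMetaData,
            checkpointMetrics,
            jmOwned.hasState() ? jmOwned : null,
            taskLocal.hasState() ? taskLocal : null);
    }
}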
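The new BackendRestorerProcedureTest also documents the intended restore pattern for local recovery: restore alternatives are offered in priority order (typically the task-local copy first, the job-manager-owned state as fallback), each failed attempt moves on to the next alternative, and the whole procedure can be aborted through a CloseableRegistry. Below is a minimal usage sketch, assuming the constructor and createAndRestore(Iterator) signature exercised by the test and that the class is accessible from the calling code; the two handle collections passed in are placeholders for whatever state the restore is given.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.operators.BackendRestorerProcedure;
import org.apache.flink.util.function.SupplierWithException;

// Hypothetical caller illustrating the restore-with-fallback pattern from the test.
final class RestoreWithFallbackSketch {

    static OperatorStateBackend restore(
            StateObjectCollection<OperatorStateHandle> taskLocalAlternative,
            StateObjectCollection<OperatorStateHandle> jobManagerAlternative,
            CloseableRegistry cancelRegistry) throws Exception {

        // Each call to the supplier produces a fresh, empty backend for the next restore attempt.
        SupplierWithException<OperatorStateBackend, Exception> backendSupplier =
            () -> new DefaultOperatorStateBackend(
                RestoreWithFallbackSketch.class.getClassLoader(),
                new ExecutionConfig(),
                true);

        // Alternatives are tried in iteration order: the (cheap) task-local copy first,
        // then the job-manager-owned state if restoring the local copy fails.
        List<StateObjectCollection<OperatorStateHandle>> prioritizedAlternatives =
            Arrays.asList(taskLocalAlternative, jobManagerAlternative);
        Iterator<StateObjectCollection<OperatorStateHandle>> alternatives =
            prioritizedAlternatives.iterator();

        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorer =
            new BackendRestorerProcedure<>(backendSupplier, cancelRegistry);

        // Closing cancelRegistry from another thread aborts an in-flight restore,
        // which is what testCanBeCanceledViaRegistry verifies.
        return restorer.createAndRestore(alternatives);
    }
}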
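The recurring signature change in the test diffs, from RunnableFuture<KeyedStateHandle> or RunnableFuture<OperatorStateHandle> to RunnableFuture<SnapshotResult<...>>, is what makes the dual reporting possible: a snapshot future now yields a SnapshotResult that may carry both a job-manager-owned handle and an optional task-local duplicate. The short sketch below consumes such a future, modelled on the snapshot call in BackendRestorerProcedureTest; the checkpoint id, timestamp and stream-factory size are arbitrary example values.

import java.util.concurrent.RunnableFuture;

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;

// Hypothetical helper showing how a SnapshotResult-typed snapshot future is consumed.
final class SnapshotResultConsumptionSketch {

    static SnapshotResult<OperatorStateHandle> snapshotOnce(OperatorStateBackend backend) throws Exception {
        CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024);

        // The synchronous part already happened inside snapshot(); running the returned
        // RunnableFuture executes the asynchronous part on the calling thread.
        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotFuture =
            backend.snapshot(1L, System.currentTimeMillis(), streamFactory,
                CheckpointOptions.forCheckpointWithDefaultLocation());

        snapshotFuture.run();
        SnapshotResult<OperatorStateHandle> result = snapshotFuture.get();

        // The primary handle is acknowledged to the job manager; the secondary handle,
        // which may be null, stays on the task manager for local recovery.
        OperatorStateHandle jobManagerOwned = result.getJobManagerOwnedSnapshot();
        OperatorStateHandle taskLocal = result.getTaskLocalSnapshot();

        return result;
    }
}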