[FLINK-5146] Improved resource cleanup in RocksDB keyed state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4f802dd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4f802dd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4f802dd Branch: refs/heads/master Commit: e4f802dd502f38b922f668c2813728d5511ca289 Parents: 6e98a93 Author: Stefan Richter <[email protected]> Authored: Sat Nov 12 21:13:28 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Dec 2 18:10:13 2016 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 312 ++++++++++------ .../state/RocksDBStateBackendTest.java | 364 ++++++++++++++++++- .../io/async/AbstractAsyncIOCallable.java | 2 +- .../runtime/io/async/AsyncDoneCallback.java | 4 +- .../async/AsyncStoppableTaskWithCallback.java | 8 +- .../memory/MemCheckpointStreamFactory.java | 2 +- .../api/operators/OperatorSnapshotResult.java | 9 +- .../streaming/runtime/tasks/StreamTask.java | 67 ++-- .../flink/core/testutils/OneShotLatch.java | 9 + 9 files changed, 624 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index e498b34..bc5b17d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -61,7 +61,7 @@ import org.rocksdb.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -103,14 +103,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try * iterating over a disposed DB. After aquriring the lock, always first check if (db == null). */ - private final SerializableObject dbDisposeLock = new SerializableObject(); + private final SerializableObject asyncSnapshotLock = new SerializableObject(); /** * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState} * to store state. The different k/v states that we have don't each have their own RocksDB * instance. They all write to this instance but to their own column family. 
*/ - @GuardedBy("dbDisposeLock") protected RocksDB db; /** @@ -136,7 +135,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ) throws Exception { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange); - this.operatorIdentifier = operatorIdentifier; this.jobId = jobId; this.columnOptions = columnFamilyOptions; @@ -206,8 +204,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this); - restoreOperation.doRestore(restoreState); + try { + RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this); + restoreOperation.doRestore(restoreState); + } catch (Exception ex) { + dispose(); + throw ex; + } } /** @@ -217,23 +220,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public void dispose() { super.dispose(); - final RocksDB cleanupRockDBReference; - - // Acquire the log on dbDisposeLock, so that no ongoing snapshots access the db during cleanup - synchronized (dbDisposeLock) { + // Acquire the lock, so that no ongoing snapshots access the db during cleanup + synchronized (asyncSnapshotLock) { // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as // working on the disposed object results in SEGFAULTS. Other code has to check field #db for null // and access it in a synchronized block that locks on #dbDisposeLock. 
- cleanupRockDBReference = db; - db = null; - } + if (db != null) { - // Dispose decoupled db - if (cleanupRockDBReference != null) { - for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) { - column.f0.close(); + for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) { + column.f0.close(); + } + + kvStateInformation.clear(); + + db.close(); + db = null; } - cleanupRockDBReference.close(); } try { @@ -252,10 +254,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always * be called by the same thread. * - * @param checkpointId The Id of the checkpoint. - * @param timestamp The timestamp of the checkpoint. + * @param checkpointId The Id of the checkpoint. + * @param timestamp The timestamp of the checkpoint. * @param streamFactory The factory that we can use for writing our state to streams. - * * @return Future to the state handle of the snapshot data. * @throws Exception */ @@ -267,14 +268,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { long startTime = System.currentTimeMillis(); - if (kvStateInformation.isEmpty()) { - LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . Returning null."); - return new DoneFuture<>(null); - } - final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory); // hold the db lock while operation on the db to guard us against async db disposal - synchronized (dbDisposeLock) { + synchronized (asyncSnapshotLock) { + + if (kvStateInformation.isEmpty()) { + LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + + " . 
Returning null."); + + return new DoneFuture<>(null); + } + if (db != null) { snapshotOperation.takeDBSnapShot(checkpointId, timestamp); } else { @@ -295,18 +299,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override public KeyGroupsStateHandle performOperation() throws Exception { long startTime = System.currentTimeMillis(); - try { - // hold the db lock while operation on the db to guard us against async db disposal - synchronized (dbDisposeLock) { - if (db != null) { - snapshotOperation.writeDBSnapshot(); - } else { + synchronized (asyncSnapshotLock) { + try { + // hold the db lock while operation on the db to guard us against async db disposal + if (db == null) { throw new IOException("RocksDB closed."); } - } - } finally { - snapshotOperation.closeCheckpointStream(); + snapshotOperation.writeDBSnapshot(); + + } finally { + snapshotOperation.closeCheckpointStream(); + } } LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " + @@ -315,15 +319,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return snapshotOperation.getSnapshotResultStateHandle(); } - @Override - public void done() { + private void releaseSnapshotOperationResources(boolean canceled) { // hold the db lock while operation on the db to guard us against async db disposal - synchronized (dbDisposeLock) { - if (db != null) { - snapshotOperation.releaseDBSnapshot(); - } + synchronized (asyncSnapshotLock) { + snapshotOperation.releaseSnapshotResources(canceled); } } + + @Override + public void done(boolean canceled) { + releaseSnapshotOperationResources(canceled); + } }; LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " + @@ -348,14 +354,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private long checkpointTimeStamp; private Snapshot snapshot; + private ReadOptions readOptions; private 
CheckpointStreamFactory.CheckpointStateOutputStream outStream; private DataOutputView outputView; private List<Tuple2<RocksIterator, Integer>> kvStateIterators; private KeyGroupsStateHandle snapshotResultStateHandle; - - - public RocksDBSnapshotOperation( + RocksDBSnapshotOperation( RocksDBKeyedStateBackend<?> stateBackend, CheckpointStreamFactory checkpointStreamFactory) { @@ -397,7 +402,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @throws IOException */ public void writeDBSnapshot() throws IOException, InterruptedException { - Preconditions.checkNotNull(snapshot, "No ongoing snapshot to write."); + + if (null == snapshot) { + throw new IOException("No snapshot available. Might be released due to cancellation."); + } + Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); writeKVStateMetaData(); writeKVStateData(); @@ -412,6 +421,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (outStream != null) { stateBackend.cancelStreamRegistry.unregisterClosable(outStream); snapshotResultStateHandle = closeSnapshotStreamAndGetHandle(); + } else { + snapshotResultStateHandle = null; } } @@ -419,13 +430,36 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * 5) Release the snapshot object for RocksDB and clean up. 
* */ - public void releaseDBSnapshot() { - Preconditions.checkNotNull(snapshot, "No ongoing snapshot to release."); - stateBackend.db.releaseSnapshot(snapshot); - snapshot = null; - outStream = null; - outputView = null; - kvStateIterators = null; + public void releaseSnapshotResources(boolean canceled) { + if (null != kvStateIterators) { + for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { + kvStateIterator.f0.close(); + } + kvStateIterators = null; + } + + if (null != snapshot) { + if(null != stateBackend.db) { + stateBackend.db.releaseSnapshot(snapshot); + } + snapshot.close(); + snapshot = null; + } + + if (null != readOptions) { + readOptions.close(); + readOptions = null; + } + + if (canceled) { + try { + if (null != snapshotResultStateHandle) { + snapshotResultStateHandle.discardState(); + } + } catch (Exception ignored) { + LOG.warn("Exception occurred during snapshot state handle cleanup: " + ignored); + } + } } /** @@ -462,7 +496,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { InstantiationUtil.serializeObject(outStream, column.getValue().f1); //retrieve iterator for this k/v states - ReadOptions readOptions = new ReadOptions(); + readOptions = new ReadOptions(); readOptions.setSnapshot(snapshot); RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions); kvStateIterators.add(new Tuple2<>(iterator, kvStateId)); @@ -472,59 +506,64 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private void writeKVStateData() throws IOException, InterruptedException { - RocksDBMergeIterator iterator = new RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes); - byte[] previousKey = null; byte[] previousValue = null; - //preamble: setup with first key-group as our lookahead - if (iterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos()); 
- //write the k/v-state id as metadata - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(iterator.kvStateId()); - previousKey = iterator.key(); - previousValue = iterator.value(); - iterator.next(); - } + List<Tuple2<RocksIterator, Integer>> kvStateIteratorsHandover = this.kvStateIterators; + this.kvStateIterators = null; - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. - while (iterator.isValid()) { + // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator + try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( + kvStateIteratorsHandover, stateBackend.keyGroupPrefixBytes)) { - assert (!hasMetaDataFollowsFlag(previousKey)); + //preamble: setup with first key-group as our lookahead + if (mergeIterator.isValid()) { + //begin first key-group by recording the offset + keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); + //write the k/v-state id as metadata + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(mergeIterator.kvStateId()); + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (iterator.isNewKeyGroup() || iterator.isNewKeyValueState()) { + //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. 
+ while (mergeIterator.isValid()) { - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); + assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - } + //set signal in first key byte that meta data will follow in the stream after this k/v pair + if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { - writeKeyValuePair(previousKey, previousValue); + //be cooperative and check for interruption from time to time in the hot loop + checkInterrupted(); - //write meta data if we have to - if (iterator.isNewKeyGroup()) { - // - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(END_OF_KEY_GROUP_MARK); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(iterator.kvStateId()); - } else if (iterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(iterator.kvStateId()); - } + setMetaDataFollowsFlagInKey(previousKey); + } - //request next k/v pair - previousKey = iterator.key(); - previousValue = iterator.value(); - iterator.next(); + writeKeyValuePair(previousKey, previousValue); + + //write meta data if we have to + if (mergeIterator.isNewKeyGroup()) { + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(END_OF_KEY_GROUP_MARK); + //begin new key-group + keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); + //write the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(mergeIterator.kvStateId()); + } else if (mergeIterator.isNewKeyValueState()) { + //write 
the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(mergeIterator.kvStateId()); + } + + //request next k/v pair + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } } //epilogue: write last key-group @@ -540,11 +579,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException { StreamStateHandle stateHandle = outStream.closeAndGetHandle(); outStream = null; - if (stateHandle != null) { - return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); - } else { - throw new IOException("Output stream returned null on close."); - } + return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null; } private void writeKeyValuePair(byte[] key, byte[] value) throws IOException { @@ -566,7 +601,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static void checkInterrupted() throws InterruptedException { if(Thread.currentThread().isInterrupted()) { - throw new InterruptedException("Snapshot canceled."); + throw new InterruptedException("RocksDB snapshot interrupted."); } } } @@ -655,7 +690,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { //restore the empty columns for the k/v states through the metadata for (int i = 0; i < numColumns; i++) { - StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) InstantiationUtil.deserializeObject( + StateDescriptor<?, ?> stateDescriptor = InstantiationUtil.deserializeObject( currentStateHandleInStream, rocksDBKeyedStateBackend.userCodeClassLoader); @@ -829,13 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public int getKvStateId() { return kvStateId; } + + public void close() { + this.iterator.close(); + } } /** * Iterator that merges 
multiple RocksDB iterators to partition all states into contiguous key-groups. * The resulting iteration sequence is ordered by (key-group, kv-state). */ - static final class RocksDBMergeIterator { + static final class RocksDBMergeIterator implements Closeable { private final PriorityQueue<MergeIterator> heap; private final int keyGroupPrefixByteCount; @@ -845,18 +884,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private MergeIterator currentSubIterator; - RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws IOException { + private static final List<Comparator<MergeIterator>> COMPARATORS; + + static { + int maxBytes = 4; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i; + COMPARATORS.add(new Comparator<MergeIterator>() { + @Override + public int compare(MergeIterator o1, MergeIterator o2) { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.currentKey, o2.currentKey, currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + } + }); + } + } + + RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) { Preconditions.checkNotNull(kvStateIterators); this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; - Comparator<MergeIterator> iteratorComparator = new Comparator<MergeIterator>() { - @Override - public int compare(MergeIterator o1, MergeIterator o2) { - int arrayCmpRes = compareKeyGroupsForByteArrays( - o1.currentKey, o2.currentKey, keyGroupPrefixByteCount); - return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; - } - }; + Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount); if (kvStateIterators.size() > 0) { this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); @@ -866,8 +916,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { rocksIterator.seekToFirst(); if (rocksIterator.isValid()) { heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } else { + rocksIterator.close(); } } + + kvStateIterators.clear(); + this.valid = !heap.isEmpty(); this.currentSubIterator = heap.poll(); } else { @@ -901,14 +956,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { newKVState = currentSubIterator.getIterator() != rocksIterator; detectNewKeyGroup(oldKey); } - } else if (heap.isEmpty()) { - valid = false; } else { - currentSubIterator = heap.poll(); - newKVState = true; - detectNewKeyGroup(oldKey); - } + rocksIterator.close(); + if (heap.isEmpty()) { + currentSubIterator = null; + valid = false; + } else { + currentSubIterator = heap.poll(); + newKVState = true; + detectNewKeyGroup(oldKey); + } + } } private boolean isDifferentKeyGroup(byte[] a, byte[] b) { @@ -986,6 +1045,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } return 0; } + + @Override + public void close() { + + if (null != currentSubIterator) { + currentSubIterator.close(); + currentSubIterator = null; + } + + for (MergeIterator iterator : heap) { + iterator.close(); + } + + heap.clear(); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 9d25434..314717b 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -18,26 +18,72 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StateBackendTestBase; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.util.OperatingSystem; import org.junit.Assume; import org.junit.Before; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import org.rocksdb.RocksObject; +import org.rocksdb.Snapshot; import java.io.IOException; 
+import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.RunnableFuture; + +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; /** * Tests for the partitioned state part of {@link RocksDBStateBackend}. */ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> { + private OneShotLatch blocker; + private OneShotLatch waiter; + private BlockerCheckpointStreamFactory testStreamFactory; + private RocksDBKeyedStateBackend<Integer> keyedStateBackend; + private List<RocksObject> allCreatedCloseables; + private ValueState<Integer> testState1; + private ValueState<String> testState2; + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @Before - public void checkOperatingSystem() { + public void checkOS() throws Exception { Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); } @@ -49,4 +95,320 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa backend.setDbStoragePath(dbPath); return backend; } + + public void setupRocksKeyedStateBackend() throws Exception { + + blocker = new OneShotLatch(); + waiter = new OneShotLatch(); + testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + testStreamFactory.setBlockerLatch(blocker); + testStreamFactory.setWaiterLatch(waiter); + testStreamFactory.setAfterNumberInvocations(100); + + RocksDBStateBackend backend = getStateBackend(); + Environment env = new DummyEnvironment("TestTask", 1, 0); + + keyedStateBackend = 
(RocksDBKeyedStateBackend<Integer>) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + IntSerializer.INSTANCE, + 2, + new KeyGroupRange(0, 1), + mock(TaskKvStateRegistry.class)); + + testState1 = keyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("TestState-1", Integer.class, 0)); + + testState2 = keyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("TestState-2", String.class, "")); + + allCreatedCloseables = new ArrayList<>(); + + keyedStateBackend.db = spy(keyedStateBackend.db); + + doAnswer(new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + RocksIterator rocksIterator = spy((RocksIterator) invocationOnMock.callRealMethod()); + allCreatedCloseables.add(rocksIterator); + return rocksIterator; + } + }).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class)); + + doAnswer(new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Snapshot snapshot = spy((Snapshot) invocationOnMock.callRealMethod()); + allCreatedCloseables.add(snapshot); + return snapshot; + } + }).when(keyedStateBackend.db).getSnapshot(); + + doAnswer(new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ColumnFamilyHandle snapshot = spy((ColumnFamilyHandle) invocationOnMock.callRealMethod()); + allCreatedCloseables.add(snapshot); + return snapshot; + } + }).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class)); + + for (int i = 0; i < 100; ++i) { + keyedStateBackend.setCurrentKey(i); + testState1.update(4200 + i); + testState2.update("S-" + (4200 + i)); + } + } + + @Test + public void testRunningSnapshotAfterBackendClosed() throws Exception { + setupRocksKeyedStateBackend(); + 
RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + + RocksDB spyDB = keyedStateBackend.db; + + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + + this.keyedStateBackend.dispose(); + verify(spyDB, times(1)).close(); + assertEquals(null, keyedStateBackend.db); + + //Ensure no RocksObject has been closed yet + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(0)).close(); + } + + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + try { + snapshot.get(); + fail(); + } catch (Exception ignored) { + + } + + asyncSnapshotThread.join(); + + //Ensure every RocksObject was closed exactly once + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + + } + + @Test + public void testReleasingSnapshotAfterBackendClosed() throws Exception { + setupRocksKeyedStateBackend(); + RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + + RocksDB spyDB = keyedStateBackend.db; + + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + + this.keyedStateBackend.dispose(); + verify(spyDB, times(1)).close(); + assertEquals(null, keyedStateBackend.db); + + //Ensure no RocksObject has been closed yet + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(0)).close(); + } + + snapshot.cancel(true); + + //Ensure every RocksObject was closed exactly once + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + + } + + @Test + public void testDismissingSnapshot() throws Exception { + setupRocksKeyedStateBackend(); + RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + snapshot.cancel(true); + verifyRocksObjectsReleased(); + } + + 
@Test
+	public void testDismissingSnapshotNotRunnable() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		// cancel first, then still hand the already canceled future to a thread
+		snapshot.cancel(true);
+		Thread asyncSnapshotThread = new Thread(snapshot);
+		asyncSnapshotThread.start();
+		try {
+			snapshot.get();
+			fail();
+		} catch (Exception ignored) {
+			// expected: the test requires get() on the canceled future to throw
+		}
+		asyncSnapshotThread.join();
+		verifyRocksObjectsReleased();
+	}
+
+	/**
+	 * Runs the snapshot future on its own thread until it completes, updating state in between
+	 * the two latch hand-overs, and checks the returned handle, the stream and the resource release.
+	 */
+	@Test
+	public void testCompletingSnapshot() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		Thread asyncSnapshotThread = new Thread(snapshot);
+		asyncSnapshotThread.start();
+		waiter.await(); // wait for snapshot to run
+		waiter.reset();
+		runStateUpdates();
+		blocker.trigger(); // allow checkpointing to start writing
+		waiter.await(); // wait for snapshot stream writing to run
+		KeyGroupsStateHandle keyGroupsStateHandle = snapshot.get();
+		assertNotNull(keyGroupsStateHandle);
+		assertTrue(keyGroupsStateHandle.getStateSize() > 0);
+		assertEquals(2, keyGroupsStateHandle.getNumberOfKeyGroups());
+		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+		asyncSnapshotThread.join();
+		verifyRocksObjectsReleased();
+	}
+
+	/**
+	 * Cancels the snapshot future while its thread has already been started, then checks that the
+	 * last created stream is closed, that get() throws, and that the resources were released.
+	 */
+	@Test
+	public void testCancelRunningSnapshot() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		Thread asyncSnapshotThread = new Thread(snapshot);
+		asyncSnapshotThread.start();
+		waiter.await(); // wait for snapshot to run
+		waiter.reset();
+		runStateUpdates();
+		blocker.trigger(); // allow checkpointing to start writing
+		snapshot.cancel(true);
+		// the stream must already be closed right after cancel() returns
+		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+		waiter.await(); // wait for snapshot stream writing to run
+		try {
+			snapshot.get();
+			fail();
+		} catch (Exception ignored) {
+			// expected: the test requires get() on the canceled future to throw
+		}
+
+		
verifyRocksObjectsReleased();
+		asyncSnapshotThread.join();
+	}
+
+	/**
+	 * Updates both test states for the keys 50 to 149, sleeping for 1 ms before every tenth key.
+	 */
+	private void runStateUpdates() throws Exception {
+		for (int i = 50; i < 150; ++i) {
+			if (i % 10 == 0) {
+				Thread.sleep(1);
+			}
+			keyedStateBackend.setCurrentKey(i);
+			testState1.update(4200 + i);
+			testState2.update("S-" + (4200 + i));
+		}
+	}
+
+	/**
+	 * Verifies that every tracked RocksObject was closed exactly once and that the DB snapshot was
+	 * taken and released exactly once, then disposes the backend and checks that the DB was closed
+	 * and the backend's reference to it was cleared.
+	 */
+	private void verifyRocksObjectsReleased() {
+		//Ensure every RocksObject was closed exactly once
+		for (RocksObject rocksCloseable : allCreatedCloseables) {
+			verify(rocksCloseable, times(1)).close();
+		}
+
+		// the backend itself must still be open at this point; it is disposed further down
+		assertNotNull(keyedStateBackend.db);
+		RocksDB spyDB = keyedStateBackend.db;
+
+		verify(spyDB, times(1)).getSnapshot();
+		verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
+
+		keyedStateBackend.dispose();
+		verify(spyDB, times(1)).close();
+		assertEquals(null, keyedStateBackend.db);
+	}
+
+	/**
+	 * Checkpoint stream factory for tests. The streams it creates can hold back the writer on a
+	 * "blocker" latch after a configurable number of writes and report progress through a
+	 * "waiter" latch. Both latches are optional.
+	 */
+	static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+		private final int maxSize;
+		private int afterNumberInvocations;
+		private OneShotLatch blocker;
+		private OneShotLatch waiter;
+
+		MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
+
+		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+			return lastCreatedStream;
+		}
+
+		public BlockerCheckpointStreamFactory(int maxSize) {
+			this.maxSize = maxSize;
+		}
+
+		public void setAfterNumberInvocations(int afterNumberInvocations) {
+			this.afterNumberInvocations = afterNumberInvocations;
+		}
+
+		public void setBlockerLatch(OneShotLatch latch) {
+			this.blocker = latch;
+		}
+
+		public void setWaiterLatch(OneShotLatch latch) {
+			this.waiter = latch;
+		}
+
+		@Override
+		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+			// the waiter latch is optional; guard it like the stream created below guards its copy
+			if (null != waiter) {
+				waiter.trigger();
+			}
+			this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+				private int afterNInvocations = afterNumberInvocations;
+				private final 
OneShotLatch streamBlocker = blocker;
+				private final OneShotLatch streamWaiter = waiter;
+
+				@Override
+				public void write(int b) throws IOException {
+
+					// count down the remaining unblocked writes; the counter stays at 0 once it got there
+					if (afterNInvocations > 0) {
+						--afterNInvocations;
+					}
+
+					// with the counter at 0, every write first waits on the blocker latch (if one was set)
+					if (0 == afterNInvocations && null != streamBlocker) {
+						try {
+							streamBlocker.await();
+						} catch (InterruptedException ignored) {
+							// NOTE(review): the interrupt is swallowed and the write goes ahead - confirm this is intended
+						}
+					}
+					try {
+						super.write(b);
+					} catch (IOException ex) {
+						// signal the waiter even when the write fails, then propagate the failure
+						if (null != streamWaiter) {
+							streamWaiter.trigger();
+						}
+						throw ex;
+					}
+
+					// with the counter at 0, every successful write signals the waiter latch
+					if (0 == afterNInvocations && null != streamWaiter) {
+						streamWaiter.trigger();
+					}
+				}
+
+				@Override
+				public void close() {
+					super.close();
+					// closing the stream signals the waiter latch as well
+					if (null != streamWaiter) {
+						streamWaiter.trigger();
+					}
+				}
+			};
+
+			return lastCreatedStream;
+		}
+
+		@Override
+		public void close() throws Exception {
+			// nothing to release in the factory itself
+		}
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
index 989e868..1968d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
@@ -131,7 +131,7 @@ public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements
 	 * it finished or was stopped.
*/ @Override - public void done() { + public void done(boolean canceled) { //optional callback hook } http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java index 13d9057..dcc5525 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java @@ -25,7 +25,9 @@ public interface AsyncDoneCallback { /** * the callback + * + * @param canceled true if the callback is done, but was canceled */ - void done(); + void done(boolean canceled); } http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java index 8316e4f..1ca109c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java @@ -36,17 +36,13 @@ public class AsyncStoppableTaskWithCallback<V> extends FutureTask<V> { @Override public boolean cancel(boolean mayInterruptIfRunning) { - - if (mayInterruptIfRunning) { - stoppableCallbackCallable.stop(); - } - + stoppableCallbackCallable.stop(); return super.cancel(mayInterruptIfRunning); } @Override protected void done() { - stoppableCallbackCallable.done(); + 
stoppableCallbackCallable.done(isCancelled()); } public static <V> AsyncStoppableTaskWithCallback<V> from(StoppableCallbackCallable<V> callable) { http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 30de638..9b2b46f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -144,7 +144,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { return bytes; } else { - throw new IllegalStateException("stream has already been closed"); + throw new IOException("stream has already been closed"); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 52c89f8..4265edc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -78,4 +78,11 @@ public class OperatorSnapshotResult { public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { this.operatorStateRawFuture = operatorStateRawFuture; } 
-}
\ No newline at end of file
+
+	/**
+	 * Cancels all snapshot futures held by this result, interrupting them if they are running.
+	 * The futures are assigned through setters, so any of them may still be unset; unset
+	 * ({@code null}) futures are skipped instead of failing the cleanup with a
+	 * {@link NullPointerException}.
+	 */
+	public void cancel() {
+		cancelIfPresent(getKeyedStateManagedFuture());
+		cancelIfPresent(getOperatorStateManagedFuture());
+		cancelIfPresent(getKeyedStateRawFuture());
+		cancelIfPresent(getOperatorStateRawFuture());
+	}
+
+	/** Cancels the given future with interruption unless it is {@code null}. */
+	private static void cancelIfPresent(RunnableFuture<?> future) {
+		if (future != null) {
+			future.cancel(true);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6595901..fac37c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -942,9 +942,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		@Override
 		public void close() {
-			//TODO Handle other state futures in case we actually run them. Currently they are just DoneFutures.
- if (futureKeyedBackendStateHandles != null) { - futureKeyedBackendStateHandles.cancel(true); + // cleanup/release ongoing snapshot operations + for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) { + snapshotResult.cancel(); } } } @@ -985,35 +985,55 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> startSyncPartNano = System.nanoTime(); - for (StreamOperator<?> op : allOperators) { + boolean failed = true; + try { - createStreamFactory(op); - snapshotNonPartitionableState(op); + for (StreamOperator<?> op : allOperators) { - OperatorSnapshotResult snapshotInProgress = - op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory); + createStreamFactory(op); + snapshotNonPartitionableState(op); - snapshotInProgressList.add(snapshotInProgress); - } + OperatorSnapshotResult snapshotInProgress = + op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", - checkpointMetaData.getCheckpointId(), owner.getName()); - } + snapshotInProgressList.add(snapshotInProgress); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", + checkpointMetaData.getCheckpointId(), owner.getName()); + } + + startAsyncPartNano = System.nanoTime(); - startAsyncPartNano= System.nanoTime(); + checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); - checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); + // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread + runAsyncCheckpointingAndAcknowledge(); + failed = false; - runAsyncCheckpointingAndAcknowledge(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} - finished synchronous part of checkpoint {}." 
+
+						"Alignment duration: {} ms, snapshot duration {} ms",
+						owner.getName(), checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+						checkpointMetaData.getSyncDurationMillis());
+				}
+			} finally {
+				if (failed) {
+					// Cleanup to release resources: ownership of the snapshots was never handed to the async thread
+					for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+						operatorSnapshotResult.cancel();
+					}
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} - finished synchronous part of checkpoint {}." +
-					"Alignment duration: {} ms, snapshot duration {} ms",
-					owner.getName(), checkpointMetaData.getCheckpointId(),
-					checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-					checkpointMetaData.getSyncDurationMillis());
+					if (LOG.isDebugEnabled()) {
+						// Only task name and checkpoint id are logged here: the message must have exactly as many
+						// placeholders as arguments, and the sync duration may not have been set when we fail.
+						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.",
+							owner.getName(), checkpointMetaData.getCheckpointId());
+					}
+				}
 			}
+
 		}
 
 		private void createStreamFactory(StreamOperator<?> operator) throws IOException {
@@ -1051,6 +1071,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 
 		public void runAsyncCheckpointingAndAcknowledge() throws IOException {
+
 			AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 					owner,
 					nonPartitionedStates,
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 0418bf5..b3d86e5 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -107,4 +107,13 @@ public final class OneShotLatch {
 	public boolean isTriggered() {
 		return triggered;
 	}
+
+	/**
+	 * Resets the latch to the non-triggered state by setting {@code triggered} back to
+	 * {@code false} while holding {@code lock}, so that the latch can be triggered again.
+	 */
+	public void reset() {
+		synchronized (lock) {
+			triggered = false;
+		}
+	}
 }
