Repository: flink Updated Branches: refs/heads/master ccc3e44cb -> 61d69a229
[FLINK-3948] Protect RocksDB cleanup by cleanup lock Previously, an asynchronous checkpoint could still be running while cleanup was in progress, so the snapshot might iterate over an already-disposed RocksDB instance. Cleanup and asynchronous checkpointing are now guarded by the same lock. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d69a22 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d69a22 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d69a22 Branch: refs/heads/master Commit: 61d69a229b40e52460f26804e4a36cf12e150004 Parents: cfffdc8 Author: Aljoscha Krettek <[email protected]> Authored: Fri May 20 22:37:14 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Jun 6 09:29:33 2016 +0200 ---------------------------------------------------------------------- .../flink-statebackend-rocksdb/pom.xml | 2 +- .../streaming/state/RocksDBStateBackend.java | 110 ++++++++++++------- 2 files changed, 70 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/61d69a22/flink-contrib/flink-statebackend-rocksdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml index cccdc20..115752c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/pom.xml +++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml @@ -50,7 +50,7 @@ under the License. 
<dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> - <version>4.1.0</version> + <version>4.5.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/61d69a22/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 4b7d7ee..4c44249 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -155,7 +155,14 @@ public class RocksDBStateBackend extends AbstractStateBackend { * to store state. The different k/v states that we have don't each have their own RocksDB * instance. They all write to this instance but to their own column family. */ - protected transient RocksDB db; + protected volatile transient RocksDB db; + + /** + * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous + * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try + * iterating over a disposed db. + */ + private Object dbCleanupLock; /** * Information about the k/v states as we create them. This is used to retrieve the @@ -282,6 +289,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { throw new RuntimeException("Error cleaning RocksDB data directory.", e); } + dbCleanupLock = new Object(); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); // RocksDB seems to need this... 
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes())); @@ -305,28 +314,44 @@ public class RocksDBStateBackend extends AbstractStateBackend { super.dispose(); nonPartitionedStateBackend.dispose(); - if (this.dbOptions != null) { - this.dbOptions.dispose(); - this.dbOptions = null; - } - for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) { - column.f0.dispose(); + // we have to lock because we might have an asynchronous checkpoint going on + synchronized (dbCleanupLock) { + if (db != null) { + if (this.dbOptions != null) { + this.dbOptions.dispose(); + this.dbOptions = null; + } + + for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) { + column.f0.dispose(); + } + + db.dispose(); + db = null; + } } - db.dispose(); } @Override public void close() throws Exception { nonPartitionedStateBackend.close(); - - if (this.dbOptions != null) { - this.dbOptions.dispose(); - this.dbOptions = null; - } - for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) { - column.f0.dispose(); + + // we have to lock because we might have an asynchronous checkpoint going on + synchronized (dbCleanupLock) { + if (db != null) { + if (this.dbOptions != null) { + this.dbOptions.dispose(); + this.dbOptions = null; + } + + for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) { + column.f0.dispose(); + } + + db.dispose(); + db = null; + } } - db.dispose(); } private File getDbPath(String stateName) { @@ -441,8 +466,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { // draw a copy in case it get's changed while performing the async snapshot Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>(); columnFamiliesCopy.putAll(kvStateInformation); - FullyAsynSnapshot dummySnapshot = new FullyAsynSnapshot(db, - snapshot, + FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, this, 
backupUri, columnFamiliesCopy, @@ -672,10 +696,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { * This does the snapshot using a RocksDB snapshot and an iterator over all keys * at the point of that snapshot. */ - private static class FullyAsynSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> { + private class FullyAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> { private static final long serialVersionUID = 1L; - private transient final RocksDB db; private transient org.rocksdb.Snapshot snapshot; private transient AbstractStateBackend backend; @@ -683,13 +706,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies; private final long checkpointId; - private FullyAsynSnapshot(RocksDB db, - org.rocksdb.Snapshot snapshot, + private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot, AbstractStateBackend backend, URI backupUri, Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies, long checkpointId) { - this.db = db; this.snapshot = snapshot; this.backend = backend; this.backupUri = backupUri; @@ -723,17 +744,27 @@ public class RocksDBStateBackend extends AbstractStateBackend { count++; } + ReadOptions readOptions = new ReadOptions(); + readOptions.setSnapshot(snapshot); + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) { byte columnByte = columnFamilyMapping.get(column.getKey()); - ReadOptions readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); - RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions); - iterator.seekToFirst(); - while (iterator.isValid()) { - outputView.writeByte(columnByte); - BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView); - 
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView); - iterator.next(); + + synchronized (dbCleanupLock) { + if (db == null) { + throw new RuntimeException("RocksDB instance was disposed. This happens " + + "when we are in the middle of a checkpoint and the job fails."); + } + RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions); + iterator.seekToFirst(); + while (iterator.isValid()) { + outputView.writeByte(columnByte); + BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), + outputView); + BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), + outputView); + iterator.next(); + } } } @@ -743,23 +774,20 @@ public class RocksDBStateBackend extends AbstractStateBackend { LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms."); return new FinalFullyAsyncSnapshot(stateHandle, checkpointId); } finally { - db.releaseSnapshot(snapshot); + synchronized (dbCleanupLock) { + if (db != null) { + db.releaseSnapshot(snapshot); + } + } snapshot = null; } } - @Override - protected void finalize() throws Throwable { - if (snapshot != null) { - db.releaseSnapshot(snapshot); - } - super.finalize(); - } } /** * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB data base. This - * results from {@link FullyAsynSnapshot}. + * results from {@link FullyAsyncSnapshot}. */ private static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> { private static final long serialVersionUID = 1L;
