Repository: flink
Updated Branches:
  refs/heads/master ccc3e44cb -> 61d69a229


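[FLINK-3948] Protect RocksDB cleanup with a cleanup lock

Previously, an asynchronous checkpoint could still be iterating over the
RocksDB instance while dispose()/close() was disposing it, so the
snapshot could end up reading from a disposed database. Cleanup and the
database access of the asynchronous checkpoint are now guarded by a
shared lock (dbCleanupLock).

This also bumps rocksdbjni from 4.1.0 to 4.5.1 and renames the
misspelled FullyAsynSnapshot to FullyAsyncSnapshot.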


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d69a22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d69a22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d69a22

Branch: refs/heads/master
Commit: 61d69a229b40e52460f26804e4a36cf12e150004
Parents: cfffdc8
Author: Aljoscha Krettek <[email protected]>
Authored: Fri May 20 22:37:14 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Jun 6 09:29:33 2016 +0200

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |   2 +-
 .../streaming/state/RocksDBStateBackend.java    | 110 ++++++++++++-------
 2 files changed, 70 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61d69a22/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml 
b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index cccdc20..115752c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -50,7 +50,7 @@ under the License.
                <dependency>
                        <groupId>org.rocksdb</groupId>
                        <artifactId>rocksdbjni</artifactId>
-                       <version>4.1.0</version>
+                       <version>4.5.1</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/61d69a22/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 4b7d7ee..4c44249 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -155,7 +155,14 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
         * to store state. The different k/v states that we have don't each 
have their own RocksDB
         * instance. They all write to this instance but to their own column 
family.
         */
-       protected transient RocksDB db;
+       protected volatile transient RocksDB db;
+
+       /**
+        * Lock for protecting cleanup of the RocksDB db. We acquire this when 
doing asynchronous
+        * checkpoints and when disposing the db. Otherwise, the asynchronous 
snapshot might try
+        * iterating over a disposed db.
+        */
+       private Object dbCleanupLock;
 
        /**
         * Information about the k/v states as we create them. This is used to 
retrieve the
@@ -282,6 +289,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        throw new RuntimeException("Error cleaning RocksDB data 
directory.", e);
                }
 
+               dbCleanupLock = new Object();
+
                List<ColumnFamilyDescriptor> columnFamilyDescriptors = new 
ArrayList<>(1);
                // RocksDB seems to need this...
                columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("default".getBytes()));
@@ -305,28 +314,44 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                super.dispose();
                nonPartitionedStateBackend.dispose();
 
-               if (this.dbOptions != null) {
-                       this.dbOptions.dispose();
-                       this.dbOptions = null;
-               }
-               for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: 
kvStateInformation.values()) {
-                       column.f0.dispose();
+               // we have to lock because we might have an asynchronous 
checkpoint going on
+               synchronized (dbCleanupLock) {
+                       if (db != null) {
+                               if (this.dbOptions != null) {
+                                       this.dbOptions.dispose();
+                                       this.dbOptions = null;
+                               }
+
+                               for (Tuple2<ColumnFamilyHandle, 
StateDescriptor> column : kvStateInformation.values()) {
+                                       column.f0.dispose();
+                               }
+
+                               db.dispose();
+                               db = null;
+                       }
                }
-               db.dispose();
        }
 
        @Override
        public void close() throws Exception {
                nonPartitionedStateBackend.close();
-               
-               if (this.dbOptions != null) {
-                       this.dbOptions.dispose();
-                       this.dbOptions = null;
-               }
-               for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: 
kvStateInformation.values()) {
-                       column.f0.dispose();
+
+               // we have to lock because we might have an asynchronous 
checkpoint going on
+               synchronized (dbCleanupLock) {
+                       if (db != null) {
+                               if (this.dbOptions != null) {
+                                       this.dbOptions.dispose();
+                                       this.dbOptions = null;
+                               }
+
+                               for (Tuple2<ColumnFamilyHandle, 
StateDescriptor> column : kvStateInformation.values()) {
+                                       column.f0.dispose();
+                               }
+
+                               db.dispose();
+                               db = null;
+                       }
                }
-               db.dispose();
        }
 
        private File getDbPath(String stateName) {
@@ -441,8 +466,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                // draw a copy in case it get's changed while performing the 
async snapshot
                Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> 
columnFamiliesCopy = new HashMap<>();
                columnFamiliesCopy.putAll(kvStateInformation);
-               FullyAsynSnapshot dummySnapshot = new FullyAsynSnapshot(db,
-                               snapshot,
+               FullyAsyncSnapshot dummySnapshot = new 
FullyAsyncSnapshot(snapshot,
                                this,
                                backupUri,
                                columnFamiliesCopy,
@@ -672,10 +696,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
         * This does the snapshot using a RocksDB snapshot and an iterator over 
all keys
         * at the point of that snapshot.
         */
-       private static class FullyAsynSnapshot extends 
AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> {
+       private class FullyAsyncSnapshot extends 
AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> {
                private static final long serialVersionUID = 1L;
 
-               private transient final RocksDB db;
                private transient org.rocksdb.Snapshot snapshot;
                private transient AbstractStateBackend backend;
 
@@ -683,13 +706,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                private final Map<String, Tuple2<ColumnFamilyHandle, 
StateDescriptor>> columnFamilies;
                private final long checkpointId;
 
-               private FullyAsynSnapshot(RocksDB db,
-                               org.rocksdb.Snapshot snapshot,
+               private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot,
                                AbstractStateBackend backend,
                                URI backupUri,
                                Map<String, Tuple2<ColumnFamilyHandle, 
StateDescriptor>> columnFamilies,
                                long checkpointId) {
-                       this.db = db;
                        this.snapshot = snapshot;
                        this.backend = backend;
                        this.backupUri = backupUri;
@@ -723,17 +744,27 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                                        count++;
                                }
 
+                               ReadOptions readOptions = new ReadOptions();
+                               readOptions.setSnapshot(snapshot);
+
                                for (Map.Entry<String, 
Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) 
{
                                        byte columnByte = 
columnFamilyMapping.get(column.getKey());
-                                       ReadOptions readOptions = new 
ReadOptions();
-                                       readOptions.setSnapshot(snapshot);
-                                       RocksIterator iterator = 
db.newIterator(column.getValue().f0, readOptions);
-                                       iterator.seekToFirst();
-                                       while (iterator.isValid()) {
-                                               
outputView.writeByte(columnByte);
-                                               
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView);
-                                               
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView);
-                                               iterator.next();
+
+                                       synchronized (dbCleanupLock) {
+                                               if (db == null) {
+                                                       throw new 
RuntimeException("RocksDB instance was disposed. This happens " +
+                                                                       "when 
we are in the middle of a checkpoint and the job fails.");
+                                               }
+                                               RocksIterator iterator = 
db.newIterator(column.getValue().f0, readOptions);
+                                               iterator.seekToFirst();
+                                               while (iterator.isValid()) {
+                                                       
outputView.writeByte(columnByte);
+                                                       
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
+                                                                       
outputView);
+                                                       
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
+                                                                       
outputView);
+                                                       iterator.next();
+                                               }
                                        }
                                }
 
@@ -743,23 +774,20 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                                LOG.info("Fully asynchronous RocksDB 
materialization to " + backupUri + " (asynchronous part) took " + (endTime - 
startTime) + " ms.");
                                return new FinalFullyAsyncSnapshot(stateHandle, 
checkpointId);
                        } finally {
-                               db.releaseSnapshot(snapshot);
+                               synchronized (dbCleanupLock) {
+                                       if (db != null) {
+                                               db.releaseSnapshot(snapshot);
+                                       }
+                               }
                                snapshot = null;
                        }
                }
 
-               @Override
-               protected void finalize() throws Throwable {
-                       if (snapshot != null) {
-                               db.releaseSnapshot(snapshot);
-                       }
-                       super.finalize();
-               }
        }
 
        /**
         * Dummy {@link KvStateSnapshot} that holds the state of our one 
RocksDB data base. This
-        * results from {@link FullyAsynSnapshot}.
+        * results from {@link FullyAsyncSnapshot}.
         */
        private static class FinalFullyAsyncSnapshot implements 
KvStateSnapshot<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> {
                private static final long serialVersionUID = 1L;

Reply via email to