[hotfix] RocksDB improve resource cleanup (disposal order, dispose all 
WriteOptions)

This commit ensures that all WriteOptions objects are closed and that we do not 
create unnecessary
WriteOptions objects for each state.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2c4f492
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2c4f492
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2c4f492

Branch: refs/heads/master
Commit: c2c4f492fd44f6252e33f0b63b865ac07d51b51c
Parents: ca523fd
Author: Stefan Richter <[email protected]>
Authored: Thu Jan 25 23:02:20 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:59:55 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  9 +++------
 .../state/RocksDBAggregatingState.java          | 10 ----------
 .../streaming/state/RocksDBFoldingState.java    | 10 ----------
 .../state/RocksDBKeyedStateBackend.java         | 21 ++++++++++++++++----
 .../streaming/state/RocksDBListState.java       | 10 ----------
 .../streaming/state/RocksDBMapState.java        | 10 ----------
 .../streaming/state/RocksDBReducingState.java   | 10 ----------
 .../streaming/state/RocksDBValueState.java      | 10 ----------
 8 files changed, 20 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 6db0e86..3464355 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -64,12 +64,10 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
        /** State descriptor from which to create this state instance. */
        protected final SD stateDesc;
 
-       /**
-        * We disable writes to the write-ahead-log here.
-        */
-       private final WriteOptions writeOptions;
+       protected final WriteOptions writeOptions;
 
        protected final ByteArrayOutputStreamWithPos keySerializationStream;
+
        protected final DataOutputView keySerializationDataOutputView;
 
        private final boolean ambiguousKeyPossible;
@@ -89,8 +87,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
 
                this.columnFamily = columnFamily;
 
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
+               this.writeOptions = backend.getWriteOptions();
                this.stateDesc = Preconditions.checkNotNull(stateDesc, "State 
Descriptor");
 
                this.keySerializationStream = new 
ByteArrayOutputStreamWithPos(128);

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 2c07814..f2d1d86 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.state.internal.InternalAggregatingState;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -54,12 +53,6 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
        private final AggregateFunction<T, ACC, R> aggFunction;
 
        /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
         * Creates a new {@code RocksDBFoldingState}.
         *
         * @param namespaceSerializer
@@ -77,9 +70,6 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 
                this.valueSerializer = stateDesc.getSerializer();
                this.aggFunction = stateDesc.getAggregateFunction();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 479565e..d886f44 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.state.internal.InternalFoldingState;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 
@@ -55,12 +54,6 @@ public class RocksDBFoldingState<K, N, T, ACC>
        private final FoldFunction<T, ACC> foldFunction;
 
        /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
         * Creates a new {@code RocksDBFoldingState}.
         *
         * @param namespaceSerializer The serializer for the namespace.
@@ -76,9 +69,6 @@ public class RocksDBFoldingState<K, N, T, ACC>
 
                this.valueSerializer = stateDesc.getSerializer();
                this.foldFunction = stateDesc.getFoldFunction();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index c02f130..8f95b18 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -102,6 +102,7 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
+import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +121,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -191,6 +193,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private ColumnFamilyHandle defaultColumnFamily;
 
        /**
+        * The write options to use in the states. We disable write ahead 
logging.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
         * Information about the k/v states as we create them. This is used to 
retrieve the
         * column family that is used for a state and also for sanity checks 
when restoring.
         */
@@ -266,7 +273,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
                this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
-               this.kvStateInformation = new HashMap<>();
+               this.kvStateInformation = new LinkedHashMap<>();
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();
                this.backendUID = UUID.randomUUID();
@@ -275,6 +282,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        new IncrementalSnapshotStrategy() :
                        new FullSnapshotStrategy();
 
+               this.writeOptions = new WriteOptions().setDisableWAL(true);
+
                LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
@@ -363,12 +372,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // invalidate the reference
                        db = null;
 
+                       IOUtils.closeQuietly(columnOptions);
+                       IOUtils.closeQuietly(dbOptions);
+                       IOUtils.closeQuietly(writeOptions);
                        kvStateInformation.clear();
                        restoredKvStateMetaInfos.clear();
 
-                       IOUtils.closeQuietly(dbOptions);
-                       IOUtils.closeQuietly(columnOptions);
-
                        cleanInstanceBasePath();
                }
        }
@@ -387,6 +396,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return keyGroupPrefixBytes;
        }
 
+       public WriteOptions getWriteOptions() {
+               return writeOptions;
+       }
+
        /**
         * Triggers an asynchronous snapshot of the keyed state backend from 
RocksDB. This snapshot can be canceled and
         * is also stopped when the backend is closed through {@link 
#dispose()}. For each backend, this method must always

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index f0481ec..62c169b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -55,12 +54,6 @@ public class RocksDBListState<K, N, V>
        private final TypeSerializer<V> valueSerializer;
 
        /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
         * Separator of StringAppendTestOperator in RocksDB.
         */
        private static final byte DELIMITER = ',';
@@ -79,9 +72,6 @@ public class RocksDBListState<K, N, V>
 
                super(columnFamily, namespaceSerializer, stateDesc, backend);
                this.valueSerializer = stateDesc.getElementSerializer();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 6b7177b..d1e72c9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -35,7 +35,6 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,12 +67,6 @@ public class RocksDBMapState<K, N, UK, UV>
        private final TypeSerializer<UK> userKeySerializer;
        private final TypeSerializer<UV> userValueSerializer;
 
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
        /** The offset of User Key offset in raw key bytes. */
        private int userKeyOffset;
 
@@ -92,9 +85,6 @@ public class RocksDBMapState<K, N, UK, UV>
 
                this.userKeySerializer = stateDesc.getKeySerializer();
                this.userValueSerializer = stateDesc.getValueSerializer();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index b4c3f51..2a7f6e0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.state.internal.InternalReducingState;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -53,12 +52,6 @@ public class RocksDBReducingState<K, N, V>
        private final ReduceFunction<V> reduceFunction;
 
        /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
         * Creates a new {@code RocksDBReducingState}.
         *
         * @param namespaceSerializer The serializer for the namespace.
@@ -73,9 +66,6 @@ public class RocksDBReducingState<K, N, V>
                super(columnFamily, namespaceSerializer, stateDesc, backend);
                this.valueSerializer = stateDesc.getSerializer();
                this.reduceFunction = stateDesc.getReduceFunction();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index da21e8a..99718be 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.state.internal.InternalValueState;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -47,12 +46,6 @@ public class RocksDBValueState<K, N, V>
        private final TypeSerializer<V> valueSerializer;
 
        /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
         * Creates a new {@code RocksDBValueState}.
         *
         * @param namespaceSerializer The serializer for the namespace.
@@ -66,9 +59,6 @@ public class RocksDBValueState<K, N, V>
 
                super(columnFamily, namespaceSerializer, stateDesc, backend);
                this.valueSerializer = stateDesc.getSerializer();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
        }
 
        @Override

Reply via email to