[hotfix] RocksDB improve resource cleanup (disposal order, dispose all WriteOptions)
This commit ensures that all WriteOptions objects are closed and that we do not create unnecessary WriteOptions objects for each state. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2c4f492 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2c4f492 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2c4f492 Branch: refs/heads/master Commit: c2c4f492fd44f6252e33f0b63b865ac07d51b51c Parents: ca523fd Author: Stefan Richter <[email protected]> Authored: Thu Jan 25 23:02:20 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:59:55 2018 +0100 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 9 +++------ .../state/RocksDBAggregatingState.java | 10 ---------- .../streaming/state/RocksDBFoldingState.java | 10 ---------- .../state/RocksDBKeyedStateBackend.java | 21 ++++++++++++++++---- .../streaming/state/RocksDBListState.java | 10 ---------- .../streaming/state/RocksDBMapState.java | 10 ---------- .../streaming/state/RocksDBReducingState.java | 10 ---------- .../streaming/state/RocksDBValueState.java | 10 ---------- 8 files changed, 20 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 6db0e86..3464355 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -64,12 +64,10 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta /** State descriptor from which to create this state instance. */ protected final SD stateDesc; - /** - * We disable writes to the write-ahead-log here. - */ - private final WriteOptions writeOptions; + protected final WriteOptions writeOptions; protected final ByteArrayOutputStreamWithPos keySerializationStream; + protected final DataOutputView keySerializationDataOutputView; private final boolean ambiguousKeyPossible; @@ -89,8 +87,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta this.columnFamily = columnFamily; - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); + this.writeOptions = backend.getWriteOptions(); this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor"); this.keySerializationStream = new ByteArrayOutputStreamWithPos(128); http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java index 2c07814..f2d1d86 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; import java.io.IOException; import java.util.Collection; @@ -54,12 +53,6 @@ public class RocksDBAggregatingState<K, N, T, ACC, R> private final AggregateFunction<T, ACC, R> aggFunction; /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** * Creates a new {@code RocksDBFoldingState}. * * @param namespaceSerializer @@ -77,9 +70,6 @@ public class RocksDBAggregatingState<K, N, T, ACC, R> this.valueSerializer = stateDesc.getSerializer(); this.aggFunction = stateDesc.getAggregateFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 479565e..d886f44 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.internal.InternalFoldingState; import 
org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; import java.io.IOException; @@ -55,12 +54,6 @@ public class RocksDBFoldingState<K, N, T, ACC> private final FoldFunction<T, ACC> foldFunction; /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** * Creates a new {@code RocksDBFoldingState}. * * @param namespaceSerializer The serializer for the namespace. @@ -76,9 +69,6 @@ public class RocksDBFoldingState<K, N, T, ACC> this.valueSerializer = stateDesc.getSerializer(); this.foldFunction = stateDesc.getFoldFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index c02f130..8f95b18 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -102,6 +102,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Snapshot; +import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,6 +121,7 @@ import java.util.Collections; import java.util.Comparator; import 
java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -191,6 +193,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private ColumnFamilyHandle defaultColumnFamily; /** + * The write options to use in the states. We disable write ahead logging. + */ + private final WriteOptions writeOptions; + + /** * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ @@ -266,7 +273,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; - this.kvStateInformation = new HashMap<>(); + this.kvStateInformation = new LinkedHashMap<>(); this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); @@ -275,6 +282,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { new IncrementalSnapshotStrategy() : new FullSnapshotStrategy(); + this.writeOptions = new WriteOptions().setDisableWAL(true); + LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @@ -363,12 +372,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // invalidate the reference db = null; + IOUtils.closeQuietly(columnOptions); + IOUtils.closeQuietly(dbOptions); + IOUtils.closeQuietly(writeOptions); kvStateInformation.clear(); restoredKvStateMetaInfos.clear(); - IOUtils.closeQuietly(dbOptions); - IOUtils.closeQuietly(columnOptions); - cleanInstanceBasePath(); } } @@ -387,6 +396,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return keyGroupPrefixBytes; } 
+ public WriteOptions getWriteOptions() { + return writeOptions; + } + /** * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index f0481ec..62c169b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -55,12 +54,6 @@ public class RocksDBListState<K, N, V> private final TypeSerializer<V> valueSerializer; /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** * Separator of StringAppendTestOperator in RocksDB. 
*/ private static final byte DELIMITER = ','; @@ -79,9 +72,6 @@ public class RocksDBListState<K, N, V> super(columnFamily, namespaceSerializer, stateDesc, backend); this.valueSerializer = stateDesc.getElementSerializer(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 6b7177b..d1e72c9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -35,7 +35,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; -import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,12 +67,6 @@ public class RocksDBMapState<K, N, UK, UV> private final TypeSerializer<UK> userKeySerializer; private final TypeSerializer<UV> userValueSerializer; - /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - /** The offset of User Key offset in raw key bytes. 
*/ private int userKeyOffset; @@ -92,9 +85,6 @@ public class RocksDBMapState<K, N, UK, UV> this.userKeySerializer = stateDesc.getKeySerializer(); this.userValueSerializer = stateDesc.getValueSerializer(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index b4c3f51..2a7f6e0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.internal.InternalReducingState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -53,12 +52,6 @@ public class RocksDBReducingState<K, N, V> private final ReduceFunction<V> reduceFunction; /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** * Creates a new {@code RocksDBReducingState}. * * @param namespaceSerializer The serializer for the namespace. 
@@ -73,9 +66,6 @@ public class RocksDBReducingState<K, N, V> super(columnFamily, namespaceSerializer, stateDesc, backend); this.valueSerializer = stateDesc.getSerializer(); this.reduceFunction = stateDesc.getReduceFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c2c4f492/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index da21e8a..99718be 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.state.internal.InternalValueState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -47,12 +46,6 @@ public class RocksDBValueState<K, N, V> private final TypeSerializer<V> valueSerializer; /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** * Creates a new {@code RocksDBValueState}. * * @param namespaceSerializer The serializer for the namespace. 
@@ -66,9 +59,6 @@ public class RocksDBValueState<K, N, V> super(columnFamily, namespaceSerializer, stateDesc, backend); this.valueSerializer = stateDesc.getSerializer(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); } @Override
