This is an automated email from the ASF dual-hosted git repository. liyu pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 88c22864504d772764c5838afe0b944f1da50a3a Author: Yu Li <l...@apache.org> AuthorDate: Tue Jun 16 14:01:29 2020 +0800 [FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them --- .../streaming/state/RocksDBKeyedStateBackend.java | 8 ++--- .../state/RocksDBKeyedStateBackendBuilder.java | 17 ++--------- .../streaming/state/RocksDBOptionsFactory.java | 34 ++++++++++++++++++++++ .../streaming/state/RocksDBResourceContainer.java | 34 ++++++++++++++++++++++ .../state/RocksDBResourceContainerTest.java | 14 +++++++++ 5 files changed, 86 insertions(+), 21 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 505bd2a..591e8b7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -219,8 +219,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, RocksDB db, - WriteOptions writeOptions, - ReadOptions readOptions, LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, @@ -259,8 +257,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.kvStateInformation = kvStateInformation; - this.writeOptions = writeOptions; - this.readOptions = readOptions; + this.writeOptions = optionsContainer.getWriteOptions(); + this.readOptions = optionsContainer.getReadOptions(); checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value."); this.writeBatchSize = writeBatchSize; this.db = db; @@ -370,8 +368,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { columnFamilyOptions.forEach(IOUtils::closeQuietly); IOUtils.closeQuietly(optionsContainer); - IOUtils.closeQuietly(readOptions); - IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 08b4864..94e0d5b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -54,9 +54,7 @@ import org.apache.flink.util.ResourceGuard; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; -import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -249,9 +247,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ColumnFamilyHandle defaultColumnFamilyHandle = null; RocksDBNativeMetricMonitor nativeMetricMonitor = null; CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry(); - // The write options to use in the states. - WriteOptions writeOptions = null; - ReadOptions readOptions = null; LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); RocksDB db = null; AbstractRocksDBRestoreOperation restoreOperation = null; @@ -289,9 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken } } - writeOptions = new WriteOptions().setDisableWAL(true); - readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); - writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize); + writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize); // it is important that we only create the key builder after the restore, and not before; // restore operations may reconfigure the key serializer, so accessing the key serializer // only now we can be certain that the key serializer used in the builder is final. @@ -307,7 +300,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken keyGroupPrefixBytes, kvStateInformation, db, - readOptions, writeBatchWrapper, nativeMetricMonitor); } catch (Throwable e) { @@ -327,8 +319,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken IOUtils.closeQuietly(restoreOperation); IOUtils.closeAllQuietly(columnFamilyOptions); IOUtils.closeQuietly(optionsContainer); - IOUtils.closeQuietly(readOptions); - IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); kvStateInformation.clear(); try { @@ -359,8 +349,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.executionConfig, this.ttlTimeProvider, db, - writeOptions, - readOptions, kvStateInformation, keyGroupPrefixBytes, cancelStreamRegistryForBackend, @@ -489,7 +477,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken int keyGroupPrefixBytes, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, - ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor) { PriorityQueueSetFactory priorityQueueFactory; @@ -504,7 +491,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken numberOfKeyGroups, kvStateInformation, db, - readOptions, + optionsContainer.getReadOptions(), writeBatchWrapper, nativeMetricMonitor, columnFamilyOptionsFactory diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java index a5fd1e9..5ee5098 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java @@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; +import org.rocksdb.WriteOptions; import java.util.ArrayList; import java.util.Collection; @@ -93,6 +95,38 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa return nativeMetricOptions; } + /** + * This method should set the additional options on top of the current options object. + * The current options object may contain pre-defined options based on flags that have + * been configured on the state backend. + * + * <p>It is important to set the options on the current object and return the result from + * the setter methods, otherwise the pre-defined options may get lost. + * + * @param currentOptions The options object with the pre-defined options. + * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s. + * @return The options object on which the additional options are set. + */ + default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + return currentOptions; + } + + /** + * This method should set the additional options on top of the current options object. + * The current options object may contain pre-defined options based on flags that have + * been configured on the state backend. + * + * <p>It is important to set the options on the current object and return the result from + * the setter methods, otherwise the pre-defined options may get lost. + * + * @param currentOptions The options object with the pre-defined options. + * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s. + * @return The options object on which the additional options are set. + */ + default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + return currentOptions; + } + // ------------------------------------------------------------------------ // for compatibility // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 2324973..3aaffa3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -26,7 +26,9 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.TableFormatConfig; +import org.rocksdb.WriteOptions; import javax.annotation.Nullable; @@ -138,6 +140,38 @@ public final class RocksDBResourceContainer implements AutoCloseable { return opt; } + /** + * Gets the RocksDB {@link WriteOptions} to be used for write operations. + */ + public WriteOptions getWriteOptions() { + // Disable WAL by default + WriteOptions opt = new WriteOptions().setDisableWAL(true); + handlesToClose.add(opt); + + // add user-defined options factory, if specified + if (optionsFactory != null) { + opt = optionsFactory.createWriteOptions(opt, handlesToClose); + } + + return opt; + } + + /** + * Gets the RocksDB {@link ReadOptions} to be used for read operations. + */ + public ReadOptions getReadOptions() { + // We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details + ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); + handlesToClose.add(opt); + + // add user-defined options factory, if specified + if (optionsFactory != null) { + opt = optionsFactory.createReadOptions(opt, handlesToClose); + } + + return opt; + } + RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) { return optionsFactory == null ? defaultMetricOptions diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java index 73d4bab..eb76e62 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java @@ -31,7 +31,9 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.LRUCache; import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.ReadOptions; import org.rocksdb.WriteBufferManager; +import org.rocksdb.WriteOptions; import java.io.IOException; import java.lang.reflect.Field; @@ -246,4 +248,16 @@ public class RocksDBResourceContainerTest { assertThat(cache.isOwningHandle(), is(false)); assertThat(wbm.isOwningHandle(), is(false)); } + + @Test + public void testFreeWriteReadOptionsAfterClose() throws Exception { + RocksDBResourceContainer container = new RocksDBResourceContainer(); + WriteOptions writeOptions = container.getWriteOptions(); + ReadOptions readOptions = container.getReadOptions(); + assertThat(writeOptions.isOwningHandle(), is(true)); + assertThat(readOptions.isOwningHandle(), is(true)); + container.close(); + assertThat(writeOptions.isOwningHandle(), is(false)); + assertThat(readOptions.isOwningHandle(), is(false)); + } }