This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 88c22864504d772764c5838afe0b944f1da50a3a
Author: Yu Li <l...@apache.org>
AuthorDate: Tue Jun 16 14:01:29 2020 +0800

    [FLINK-17800][rocksdb] Support customized RocksDB write/read options and use 
RocksDBResourceContainer to get them
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 ++---
 .../state/RocksDBKeyedStateBackendBuilder.java     | 17 ++---------
 .../streaming/state/RocksDBOptionsFactory.java     | 34 ++++++++++++++++++++++
 .../streaming/state/RocksDBResourceContainer.java  | 34 ++++++++++++++++++++++
 .../state/RocksDBResourceContainerTest.java        | 14 +++++++++
 5 files changed, 86 insertions(+), 21 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 505bd2a..591e8b7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,8 +219,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider,
                RocksDB db,
-               WriteOptions writeOptions,
-               ReadOptions readOptions,
                LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
                int keyGroupPrefixBytes,
                CloseableRegistry cancelStreamRegistry,
@@ -259,8 +257,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.kvStateInformation = kvStateInformation;
 
-               this.writeOptions = writeOptions;
-               this.readOptions = readOptions;
+               this.writeOptions = optionsContainer.getWriteOptions();
+               this.readOptions = optionsContainer.getReadOptions();
                checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative value.");
                this.writeBatchSize = writeBatchSize;
                this.db = db;
@@ -370,8 +368,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
                        IOUtils.closeQuietly(optionsContainer);
-                       IOUtils.closeQuietly(readOptions);
-                       IOUtils.closeQuietly(writeOptions);
 
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 08b4864..94e0d5b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,9 +54,7 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
-import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -249,9 +247,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                ColumnFamilyHandle defaultColumnFamilyHandle = null;
                RocksDBNativeMetricMonitor nativeMetricMonitor = null;
                CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
-               // The write options to use in the states.
-               WriteOptions writeOptions = null;
-               ReadOptions readOptions = null;
                LinkedHashMap<String, 
RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new 
LinkedHashMap<>();
                RocksDB db = null;
                AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -289,9 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                }
                        }
 
-                       writeOptions = new WriteOptions().setDisableWAL(true);
-                       readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-                       writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
writeOptions, writeBatchSize);
+                       writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
optionsContainer.getWriteOptions(), writeBatchSize);
                        // it is important that we only create the key builder 
after the restore, and not before;
                        // restore operations may reconfigure the key 
serializer, so accessing the key serializer
                        // only now we can be certain that the key serializer 
used in the builder is final.
@@ -307,7 +300,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                keyGroupPrefixBytes,
                                kvStateInformation,
                                db,
-                               readOptions,
                                writeBatchWrapper,
                                nativeMetricMonitor);
                } catch (Throwable e) {
@@ -327,8 +319,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        IOUtils.closeQuietly(restoreOperation);
                        IOUtils.closeAllQuietly(columnFamilyOptions);
                        IOUtils.closeQuietly(optionsContainer);
-                       IOUtils.closeQuietly(readOptions);
-                       IOUtils.closeQuietly(writeOptions);
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
                        kvStateInformation.clear();
                        try {
@@ -359,8 +349,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        this.executionConfig,
                        this.ttlTimeProvider,
                        db,
-                       writeOptions,
-                       readOptions,
                        kvStateInformation,
                        keyGroupPrefixBytes,
                        cancelStreamRegistryForBackend,
@@ -489,7 +477,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                int keyGroupPrefixBytes,
                Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
                RocksDB db,
-               ReadOptions readOptions,
                RocksDBWriteBatchWrapper writeBatchWrapper,
                RocksDBNativeMetricMonitor nativeMetricMonitor) {
                PriorityQueueSetFactory priorityQueueFactory;
@@ -504,7 +491,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                        numberOfKeyGroups,
                                        kvStateInformation,
                                        db,
-                                       readOptions,
+                                       optionsContainer.getReadOptions(),
                                        writeBatchWrapper,
                                        nativeMetricMonitor,
                                        columnFamilyOptionsFactory
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index a5fd1e9..5ee5098 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.WriteOptions;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -93,6 +95,38 @@ public interface RocksDBOptionsFactory extends 
OptionsFactory, java.io.Serializa
                return nativeMetricOptions;
        }
 
+       /**
+        * Customizes the RocksDB {@link WriteOptions} used for write operations.
+        *
+        * <p>This method should set the additional options on top of the current
+        * options object. The current options object may contain pre-defined
+        * options based on flags that have been configured on the state backend
+        * (for example, RocksDBResourceContainer#getWriteOptions passes in an
+        * object on which the WAL has been disabled).
+        *
+        * <p>It is important to set the options on the current object and
+        * return the result from the setter methods, otherwise the pre-defined
+        * options may get lost.
+        *
+        * <p>The default implementation returns {@code currentOptions} unchanged.
+        *
+        * @param currentOptions The options object with the pre-defined
+        *     options.
+        * @param handlesToClose The collection to register newly created
+        *     {@link org.rocksdb.RocksObject}s.
+        * @return The options object on which the additional options are set.
+        */
+       default WriteOptions createWriteOptions(WriteOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+               return currentOptions;
+       }
+
+       /**
+        * Customizes the RocksDB {@link ReadOptions} used for read operations.
+        *
+        * <p>This method should set the additional options on top of the current
+        * options object. The current options object may contain pre-defined
+        * options based on flags that have been configured on the state backend
+        * (for example, RocksDBResourceContainer#getReadOptions passes in the
+        * object created by
+        * RocksDBOperationUtils#createTotalOrderSeekReadOptions).
+        *
+        * <p>It is important to set the options on the current object and
+        * return the result from the setter methods, otherwise the pre-defined
+        * options may get lost.
+        *
+        * <p>The default implementation returns {@code currentOptions} unchanged.
+        *
+        * @param currentOptions The options object with the pre-defined
+        *     options.
+        * @param handlesToClose The collection to register newly created
+        *     {@link org.rocksdb.RocksObject}s.
+        * @return The options object on which the additional options are set.
+        */
+       default ReadOptions createReadOptions(ReadOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+               return currentOptions;
+       }
+
        // 
------------------------------------------------------------------------
        //  for compatibility
        // 
------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 2324973..3aaffa3 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,7 +26,9 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
 
@@ -138,6 +140,38 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
                return opt;
        }
 
+       /**
+        * Gets the RocksDB {@link WriteOptions} to be used for write
+        * operations.
+        *
+        * <p>Every call creates a new {@link WriteOptions} instance with the WAL
+        * disabled and adds it to {@code handlesToClose}. If an options factory
+        * is set, the instance is handed to its {@code createWriteOptions}
+        * method and whatever the factory returns is the result of this method.
+        *
+        * <p>NOTE(review): because a new native object is created per call,
+        * callers presumably should keep the returned instance rather than
+        * calling this getter repeatedly - confirm against the call sites.
+        *
+        * @return The write options, after customization by the options
+        *     factory (if one is set).
+        */
+       public WriteOptions getWriteOptions() {
+               // Disable WAL by default
+               WriteOptions opt = new WriteOptions().setDisableWAL(true);
+               // track the default object before the factory gets a chance to replace it
+               handlesToClose.add(opt);
+
+               // add user-defined options factory, if specified
+               if (optionsFactory != null) {
+                       opt = optionsFactory.createWriteOptions(opt, 
handlesToClose);
+               }
+
+               return opt;
+       }
+
+       /**
+        * Gets the RocksDB {@link ReadOptions} to be used for read operations.
+        *
+        * <p>Every call obtains a {@link ReadOptions} instance from
+        * {@code RocksDBOperationUtils.createTotalOrderSeekReadOptions()} and
+        * adds it to {@code handlesToClose}. If an options factory is set, the
+        * instance is handed to its {@code createReadOptions} method and
+        * whatever the factory returns is the result of this method.
+        *
+        * <p>NOTE(review): callers presumably should keep the returned instance
+        * rather than calling this getter repeatedly, since each call registers
+        * another handle - confirm against the call sites.
+        *
+        * @return The read options, after customization by the options factory
+        *     (if one is set).
+        */
+       public ReadOptions getReadOptions() {
+               // We ensure total order seek by default to prevent user 
misuse, see FLINK-17800 for more details
+               ReadOptions opt = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+               // track the default object before the factory gets a chance to replace it
+               handlesToClose.add(opt);
+
+               // add user-defined options factory, if specified
+               if (optionsFactory != null) {
+                       opt = optionsFactory.createReadOptions(opt, 
handlesToClose);
+               }
+
+               return opt;
+       }
+
        RocksDBNativeMetricOptions 
getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
                return optionsFactory == null
                                ? defaultMetricOptions
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 73d4bab..eb76e62 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,7 +31,9 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.WriteBufferManager;
+import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -246,4 +248,16 @@ public class RocksDBResourceContainerTest {
                assertThat(cache.isOwningHandle(), is(false));
                assertThat(wbm.isOwningHandle(), is(false));
        }
+
+       /**
+        * Verifies that the write and read options handed out by a container
+        * without an options factory own their native handles while the
+        * container is open, and no longer own them once it has been closed.
+        */
+       @Test
+       public void testFreeWriteReadOptionsAfterClose() throws Exception {
+               RocksDBResourceContainer container = new 
RocksDBResourceContainer();
+               WriteOptions writeOptions = container.getWriteOptions();
+               ReadOptions readOptions = container.getReadOptions();
+               // both handles must be live before the container is closed
+               assertThat(writeOptions.isOwningHandle(), is(true));
+               assertThat(readOptions.isOwningHandle(), is(true));
+               container.close();
+               // closing the container must have released both handles
+               assertThat(writeOptions.isOwningHandle(), is(false));
+               assertThat(readOptions.isOwningHandle(), is(false));
+       }
 }

Reply via email to