This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f87937be3a4309f9d532254983e33923ea85bfd9
Author: Yu Li <[email protected]>
AuthorDate: Sat Jun 20 16:58:59 2020 +0800

    Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options 
and use RocksDBResourceContainer to get them"
    
    This reverts commit 88c22864504d772764c5838afe0b944f1da50a3a.
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 +++--
 .../state/RocksDBKeyedStateBackendBuilder.java     | 17 +++++++++--
 .../streaming/state/RocksDBOptionsFactory.java     | 34 ----------------------
 .../streaming/state/RocksDBResourceContainer.java  | 34 ----------------------
 .../state/RocksDBResourceContainerTest.java        | 14 ---------
 5 files changed, 21 insertions(+), 86 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 591e8b7..505bd2a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider,
                RocksDB db,
+               WriteOptions writeOptions,
+               ReadOptions readOptions,
                LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
                int keyGroupPrefixBytes,
                CloseableRegistry cancelStreamRegistry,
@@ -257,8 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.kvStateInformation = kvStateInformation;
 
-               this.writeOptions = optionsContainer.getWriteOptions();
-               this.readOptions = optionsContainer.getReadOptions();
+               this.writeOptions = writeOptions;
+               this.readOptions = readOptions;
                checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative value.");
                this.writeBatchSize = writeBatchSize;
                this.db = db;
@@ -368,6 +370,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
                        IOUtils.closeQuietly(optionsContainer);
+                       IOUtils.closeQuietly(readOptions);
+                       IOUtils.closeQuietly(writeOptions);
 
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 94e0d5b..08b4864 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,9 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -247,6 +249,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                ColumnFamilyHandle defaultColumnFamilyHandle = null;
                RocksDBNativeMetricMonitor nativeMetricMonitor = null;
                CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
+               // The write options to use in the states.
+               WriteOptions writeOptions = null;
+               ReadOptions readOptions = null;
                LinkedHashMap<String, 
RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new 
LinkedHashMap<>();
                RocksDB db = null;
                AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -284,7 +289,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                }
                        }
 
-                       writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
optionsContainer.getWriteOptions(), writeBatchSize);
+                       writeOptions = new WriteOptions().setDisableWAL(true);
+                       readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+                       writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
writeOptions, writeBatchSize);
                        // it is important that we only create the key builder 
after the restore, and not before;
                        // restore operations may reconfigure the key 
serializer, so accessing the key serializer
                        // only now we can be certain that the key serializer 
used in the builder is final.
@@ -300,6 +307,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                keyGroupPrefixBytes,
                                kvStateInformation,
                                db,
+                               readOptions,
                                writeBatchWrapper,
                                nativeMetricMonitor);
                } catch (Throwable e) {
@@ -319,6 +327,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        IOUtils.closeQuietly(restoreOperation);
                        IOUtils.closeAllQuietly(columnFamilyOptions);
                        IOUtils.closeQuietly(optionsContainer);
+                       IOUtils.closeQuietly(readOptions);
+                       IOUtils.closeQuietly(writeOptions);
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
                        kvStateInformation.clear();
                        try {
@@ -349,6 +359,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        this.executionConfig,
                        this.ttlTimeProvider,
                        db,
+                       writeOptions,
+                       readOptions,
                        kvStateInformation,
                        keyGroupPrefixBytes,
                        cancelStreamRegistryForBackend,
@@ -477,6 +489,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                int keyGroupPrefixBytes,
                Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
                RocksDB db,
+               ReadOptions readOptions,
                RocksDBWriteBatchWrapper writeBatchWrapper,
                RocksDBNativeMetricMonitor nativeMetricMonitor) {
                PriorityQueueSetFactory priorityQueueFactory;
@@ -491,7 +504,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                        numberOfKeyGroups,
                                        kvStateInformation,
                                        db,
-                                       optionsContainer.getReadOptions(),
+                                       readOptions,
                                        writeBatchWrapper,
                                        nativeMetricMonitor,
                                        columnFamilyOptionsFactory
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index 5ee5098..a5fd1e9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.WriteOptions;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -95,38 +93,6 @@ public interface RocksDBOptionsFactory extends 
OptionsFactory, java.io.Serializa
                return nativeMetricOptions;
        }
 
-       /**
-        * This method should set the additional options on top of the current 
options object.
-        * The current options object may contain pre-defined options based on 
flags that have
-        * been configured on the state backend.
-        *
-        * <p>It is important to set the options on the current object and 
return the result from
-        * the setter methods, otherwise the pre-defined options may get lost.
-        *
-        * @param currentOptions The options object with the pre-defined 
options.
-        * @param handlesToClose The collection to register newly created 
{@link org.rocksdb.RocksObject}s.
-        * @return The options object on which the additional options are set.
-        */
-       default WriteOptions createWriteOptions(WriteOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
-               return currentOptions;
-       }
-
-       /**
-        * This method should set the additional options on top of the current 
options object.
-        * The current options object may contain pre-defined options based on 
flags that have
-        * been configured on the state backend.
-        *
-        * <p>It is important to set the options on the current object and 
return the result from
-        * the setter methods, otherwise the pre-defined options may get lost.
-        *
-        * @param currentOptions The options object with the pre-defined 
options.
-        * @param handlesToClose The collection to register newly created 
{@link org.rocksdb.RocksObject}s.
-        * @return The options object on which the additional options are set.
-        */
-       default ReadOptions createReadOptions(ReadOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
-               return currentOptions;
-       }
-
        // 
------------------------------------------------------------------------
        //  for compatibility
        // 
------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 3aaffa3..2324973 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,9 +26,7 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
-import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
 
@@ -140,38 +138,6 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
                return opt;
        }
 
-       /**
-        * Gets the RocksDB {@link WriteOptions} to be used for write 
operations.
-        */
-       public WriteOptions getWriteOptions() {
-               // Disable WAL by default
-               WriteOptions opt = new WriteOptions().setDisableWAL(true);
-               handlesToClose.add(opt);
-
-               // add user-defined options factory, if specified
-               if (optionsFactory != null) {
-                       opt = optionsFactory.createWriteOptions(opt, 
handlesToClose);
-               }
-
-               return opt;
-       }
-
-       /**
-        * Gets the RocksDB {@link ReadOptions} to be used for read operations.
-        */
-       public ReadOptions getReadOptions() {
-               // We ensure total order seek by default to prevent user 
misuse, see FLINK-17800 for more details
-               ReadOptions opt = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-               handlesToClose.add(opt);
-
-               // add user-defined options factory, if specified
-               if (optionsFactory != null) {
-                       opt = optionsFactory.createReadOptions(opt, 
handlesToClose);
-               }
-
-               return opt;
-       }
-
        RocksDBNativeMetricOptions 
getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
                return optionsFactory == null
                                ? defaultMetricOptions
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index eb76e62..73d4bab 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,9 +31,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.WriteBufferManager;
-import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -248,16 +246,4 @@ public class RocksDBResourceContainerTest {
                assertThat(cache.isOwningHandle(), is(false));
                assertThat(wbm.isOwningHandle(), is(false));
        }
-
-       @Test
-       public void testFreeWriteReadOptionsAfterClose() throws Exception {
-               RocksDBResourceContainer container = new 
RocksDBResourceContainer();
-               WriteOptions writeOptions = container.getWriteOptions();
-               ReadOptions readOptions = container.getReadOptions();
-               assertThat(writeOptions.isOwningHandle(), is(true));
-               assertThat(readOptions.isOwningHandle(), is(true));
-               container.close();
-               assertThat(writeOptions.isOwningHandle(), is(false));
-               assertThat(readOptions.isOwningHandle(), is(false));
-       }
 }

Reply via email to