This is an automated email from the ASF dual-hosted git repository. liyu pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f87937be3a4309f9d532254983e33923ea85bfd9 Author: Yu Li <[email protected]> AuthorDate: Sat Jun 20 16:58:59 2020 +0800 Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them" This reverts commit 88c22864504d772764c5838afe0b944f1da50a3a. --- .../streaming/state/RocksDBKeyedStateBackend.java | 8 +++-- .../state/RocksDBKeyedStateBackendBuilder.java | 17 +++++++++-- .../streaming/state/RocksDBOptionsFactory.java | 34 ---------------------- .../streaming/state/RocksDBResourceContainer.java | 34 ---------------------- .../state/RocksDBResourceContainerTest.java | 14 --------- 5 files changed, 21 insertions(+), 86 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 591e8b7..505bd2a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -219,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, RocksDB db, + WriteOptions writeOptions, + ReadOptions readOptions, LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, @@ -257,8 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.kvStateInformation = kvStateInformation; - this.writeOptions = optionsContainer.getWriteOptions(); - this.readOptions = optionsContainer.getReadOptions(); + this.writeOptions = writeOptions; + this.readOptions = readOptions; checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value."); this.writeBatchSize = writeBatchSize; this.db = db; @@ -368,6 +370,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { columnFamilyOptions.forEach(IOUtils::closeQuietly); IOUtils.closeQuietly(optionsContainer); + IOUtils.closeQuietly(readOptions); + IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 94e0d5b..08b4864 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -54,7 +54,9 @@ import org.apache.flink.util.ResourceGuard; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,6 +249,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ColumnFamilyHandle defaultColumnFamilyHandle = null; RocksDBNativeMetricMonitor nativeMetricMonitor = null; CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry(); + // The write options to use in the states. + WriteOptions writeOptions = null; + ReadOptions readOptions = null; LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); RocksDB db = null; AbstractRocksDBRestoreOperation restoreOperation = null; @@ -284,7 +289,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken } } - writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize); + writeOptions = new WriteOptions().setDisableWAL(true); + readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); + writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize); // it is important that we only create the key builder after the restore, and not before; // restore operations may reconfigure the key serializer, so accessing the key serializer // only now we can be certain that the key serializer used in the builder is final. @@ -300,6 +307,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken keyGroupPrefixBytes, kvStateInformation, db, + readOptions, writeBatchWrapper, nativeMetricMonitor); } catch (Throwable e) { @@ -319,6 +327,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken IOUtils.closeQuietly(restoreOperation); IOUtils.closeAllQuietly(columnFamilyOptions); IOUtils.closeQuietly(optionsContainer); + IOUtils.closeQuietly(readOptions); + IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); kvStateInformation.clear(); try { @@ -349,6 +359,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.executionConfig, this.ttlTimeProvider, db, + writeOptions, + readOptions, kvStateInformation, keyGroupPrefixBytes, cancelStreamRegistryForBackend, @@ -477,6 +489,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken int keyGroupPrefixBytes, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, + ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor) { PriorityQueueSetFactory priorityQueueFactory; @@ -491,7 +504,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken numberOfKeyGroups, kvStateInformation, db, - optionsContainer.getReadOptions(), + readOptions, writeBatchWrapper, nativeMetricMonitor, columnFamilyOptionsFactory diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java index 5ee5098..a5fd1e9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java @@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; -import org.rocksdb.WriteOptions; import java.util.ArrayList; import java.util.Collection; @@ -95,38 +93,6 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa return nativeMetricOptions; } - /** - * This method should set the additional options on top of the current options object. - * The current options object may contain pre-defined options based on flags that have - * been configured on the state backend. - * - * <p>It is important to set the options on the current object and return the result from - * the setter methods, otherwise the pre-defined options may get lost. - * - * @param currentOptions The options object with the pre-defined options. - * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s. - * @return The options object on which the additional options are set. - */ - default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) { - return currentOptions; - } - - /** - * This method should set the additional options on top of the current options object. - * The current options object may contain pre-defined options based on flags that have - * been configured on the state backend. - * - * <p>It is important to set the options on the current object and return the result from - * the setter methods, otherwise the pre-defined options may get lost. - * - * @param currentOptions The options object with the pre-defined options. - * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s. - * @return The options object on which the additional options are set. - */ - default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) { - return currentOptions; - } - // ------------------------------------------------------------------------ // for compatibility // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 3aaffa3..2324973 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -26,9 +26,7 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.TableFormatConfig; -import org.rocksdb.WriteOptions; import javax.annotation.Nullable; @@ -140,38 +138,6 @@ public final class RocksDBResourceContainer implements AutoCloseable { return opt; } - /** - * Gets the RocksDB {@link WriteOptions} to be used for write operations. - */ - public WriteOptions getWriteOptions() { - // Disable WAL by default - WriteOptions opt = new WriteOptions().setDisableWAL(true); - handlesToClose.add(opt); - - // add user-defined options factory, if specified - if (optionsFactory != null) { - opt = optionsFactory.createWriteOptions(opt, handlesToClose); - } - - return opt; - } - - /** - * Gets the RocksDB {@link ReadOptions} to be used for read operations. - */ - public ReadOptions getReadOptions() { - // We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details - ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); - handlesToClose.add(opt); - - // add user-defined options factory, if specified - if (optionsFactory != null) { - opt = optionsFactory.createReadOptions(opt, handlesToClose); - } - - return opt; - } - RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) { return optionsFactory == null ? defaultMetricOptions diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java index eb76e62..73d4bab 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java @@ -31,9 +31,7 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.LRUCache; import org.rocksdb.NativeLibraryLoader; -import org.rocksdb.ReadOptions; import org.rocksdb.WriteBufferManager; -import org.rocksdb.WriteOptions; import java.io.IOException; import java.lang.reflect.Field; @@ -248,16 +246,4 @@ public class RocksDBResourceContainerTest { assertThat(cache.isOwningHandle(), is(false)); assertThat(wbm.isOwningHandle(), is(false)); } - - @Test - public void testFreeWriteReadOptionsAfterClose() throws Exception { - RocksDBResourceContainer container = new RocksDBResourceContainer(); - WriteOptions writeOptions = container.getWriteOptions(); - ReadOptions readOptions = container.getReadOptions(); - assertThat(writeOptions.isOwningHandle(), is(true)); - assertThat(readOptions.isOwningHandle(), is(true)); - container.close(); - assertThat(writeOptions.isOwningHandle(), is(false)); - assertThat(readOptions.isOwningHandle(), is(false)); - } }
