carp84 commented on a change in pull request #14341:
URL: https://github.com/apache/flink/pull/14341#discussion_r579248839
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -143,6 +151,12 @@ public ColumnFamilyOptions getColumnOptions() {
"We currently only support BlockBasedTableConfig When
bounding total memory.");
blockBasedTableConfig = (BlockBasedTableConfig)
tableFormatConfig;
}
+ if (rocksResources.isUsingPartitionedIndexFilters()) {
+
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
+ blockBasedTableConfig.setPartitionFilters(true);
+ blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
+ this.overwriteFilterIfExist(blockBasedTableConfig);
+ }
Review comment:
```suggestion
if (rocksResources.isUsingPartitionedIndexFilters() &&
overwriteFilterIfExist(blockBasedTableConfig)) {
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
blockBasedTableConfig.setPartitionFilters(true);
blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
}
```
This suggestion indicates that full filters is a necessity to enable
partitioned index filters, and if the `overwriteFilterIfExist` is changed to
always do the overwriting, then it will always return true and the code could
be adjusted accordingly.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws
Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}
+
+ @Test
+ public void testGetColumnFamilyOptionsWithPartitionedIndex() throws
Exception {
+ LRUCache cache = new LRUCache(1024L);
+ WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+ RocksDBSharedResources sharedResources =
+ new RocksDBSharedResources(cache, wbm, 1024L, true);
+ final ThrowingRunnable<Exception> disposer = sharedResources::close;
+ OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+ new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+ BloomFilter existFilter = new BloomFilter();
+ RocksDBOptionsFactory setBloomFilterOptionFactory =
+ new RocksDBOptionsFactory() {
+
+ @Override
+ public DBOptions createDBOptions(
+ DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
+ return currentOptions;
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions(
+ ColumnFamilyOptions currentOptions,
+ Collection<AutoCloseable> handlesToClose) {
+ TableFormatConfig tableFormatConfig =
currentOptions.tableFormatConfig();
+ BlockBasedTableConfig blockBasedTableConfig =
+ tableFormatConfig == null
+ ? new BlockBasedTableConfig()
+ : (BlockBasedTableConfig)
tableFormatConfig;
+ blockBasedTableConfig.setFilter(existFilter);
Review comment:
```suggestion
blockBasedTableConfig.setFilter(blockBasedFilter);
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws
Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}
+
+ @Test
+ public void testGetColumnFamilyOptionsWithPartitionedIndex() throws
Exception {
+ LRUCache cache = new LRUCache(1024L);
+ WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+ RocksDBSharedResources sharedResources =
+ new RocksDBSharedResources(cache, wbm, 1024L, true);
+ final ThrowingRunnable<Exception> disposer = sharedResources::close;
+ OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+ new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+ BloomFilter existFilter = new BloomFilter();
Review comment:
```suggestion
BloomFilter blockBasedFilter = new BloomFilter();
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -207,4 +221,37 @@ public void close() throws Exception {
sharedResources.close();
}
}
+
+ /**
+ * Overwrite configured {@link Filter} if enable partitioned filter.
Partitioned filter only
+ * worked in full bloom filter, not blocked based.
Review comment:
```suggestion
* Overwrite configured {@link Filter} when partitioned filter is
enabled, since partitioned filter only
* works with full bloom filter, not blocked based.
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws
Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}
+
+ @Test
+ public void testGetColumnFamilyOptionsWithPartitionedIndex() throws
Exception {
+ LRUCache cache = new LRUCache(1024L);
+ WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+ RocksDBSharedResources sharedResources =
+ new RocksDBSharedResources(cache, wbm, 1024L, true);
+ final ThrowingRunnable<Exception> disposer = sharedResources::close;
+ OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+ new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+ BloomFilter existFilter = new BloomFilter();
+ RocksDBOptionsFactory setBloomFilterOptionFactory =
+ new RocksDBOptionsFactory() {
+
+ @Override
+ public DBOptions createDBOptions(
+ DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
+ return currentOptions;
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions(
+ ColumnFamilyOptions currentOptions,
+ Collection<AutoCloseable> handlesToClose) {
+ TableFormatConfig tableFormatConfig =
currentOptions.tableFormatConfig();
+ BlockBasedTableConfig blockBasedTableConfig =
+ tableFormatConfig == null
+ ? new BlockBasedTableConfig()
+ : (BlockBasedTableConfig)
tableFormatConfig;
+ blockBasedTableConfig.setFilter(existFilter);
+
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+ return currentOptions;
+ }
+ };
+ try (RocksDBResourceContainer container =
+ new RocksDBResourceContainer(
+ PredefinedOptions.DEFAULT,
setBloomFilterOptionFactory, opaqueResource)) {
Review comment:
```suggestion
PredefinedOptions.DEFAULT,
blockBasedBloomFilterOptionFactory, opaqueResource)) {
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -207,4 +221,37 @@ public void close() throws Exception {
sharedResources.close();
}
}
+
+ /**
+ * Overwrite configured {@link Filter} if enable partitioned filter.
Partitioned filter only
+ * worked in full bloom filter, not blocked based.
+ */
+ private void overwriteFilterIfExist(BlockBasedTableConfig
blockBasedTableConfig) {
Review comment:
Suggest to directly overwrite the filter no matter what its original
type is, since only full filters is a necessity to enable partitioned filter,
and the `handlesToClose` collection will make sure the old filter will be
closed w/o resource leak (only delayed).
What's more, it's hard to handle the case of failing to get filter from
table config through reflection, we cannot let it through silently since the
unknown filter type might be block based.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws
Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}
+
+ @Test
+ public void testGetColumnFamilyOptionsWithPartitionedIndex() throws
Exception {
+ LRUCache cache = new LRUCache(1024L);
+ WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+ RocksDBSharedResources sharedResources =
+ new RocksDBSharedResources(cache, wbm, 1024L, true);
+ final ThrowingRunnable<Exception> disposer = sharedResources::close;
+ OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+ new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+ BloomFilter existFilter = new BloomFilter();
+ RocksDBOptionsFactory setBloomFilterOptionFactory =
+ new RocksDBOptionsFactory() {
+
+ @Override
+ public DBOptions createDBOptions(
+ DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
+ return currentOptions;
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions(
+ ColumnFamilyOptions currentOptions,
+ Collection<AutoCloseable> handlesToClose) {
+ TableFormatConfig tableFormatConfig =
currentOptions.tableFormatConfig();
+ BlockBasedTableConfig blockBasedTableConfig =
+ tableFormatConfig == null
+ ? new BlockBasedTableConfig()
+ : (BlockBasedTableConfig)
tableFormatConfig;
+ blockBasedTableConfig.setFilter(existFilter);
+
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+ return currentOptions;
+ }
+ };
+ try (RocksDBResourceContainer container =
+ new RocksDBResourceContainer(
+ PredefinedOptions.DEFAULT,
setBloomFilterOptionFactory, opaqueResource)) {
+ ColumnFamilyOptions columnOptions = container.getColumnOptions();
+ BlockBasedTableConfig actual =
+ (BlockBasedTableConfig) columnOptions.tableFormatConfig();
+ assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
+ assertThat(actual.partitionFilters(), is(true));
+ assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
+ assertThat(getFilterFromBlockBasedTableConfig(actual),
not(existFilter));
+ assertFalse("Exist filter wasn't closed.",
existFilter.isOwningHandle());
+ }
Review comment:
```suggestion
assertThat(getFilterFromBlockBasedTableConfig(actual),
not(blockBasedFilter));
}
assertFalse("Block based filter is left unclosed.",
blockBasedFilter.isOwningHandle());
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws
Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}
+
+ @Test
+ public void testGetColumnFamilyOptionsWithPartitionedIndex() throws
Exception {
+ LRUCache cache = new LRUCache(1024L);
+ WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+ RocksDBSharedResources sharedResources =
+ new RocksDBSharedResources(cache, wbm, 1024L, true);
+ final ThrowingRunnable<Exception> disposer = sharedResources::close;
+ OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+ new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+ BloomFilter existFilter = new BloomFilter();
+ RocksDBOptionsFactory setBloomFilterOptionFactory =
Review comment:
```suggestion
RocksDBOptionsFactory blockBasedBloomFilterOptionFactory =
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]