carp84 commented on a change in pull request #14341:
URL: https://github.com/apache/flink/pull/14341#discussion_r579248839



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -143,6 +151,12 @@ public ColumnFamilyOptions getColumnOptions() {
                         "We currently only support BlockBasedTableConfig When 
bounding total memory.");
                 blockBasedTableConfig = (BlockBasedTableConfig) 
tableFormatConfig;
             }
+            if (rocksResources.isUsingPartitionedIndexFilters()) {
+                
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
+                blockBasedTableConfig.setPartitionFilters(true);
+                blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
+                this.overwriteFilterIfExist(blockBasedTableConfig);
+            }

Review comment:
       ```suggestion
               if (rocksResources.isUsingPartitionedIndexFilters() && 
overwriteFilterIfExist(blockBasedTableConfig)) {
                   
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
                   blockBasedTableConfig.setPartitionFilters(true);
                   blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
               }
   ```
   This suggestion indicates that full filters is a necessity to enable 
partitioned index filters, and if the `overwriteFilterIfExist` is changed to 
always do the overwriting, then it will always return true and the code could 
be adjusted accordingly.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws 
Exception {
         assertThat(writeOptions.isOwningHandle(), is(false));
         assertThat(readOptions.isOwningHandle(), is(false));
     }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws 
Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter existFilter = new BloomFilter();
+        RocksDBOptionsFactory setBloomFilterOptionFactory =
+                new RocksDBOptionsFactory() {
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+                        return currentOptions;
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+                        BlockBasedTableConfig blockBasedTableConfig =
+                                tableFormatConfig == null
+                                        ? new BlockBasedTableConfig()
+                                        : (BlockBasedTableConfig) 
tableFormatConfig;
+                        blockBasedTableConfig.setFilter(existFilter);

Review comment:
       ```suggestion
                           blockBasedTableConfig.setFilter(blockBasedFilter);
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws 
Exception {
         assertThat(writeOptions.isOwningHandle(), is(false));
         assertThat(readOptions.isOwningHandle(), is(false));
     }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws 
Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter existFilter = new BloomFilter();

Review comment:
       ```suggestion
           BloomFilter blockBasedFilter = new BloomFilter();
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -207,4 +221,37 @@ public void close() throws Exception {
             sharedResources.close();
         }
     }
+
+    /**
+     * Overwrite configured {@link Filter} if enable partitioned filter. 
Partitioned filter only
+     * worked in full bloom filter, not blocked based.

Review comment:
       ```suggestion
        * Overwrite configured {@link Filter} when partitioned filter is 
enabled, since partitioned filter only
        * works with full bloom filter, not blocked based.
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws 
Exception {
         assertThat(writeOptions.isOwningHandle(), is(false));
         assertThat(readOptions.isOwningHandle(), is(false));
     }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws 
Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter existFilter = new BloomFilter();
+        RocksDBOptionsFactory setBloomFilterOptionFactory =
+                new RocksDBOptionsFactory() {
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+                        return currentOptions;
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+                        BlockBasedTableConfig blockBasedTableConfig =
+                                tableFormatConfig == null
+                                        ? new BlockBasedTableConfig()
+                                        : (BlockBasedTableConfig) 
tableFormatConfig;
+                        blockBasedTableConfig.setFilter(existFilter);
+                        
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+                        return currentOptions;
+                    }
+                };
+        try (RocksDBResourceContainer container =
+                new RocksDBResourceContainer(
+                        PredefinedOptions.DEFAULT, 
setBloomFilterOptionFactory, opaqueResource)) {

Review comment:
       ```suggestion
                           PredefinedOptions.DEFAULT, 
blockBasedBloomFilterOptionFactory, opaqueResource)) {
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -207,4 +221,37 @@ public void close() throws Exception {
             sharedResources.close();
         }
     }
+
+    /**
+     * Overwrite configured {@link Filter} if enable partitioned filter. 
Partitioned filter only
+     * worked in full bloom filter, not blocked based.
+     */
+    private void overwriteFilterIfExist(BlockBasedTableConfig 
blockBasedTableConfig) {

Review comment:
       Suggest to directly overwrite the filter no matter what its original 
type is, since only full filters is a necessity to enable partitioned filter, 
and the `handlesToClose` collection will make sure the old filter will be 
closed w/o resource leak (only delayed).
   
   What's more, it's hard to handle the case of failing to get filter from 
table config through reflection, we cannot let it through silently since the 
unknown filter type might be block based.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws 
Exception {
         assertThat(writeOptions.isOwningHandle(), is(false));
         assertThat(readOptions.isOwningHandle(), is(false));
     }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws 
Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter existFilter = new BloomFilter();
+        RocksDBOptionsFactory setBloomFilterOptionFactory =
+                new RocksDBOptionsFactory() {
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+                        return currentOptions;
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+                        BlockBasedTableConfig blockBasedTableConfig =
+                                tableFormatConfig == null
+                                        ? new BlockBasedTableConfig()
+                                        : (BlockBasedTableConfig) 
tableFormatConfig;
+                        blockBasedTableConfig.setFilter(existFilter);
+                        
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+                        return currentOptions;
+                    }
+                };
+        try (RocksDBResourceContainer container =
+                new RocksDBResourceContainer(
+                        PredefinedOptions.DEFAULT, 
setBloomFilterOptionFactory, opaqueResource)) {
+            ColumnFamilyOptions columnOptions = container.getColumnOptions();
+            BlockBasedTableConfig actual =
+                    (BlockBasedTableConfig) columnOptions.tableFormatConfig();
+            assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
+            assertThat(actual.partitionFilters(), is(true));
+            assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
+            assertThat(getFilterFromBlockBasedTableConfig(actual), 
not(existFilter));
+            assertFalse("Exist filter wasn't closed.", 
existFilter.isOwningHandle());
+        }

Review comment:
       ```suggestion
               assertThat(getFilterFromBlockBasedTableConfig(actual), 
not(blockBasedFilter));
           }
           assertFalse("Block based filter is left unclosed.", 
blockBasedFilter.isOwningHandle());
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -264,4 +272,51 @@ public void testFreeWriteReadOptionsAfterClose() throws 
Exception {
         assertThat(writeOptions.isOwningHandle(), is(false));
         assertThat(readOptions.isOwningHandle(), is(false));
     }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws 
Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter existFilter = new BloomFilter();
+        RocksDBOptionsFactory setBloomFilterOptionFactory =

Review comment:
       ```suggestion
           RocksDBOptionsFactory blockBasedBloomFilterOptionFactory =
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to