Myasuka commented on a change in pull request #14341:
URL: https://github.com/apache/flink/pull/14341#discussion_r551108392



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -162,7 +162,7 @@ public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handles
                        final long targetFileSize = 256 * 1024 * 1024;
                        final long writeBufferSize = 64 * 1024 * 1024;
 
-                       BloomFilter bloomFilter = new BloomFilter();
+                       BloomFilter bloomFilter = new BloomFilter(10, false);

Review comment:
       I think we should add a brief comment here explaining the change to the full filter.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -127,6 +127,14 @@
                        .withDescription("The approximate size (in bytes) of user data packed per block. " +
                                "RocksDB has default blocksize as '4KB'.");
 
+       public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+               key("state.backend.rocksdb.block.metadata-blocksize")
+                       .memoryType()
+                       .noDefaultValue()
+                       .withDescription("Approximate size (in bytes) of partitioned metadata packed per block. " +

Review comment:
       Since this option is wrapped in `MemorySize`, we no longer need to describe it as `in bytes`.
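
       A minimal sketch of the reasoning, assuming a size-typed value parses its own unit. `SizeOptionSketch` and `parseBytes` are hypothetical names for illustration only and are not Flink's `MemorySize` API; the point is that the unit travels with the configured value, so the description does not have to fix one.

```java
import java.util.Locale;

/**
 * Hypothetical stand-in for a size-typed option value. This is NOT Flink's
 * MemorySize; it only illustrates that the unit belongs to the value.
 */
final class SizeOptionSketch {

    /** Parses text such as "4kb", "64 mb" or "1024" (plain bytes) into a byte count. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);

        // Split the leading digits from the trailing unit.
        int split = 0;
        while (split < s.length() && Character.isDigit(s.charAt(split))) {
            split++;
        }
        if (split == 0) {
            throw new IllegalArgumentException("No number in size: " + text);
        }

        long number = Long.parseLong(s.substring(0, split));
        String unit = s.substring(split).trim();
        switch (unit) {
            case "":
            case "b":
                return number;
            case "kb":
                return number * 1024L;
            case "mb":
                return number * 1024L * 1024L;
            case "gb":
                return number * 1024L * 1024L * 1024L;
            default:
                throw new IllegalArgumentException("Unknown unit in size: " + text);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("4kb"));   // prints 4096
        System.out.println(parseBytes("64 mb")); // prints 67108864
    }
}
```

       With a value type like this, a user can write `4kb` or `4096` and get the same result, which is why "(in bytes)" in the description becomes redundant.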

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -209,4 +223,34 @@ public void close() throws Exception {
                        sharedResources.close();
                }
        }
+
+       /**
+        * Overwrite configured {@link Filter} if enable partitioned filter.
+        * Partitioned filter only worked in full bloom filter, not blocked based.
+        */
+       private void overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
+               Filter filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
+               if (filter != null) {
+                       // TODO Can get filter's config in the future RocksDB version, and build new filter use existing config.
+                       BloomFilter newFilter = new BloomFilter(10, false);
+                       LOG.warn("Overwrite existing filter if '{}' is enabled.", RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);
+                       blockBasedTableConfig.setFilter(newFilter);
+                       handlesToClose.add(newFilter);
+                       IOUtils.closeQuietly(filter);
+                       handlesToClose.remove(filter);
+               }
+       }
+
+       @VisibleForTesting
+       protected static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig) {

Review comment:
       This static method could be package-private (no access modifier) instead of `protected`.
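
       A rough sketch of the idea, using hypothetical stand-in classes (`TableConfigSketch`, `ResourceContainerSketch`) rather than the real RocksDB and Flink types. It only shows that a static helper without an access modifier is still callable from a test in the same package, and uses reflection to confirm the visibility.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical stand-in for the table config; not the RocksDB class. */
final class TableConfigSketch {
    private final String filterName;

    TableConfigSketch(String filterName) {
        this.filterName = filterName;
    }

    String filterName() {
        return filterName;
    }
}

/** Hypothetical stand-in for the resource container under review. */
class ResourceContainerSketch {

    // No access modifier: reachable from tests in the same package,
    // hidden from subclasses and callers in other packages.
    static String getFilterFromConfig(TableConfigSketch config) {
        return config.filterName();
    }

    /** Returns true when the named static helper has package-level visibility. */
    static boolean isPackagePrivate(String methodName) {
        try {
            Method m = ResourceContainerSketch.class
                    .getDeclaredMethod(methodName, TableConfigSketch.class);
            int mod = m.getModifiers();
            return !Modifier.isPublic(mod)
                    && !Modifier.isProtected(mod)
                    && !Modifier.isPrivate(mod);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Unknown method: " + methodName, e);
        }
    }
}
```

       Since the helper is static and only needed by the class itself and its tests, `protected` exposes it more widely than necessary.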

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
##########
@@ -50,6 +53,10 @@ public long getWriteBufferManagerCapacity() {
                return writeBufferManagerCapacity;
        }
 
+       public boolean isUsingPartitionedIndex() {

Review comment:
       I think this method could be renamed to `isUsingPartitionedIndexFilters`





