Myasuka commented on a change in pull request #14341:
URL: https://github.com/apache/flink/pull/14341#discussion_r551108392
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -162,7 +162,7 @@ public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handles
final long targetFileSize = 256 * 1024 * 1024;
final long writeBufferSize = 64 * 1024 * 1024;
- BloomFilter bloomFilter = new BloomFilter();
+ BloomFilter bloomFilter = new BloomFilter(10, false);
Review comment:
I think we could add a brief comment here explaining why this switches to the
full filter.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -127,6 +127,14 @@
.withDescription("The approximate size (in bytes) of user data packed per block. " +
"RocksDB has default blocksize as '4KB'.");
+ public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+ key("state.backend.rocksdb.block.metadata-blocksize")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription("Approximate size (in bytes) of partitioned metadata packed per block. " +
Review comment:
Since this option is typed as `MemorySize`, the description no longer needs
to say `in bytes`.
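To illustrate the point, here is a minimal sketch of how a memory-size typed value carries its own unit. `MemorySizeSketch` and its `parse` method are hypothetical, simplified stand-ins written for this example; they are not Flink's `MemorySize` implementation and cover only a few units.

```java
import java.util.Locale;

// Hypothetical, simplified stand-in for a MemorySize-like type (not Flink's class).
final class MemorySizeSketch {
    private final long bytes;

    private MemorySizeSketch(long bytes) {
        this.bytes = bytes;
    }

    long getBytes() {
        return bytes;
    }

    // Parses values such as "512", "4kb", or "64 mb" into a byte count.
    static MemorySizeSketch parse(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        long multiplier;
        switch (unit) {
            case "":
            case "b":
                multiplier = 1L;
                break;
            case "kb":
                multiplier = 1024L;
                break;
            case "mb":
                multiplier = 1024L * 1024;
                break;
            case "gb":
                multiplier = 1024L * 1024 * 1024;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit: " + unit);
        }
        // multiplyExact fails loudly on overflow instead of wrapping silently.
        return new MemorySizeSketch(Math.multiplyExact(value, multiplier));
    }
}
```

Because the user writes the unit as part of the value, a description that says "in bytes" would be redundant and could even mislead.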
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -209,4 +223,34 @@ public void close() throws Exception {
sharedResources.close();
}
}
+
+ /**
+ * Overwrite configured {@link Filter} if enable partitioned filter.
+ * Partitioned filter only worked in full bloom filter, not blocked based.
+ */
+ private void overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
+ Filter filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
+ if (filter != null) {
+ // TODO Can get filter's config in the future RocksDB version, and build new filter use existing config.
+ BloomFilter newFilter = new BloomFilter(10, false);
+ LOG.warn("Overwrite existing filter if '{}' is enabled.", RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);
+ blockBasedTableConfig.setFilter(newFilter);
+ handlesToClose.add(newFilter);
+ IOUtils.closeQuietly(filter);
+ handlesToClose.remove(filter);
+ }
+ }
+
+ @VisibleForTesting
+ protected static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig) {
Review comment:
This static method could be package-private instead of `protected`.
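A rough sketch of what I mean, using hypothetical stand-in types (`StubFilter`, `StubTableConfig`, `ResourceContainerSketch`) rather than the real RocksDB and Flink classes. The getter on the stub config is an assumption made for the example; it does not reflect how the PR actually reads the filter. The helper has no access modifier, so tests in the same package can still call it, and the overwrite logic mirrors the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a native filter handle that must be closed.
class StubFilter implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical stand-in for the table config; the getter exists only in this sketch.
class StubTableConfig {
    private StubFilter filter;

    void setFilter(StubFilter filter) {
        this.filter = filter;
    }

    StubFilter getFilter() {
        return filter;
    }
}

class ResourceContainerSketch {
    final List<AutoCloseable> handlesToClose = new ArrayList<>();

    // No modifier: package-private, reachable from tests in the same package only.
    static StubFilter getFilterFromConfig(StubTableConfig config) {
        return config.getFilter();
    }

    // Swaps in a new filter, tracks it for later cleanup, and releases the old one.
    void overwriteFilterIfExist(StubTableConfig config) {
        StubFilter old = getFilterFromConfig(config);
        if (old != null) {
            StubFilter replacement = new StubFilter();
            config.setFilter(replacement);
            handlesToClose.add(replacement);
            old.close();
            handlesToClose.remove(old);
        }
    }
}
```

A test class in the same package can call `getFilterFromConfig` directly, so `protected` buys nothing here unless subclasses in other packages need the method.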
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
##########
@@ -50,6 +53,10 @@ public long getWriteBufferManagerCapacity() {
return writeBufferManagerCapacity;
}
+ public boolean isUsingPartitionedIndex() {
Review comment:
I think this method could be renamed to `isUsingPartitionedIndexFilters`.