carp84 commented on a change in pull request #13393:
URL: https://github.com/apache/flink/pull/13393#discussion_r495554774
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,46 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
return new WriteBufferManager(writeBufferManagerCapacity, cache);
}
+
+ /**
+ * Calculate the default arena block size as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+ *
+ * @return the default arena block size
+ * @param writeBufferSize the write buffer size (bytes)
+ */
+ static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+ return writeBufferSize / 8;
+ }
+
+ /**
+ * Calculate {@code mutable_limit_} as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
+ *
+ * @param bufferSize write buffer size
+ * @return mutableLimit
+ */
+ static long calculateRocksDBMutableLimit(long bufferSize) {
+ return bufferSize * 7 / 8;
+ }
+
+ /**
+ * RocksDB starts flushing the active memtable constantly in the a case when the arena block size is greater than
+ * mutable limit
+ * (calculated as in {@link #calculateRocksDBMutableLimit(long)}).
Review comment:
```suggestion
* RocksDB starts flushing the active memtable constantly in the case
when the arena block size is greater than
* mutable limit (as calculated in {@link
#calculateRocksDBMutableLimit(long)}).
```
Remove the typo ("the a case") and fix the line wrapping.
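Assuming no arena block size is configured (so RocksDB's default of `writeBufferSize / 8` applies), the two helpers above combine into a single condition on the write buffer size. A short derivation, ignoring the integer rounding of `/ 8` and `* 7 / 8` in the Java helpers:

```latex
\begin{aligned}
\text{arenaBlockSize} &= \frac{\text{writeBufferSize}}{8}, \qquad
\text{mutableLimit} = \frac{7}{8}\,\text{writeBufferManagerCapacity} \\
\text{arenaBlockSize} > \text{mutableLimit}
  &\iff \frac{\text{writeBufferSize}}{8} > \frac{7\,\text{writeBufferManagerCapacity}}{8} \\
  &\iff \text{writeBufferSize} > 7\,\text{writeBufferManagerCapacity}
\end{aligned}
```

For example, a 64 MiB write buffer with a 4 MiB write buffer manager gives 64 > 28, so the memtable would be flushed constantly, while a 64 MiB write buffer manager gives 64 < 448 and the check passes.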
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
##########
@@ -526,8 +526,8 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
metricGroup,
stateHandles,
keyGroupCompressionDecorator,
- cancelStreamRegistry
- )
+ cancelStreamRegistry,
+ resourceContainer.getWriteBufferManagerCapacity())
Review comment:
This change can be dropped after the suggested refactor.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -382,7 +388,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
nativeMetricOptions,
metricGroup,
restoreStateHandles,
- ttlCompactFiltersManager);
+ ttlCompactFiltersManager,
+ writeBufferManagerCapacity);
Review comment:
```suggestion
optionsContainer.getWriteBufferManagerCapacity());
```
We can use `optionsContainer.getWriteBufferManagerCapacity()` directly here
instead of passing `writeBufferManagerCapacity` through constructor
parameters.
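A minimal sketch of the suggested refactor, assuming the consumer already holds a reference to the options container (as the `optionsContainer` calls in these suggestions imply). `OptionsContainer`, `RestoreOperation` and `describe()` are hypothetical stand-ins, not Flink classes. The capacity has a single owner and is read where it is needed, so constructor signatures, and the test utilities that currently pass `null`, stay unchanged.

```java
// Hypothetical sketch of the suggested refactor; class and method names are illustrative, not Flink's.
final class RefactorSketch {

    /** Stand-in for the options container: the single owner of the capacity (null = no write buffer manager). */
    static final class OptionsContainer {
        private final Long writeBufferManagerCapacity;

        OptionsContainer(Long writeBufferManagerCapacity) {
            this.writeBufferManagerCapacity = writeBufferManagerCapacity;
        }

        Long getWriteBufferManagerCapacity() {
            return writeBufferManagerCapacity;
        }
    }

    /** Stand-in for a consumer: it keeps the container it already receives and queries it on demand. */
    static final class RestoreOperation {
        private final OptionsContainer optionsContainer;

        // No extra capacity parameter: the value is looked up from the container when needed.
        RestoreOperation(OptionsContainer optionsContainer) {
            this.optionsContainer = optionsContainer;
        }

        String describe() {
            Long capacity = optionsContainer.getWriteBufferManagerCapacity();
            return capacity == null ? "no write buffer manager" : "capacity=" + capacity;
        }
    }

    public static void main(String[] args) {
        System.out.println(new RestoreOperation(new OptionsContainer(null)).describe());
        System.out.println(new RestoreOperation(new OptionsContainer(64L << 20)).describe());
    }
}
```

Adding a new dependency to the container then touches only its producer and the call sites that read it, not every constructor in between.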
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -545,7 +549,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
StateSnapshotTransformFactory.noTransform());
newRocksStateInfo =
RocksDBOperationUtils.createStateInfo(
- newMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager);
+ newMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager, writeBufferManagerCapacity);
Review comment:
```suggestion
newMetaInfo, db, columnFamilyOptionsFactory,
ttlCompactFiltersManager,
optionsContainer.getWriteBufferManagerCapacity());
```
We can use `optionsContainer.getWriteBufferManagerCapacity()` directly here
instead of passing `writeBufferManagerCapacity` through constructor
parameters.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -488,8 +496,8 @@ private PriorityQueueSetFactory initPriorityQueueFactory(
optionsContainer.getReadOptions(),
writeBatchWrapper,
nativeMetricMonitor,
- columnFamilyOptionsFactory
- );
+ columnFamilyOptionsFactory,
+ writeBufferManagerCapacity);
Review comment:
Ditto.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,43 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
+ if (writeBufferManagerCapacity != null) {
+ // It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+ // Unfortunately the check needs write buffer size that was just calculated.
+ sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+ }
+
return new ColumnFamilyDescriptor(nameBytes, options);
}
+ /**
+ * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
+ * Essentially, the condition here
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+ * will always be true.
+ *
+ * @param writeBufferSize the size of write buffer (bytes)
+ * @param arenaBlockSizeConfigured the manually configured arena block size
+ * @param writeBufferManagerCapacity the size of the write buffer manager (bytes)
+ * @return true is sanity check passes, false otherwise
+ */
+ public static boolean sanityCheckArenaBlockSize(long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) throws IllegalStateException {
Review comment:
```suggestion
static boolean sanityCheckArenaBlockSize(
long writeBufferSize,
long arenaBlockSizeConfigured,
long writeBufferManagerCapacity) throws IllegalStateException {
```
Line wrapping improvement
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
##########
@@ -67,7 +67,8 @@
new UnregisteredMetricsGroup(),
Collections.emptyList(),
UncompressedStreamCompressionDecorator.INSTANCE,
- new CloseableRegistry());
+ new CloseableRegistry(),
+ null);
Review comment:
This change can be dropped after the suggested refactor.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -421,7 +429,7 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
- writeBatchSize);
+ writeBatchSize, writeBufferManagerCapacity);
Review comment:
Ditto.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
##########
@@ -98,7 +99,8 @@
UncompressedStreamCompressionDecorator.INSTANCE,
db,
defaultCFHandle,
- new CloseableRegistry());
+ new CloseableRegistry(),
+ null);
Review comment:
Ditto.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,43 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
+ if (writeBufferManagerCapacity != null) {
+ // It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+ // Unfortunately the check needs write buffer size that was just calculated.
+ sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+ }
+
return new ColumnFamilyDescriptor(nameBytes, options);
}
+ /**
+ * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
+ * Essentially, the condition here
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+ * will always be true.
+ *
+ * @param writeBufferSize the size of write buffer (bytes)
+ * @param arenaBlockSizeConfigured the manually configured arena block size
+ * @param writeBufferManagerCapacity the size of the write buffer manager (bytes)
+ * @return true is sanity check passes, false otherwise
+ */
+ public static boolean sanityCheckArenaBlockSize(long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) throws IllegalStateException {
+ long defaultArenaBlockSize = RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize);
+ long arenaBlockSize = arenaBlockSizeConfigured <= 0 ? defaultArenaBlockSize : arenaBlockSizeConfigured;
+ long mutableLimit = RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(writeBufferManagerCapacity);
+ if (RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, mutableLimit)) {
+ return true;
+ } else {
+ LOG.warn("RocksDBStateBackend performance will be poor because of the current Flink memory configuration! " +
+ "RocksDB will flush memtable constantly, causing high IO and CPU. " +
+ "Typically the easiest fix is to increase task manager managed memory size. " +
+ "If running locally, see the parameter taskmanager.memory.managed.size. " +
+ "Details: arenaBlockSize {} < mutableLimit {} (writeBufferSize {} arenaBlockSizeConfigured {} defaultArenaBlockSize {} writeBufferManagerCapacity {})",
+ arenaBlockSize, mutableLimit, writeBufferSize, arenaBlockSizeConfigured, defaultArenaBlockSize, writeBufferManagerCapacity);
Review comment:
```suggestion
LOG.warn("RocksDBStateBackend performance will be poor because of the current Flink memory configuration! " +
"RocksDB will flush memtable constantly, causing high IO and CPU. " +
"Typically the easiest fix is to increase task manager managed memory size. " +
"If running locally, see the parameter taskmanager.memory.managed.size. " +
"Details: arenaBlockSize {} > mutableLimit {} (writeBufferSize = {}, arenaBlockSizeConfigured = {}," +
" defaultArenaBlockSize = {}, writeBufferManagerCapacity = {})",
arenaBlockSize, mutableLimit, writeBufferSize, arenaBlockSizeConfigured,
defaultArenaBlockSize, writeBufferManagerCapacity);
```
Line wrapping improvement.
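For reference, the control flow of `sanityCheckArenaBlockSize` together with its null-guarded call site in `createColumnFamilyDescriptor` can be sketched as a pure function that returns the warning details instead of logging them. This is a hypothetical, self-contained sketch: `ArenaBlockSizeSanityCheck` and `check` are illustrative names, the formulas are inlined from the `RocksDBMemoryControllerUtils` helpers in this PR, and treating `arenaBlockSize == mutableLimit` as acceptable is an assumption about `validateArenaBlockSize`.

```java
// Hypothetical sketch mirroring sanityCheckArenaBlockSize and its call site; not Flink code.
final class ArenaBlockSizeSanityCheck {

    private ArenaBlockSizeSanityCheck() {}

    /**
     * Returns null when the check passes or is skipped, otherwise the warning details.
     * A null capacity (as passed by the test utilities) skips the check, like the
     * {@code writeBufferManagerCapacity != null} guard in createColumnFamilyDescriptor.
     */
    static String check(long writeBufferSize, long arenaBlockSizeConfigured, Long writeBufferManagerCapacity) {
        if (writeBufferManagerCapacity == null) {
            return null;
        }
        // RocksDB's default arena block size: write_buffer_size / 8.
        long defaultArenaBlockSize = writeBufferSize / 8;
        // A non-positive configured value is treated as "not set", so the default applies.
        long arenaBlockSize = arenaBlockSizeConfigured <= 0 ? defaultArenaBlockSize : arenaBlockSizeConfigured;
        // mutable_limit_ of the write buffer manager: 7/8 of its capacity.
        long mutableLimit = writeBufferManagerCapacity * 7 / 8;
        if (arenaBlockSize <= mutableLimit) {
            return null;
        }
        return String.format(
                "arenaBlockSize %d > mutableLimit %d (writeBufferSize = %d, arenaBlockSizeConfigured = %d, "
                        + "defaultArenaBlockSize = %d, writeBufferManagerCapacity = %d)",
                arenaBlockSize, mutableLimit, writeBufferSize, arenaBlockSizeConfigured,
                defaultArenaBlockSize, writeBufferManagerCapacity);
    }

    public static void main(String[] args) {
        System.out.println(check(64L << 20, 0, null));       // skipped: no capacity given
        System.out.println(check(64L << 20, 0, 64L << 20));  // passes: 8 MiB <= 56 MiB
        System.out.println(check(64L << 20, 0, 4L << 20));   // fails: 8 MiB > 3.5 MiB
    }
}
```

Returning the message rather than logging keeps the sketch free of an SLF4J dependency and makes the boundary cases easy to unit-test.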
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils.java
##########
@@ -101,7 +101,8 @@
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),
- new CloseableRegistry());
+ new CloseableRegistry(),
+ null);
Review comment:
Ditto.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.