carp84 commented on a change in pull request #13393:
URL: https://github.com/apache/flink/pull/13393#discussion_r502211558
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -131,22 +133,23 @@ public static void registerKvStateInformation(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
- @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
+ @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, Long writeBufferManagerCapacity) {
Review comment:
```suggestion
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity) {
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -151,7 +151,7 @@ public void testGetColumnFamilyOptionsWithSharedResources() throws Exception {
final long cacheSize = 1024L, writeBufferSize = 512L;
final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
- RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm);
+ RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm, 512L);
Review comment:
```suggestion
RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm, writeBufferSize);
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -131,22 +133,23 @@ public static void registerKvStateInformation(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
- @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
+ @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, Long writeBufferManagerCapacity) {
ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor(
- metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager);
+ metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager, writeBufferManagerCapacity);
return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(createColumnFamily(columnFamilyDescriptor, db), metaInfoBase);
}
/**
- * Creates a column descriptor for sate column family.
+ * Creates a column descriptor for a state column family.
*
* <p>Sets TTL compaction filter if {@code ttlCompactFiltersManager} is not {@code null}.
*/
public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
RegisteredStateMetaInfoBase metaInfoBase,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
- @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
+ @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+ Long writeBufferManagerCapacity) {
Review comment:
```suggestion
@Nullable Long writeBufferManagerCapacity) {
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
}
}
+ @Test
+ public void testSanityCheckArenaBlockSize() {
+ List<TestData> tests = Arrays.asList(
+ new TestData(67108864, 0, 8388608, false),
+ new TestData(67108864, 8388608, 8388608, false),
+ new TestData(67108864, 0, 11184810, true),
+ new TestData(67108864, 8388608, 11184810, true)
+ );
+
+ for (TestData test : tests) {
+ long writeBufferSize = test.getWriteBufferSize();
+ long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured();
+ long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity();
+ boolean expected = test.isExpected();
+
+ boolean isOk = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity);
+ if (expected) {
+ assertTrue(isOk);
+ } else {
+ assertFalse(isOk);
+ }
+ }
+ }
+
+ private static class TestData {
+ private final long writeBufferSize;
+ private final long arenaBlockSizeConfigured;
+ private final long writeBufferManagerCapacity;
+ private final boolean expected;
+
+ public TestData(long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity, boolean expected) {
+ this.writeBufferSize = writeBufferSize;
+ this.arenaBlockSizeConfigured = arenaBlockSizeConfigured;
+ this.writeBufferManagerCapacity = writeBufferManagerCapacity;
+ this.expected = expected;
+ }
+
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ public long getArenaBlockSizeConfigured() {
+ return arenaBlockSizeConfigured;
+ }
+
+ public long getWriteBufferManagerCapacity() {
+ return writeBufferManagerCapacity;
+ }
+
+ public boolean isExpected() {
Review comment:
```suggestion
public boolean expectedResult() {
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
}
}
+ @Test
+ public void testSanityCheckArenaBlockSize() {
+ List<TestData> tests = Arrays.asList(
+ new TestData(67108864, 0, 8388608, false),
+ new TestData(67108864, 8388608, 8388608, false),
+ new TestData(67108864, 0, 11184810, true),
+ new TestData(67108864, 8388608, 11184810, true)
+ );
Review comment:
```suggestion
long testWriteBufferSize = 56 * 1024 * 1024L;
long testDefaultArenaSize = testWriteBufferSize / 8;
long testValidArenaSize = testWriteBufferSize / 7;
long testInvalidArenaSize = testWriteBufferSize / 7 - 8L;
List<TestData> tests = Arrays.asList(
new TestData(testWriteBufferSize, 0, testInvalidArenaSize, false),
new TestData(testWriteBufferSize, testDefaultArenaSize, testInvalidArenaSize, false),
new TestData(testWriteBufferSize, 0, testValidArenaSize, true),
new TestData(testWriteBufferSize, testDefaultArenaSize, testValidArenaSize, true)
);
```
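(A quick arithmetic check of the suggested constants, mine rather than the reviewer's: with testWriteBufferSize = 56 MiB the default arena block size is 56 MiB / 8 = 7 MiB; a write buffer manager capacity of 56 MiB / 7 = 8 MiB yields a mutable limit of 8 MiB * 7 / 8 = 7 MiB, exactly the arena block size, while subtracting 8 bytes from the capacity pushes the mutable limit just below it. So the first two cases should fail the sanity check and the last two should pass, assuming the `<=` comparison suggested later in this review is adopted.)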
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
return new WriteBufferManager(writeBufferManagerCapacity, cache);
}
+
+ /**
+ * Calculate the default arena block size as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+ *
+ * @return the default arena block size
+ * @param writeBufferSize the write buffer size (bytes)
+ */
+ static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+ return writeBufferSize / 8;
+ }
+
+ /**
+ * Calculate {@code mutable_limit_} as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
+ *
+ * @param bufferSize write buffer size
+ * @return mutableLimit
+ */
+ static long calculateRocksDBMutableLimit(long bufferSize) {
+ return bufferSize * 7 / 8;
+ }
+
+ /**
+ * RocksDB starts flushing the active memtable constantly in the case when the arena block size is greater than
+ * mutable limit (as calculated in {@link #calculateRocksDBMutableLimit(long)}).
+ *
+ * <p>This happens because in such a case the check here
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47"/>
+ * is always true.
+ *
+ * <p>This method checks that arena block size is smaller than mutable limit.
+ *
+ * @param arenaBlockSize Arena block size
+ * @param mutableLimit mutable limit
+ * @return whether arena block size is sensible
+ */
+ @VisibleForTesting
+ static boolean validateArenaBlockSize(long arenaBlockSize, long mutableLimit) {
+ return arenaBlockSize < mutableLimit;
Review comment:
```suggestion
return arenaBlockSize <= mutableLimit;
```
It should be ok if the arena block size equals the mutable limit, according to the RocksDB code [here](https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47)?
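As a minimal illustration of the relationship being discussed (this standalone sketch and its names are mine, not part of the PR; it reuses the writeBufferSize / 8 and capacity * 7 / 8 formulas from the diff above and assumes the `<=` comparison suggested here):
```java
public class ArenaBlockVsMutableLimitSketch {

    // Default arena block size as the quoted PR code derives it: writeBufferSize / 8.
    static long defaultArenaBlockSize(long writeBufferSize) {
        return writeBufferSize / 8;
    }

    // mutable_limit_ as the quoted PR code derives it: 7/8 of the write buffer manager capacity.
    static long mutableLimit(long writeBufferManagerCapacity) {
        return writeBufferManagerCapacity * 7 / 8;
    }

    public static void main(String[] args) {
        long writeBufferSize = 64L * 1024 * 1024;             // 64 MiB, as in the original test data
        long writeBufferManagerCapacity = 8L * 1024 * 1024;   // 8 MiB

        long arenaBlockSize = defaultArenaBlockSize(writeBufferSize);   // 8 MiB
        long limit = mutableLimit(writeBufferManagerCapacity);          // 7 MiB

        // Prints false: the arena block size exceeds the mutable limit,
        // which is exactly the situation the sanity check is meant to warn about.
        System.out.println(arenaBlockSize <= limit);
    }
}
```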
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
+ if (writeBufferManagerCapacity != null) {
+ // It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+ // Unfortunately the check needs write buffer size that was just calculated.
+ sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+ }
+
return new ColumnFamilyDescriptor(nameBytes, options);
}
+ /**
+ * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
+ * Essentially, the condition here
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+ * will always be true.
+ *
+ * @param writeBufferSize the size of write buffer (bytes)
+ * @param arenaBlockSizeConfigured the manually configured arena block size
+ * @param writeBufferManagerCapacity the size of the write buffer manager (bytes)
+ * @return true is sanity check passes, false otherwise
Review comment:
```suggestion
* @return true if the sanity check passes, false otherwise
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
+ if (writeBufferManagerCapacity != null) {
+ // It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+ // Unfortunately the check needs write buffer size that was just calculated.
+ sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+ }
+
return new ColumnFamilyDescriptor(nameBytes, options);
}
+ /**
+ * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
Review comment:
```suggestion
* Logs a warning if the arena block size is too high causing RocksDB to flush constantly.
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
return new WriteBufferManager(writeBufferManagerCapacity, cache);
}
+
+ /**
+ * Calculate the default arena block size as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+ *
+ * @return the default arena block size
+ * @param writeBufferSize the write buffer size (bytes)
+ */
+ static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+ return writeBufferSize / 8;
+ }
+
+ /**
+ * Calculate {@code mutable_limit_} as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
Review comment:
```suggestion
* Calculate {@code mutable_limit_} as RocksDB calculates it
* <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>
* here</a>.
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
return new WriteBufferManager(writeBufferManagerCapacity, cache);
}
+
+ /**
+ * Calculate the default arena block size as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+ *
+ * @return the default arena block size
+ * @param writeBufferSize the write buffer size (bytes)
+ */
+ static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+ return writeBufferSize / 8;
+ }
+
+ /**
+ * Calculate {@code mutable_limit_} as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
+ *
+ * @param bufferSize write buffer size
+ * @return mutableLimit
+ */
+ static long calculateRocksDBMutableLimit(long bufferSize) {
+ return bufferSize * 7 / 8;
+ }
+
+ /**
+ * RocksDB starts flushing the active memtable constantly in the case when the arena block size is greater than
+ * mutable limit (as calculated in {@link #calculateRocksDBMutableLimit(long)}).
+ *
+ * <p>This happens because in such a case the check here
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47"/>
+ * is always true.
Review comment:
```suggestion
* <p>This happens because in such a case the check
* <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47">
* here</a> is always true.
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
return new WriteBufferManager(writeBufferManagerCapacity, cache);
}
+
+ /**
+ * Calculate the default arena block size as RocksDB calculates it in
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
Review comment:
```suggestion
* Calculate the default arena block size as RocksDB calculates it
* <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196">
* here</a>.
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
}
}
+ @Test
+ public void testSanityCheckArenaBlockSize() {
+ List<TestData> tests = Arrays.asList(
+ new TestData(67108864, 0, 8388608, false),
+ new TestData(67108864, 8388608, 8388608, false),
+ new TestData(67108864, 0, 11184810, true),
+ new TestData(67108864, 8388608, 11184810, true)
+ );
+
+ for (TestData test : tests) {
+ long writeBufferSize = test.getWriteBufferSize();
+ long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured();
+ long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity();
+ boolean expected = test.isExpected();
Review comment:
```suggestion
boolean expected = test.expectedResult();
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
}
}
+ @Test
+ public void testSanityCheckArenaBlockSize() {
+ List<TestData> tests = Arrays.asList(
+ new TestData(67108864, 0, 8388608, false),
+ new TestData(67108864, 8388608, 8388608, false),
+ new TestData(67108864, 0, 11184810, true),
+ new TestData(67108864, 8388608, 11184810, true)
+ );
+
+ for (TestData test : tests) {
+ long writeBufferSize = test.getWriteBufferSize();
+ long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured();
+ long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity();
+ boolean expected = test.isExpected();
+
+ boolean isOk = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity);
+ if (expected) {
+ assertTrue(isOk);
+ } else {
+ assertFalse(isOk);
+ }
Review comment:
```suggestion
boolean sanityCheckResult = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity);
assertThat(sanityCheckResult, is(expected));
```
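If this suggestion is picked up, the test would presumably also need a static import for the `is` matcher (my assumption, based on the imports quoted in the next comment, where the file already uses `org.hamcrest.Matchers.containsString`), e.g.:
```java
// Assumed addition to go with assertThat(sanityCheckResult, is(expected));
// it follows the file's existing org.hamcrest.Matchers import style.
import static org.hamcrest.Matchers.is;
```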
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -32,10 +32,14 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
Review comment:
```suggestion
import static org.junit.Assert.assertThat;
```
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
+ if (writeBufferManagerCapacity != null) {
+ // It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+ // Unfortunately the check needs write buffer size that was just calculated.
+ sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+ }
+
return new ColumnFamilyDescriptor(nameBytes, options);
}
+ /**
+ * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
+ * Essentially, the condition here
+ * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+ * will always be true.
+ *
+ * @param writeBufferSize the size of write buffer (bytes)
+ * @param arenaBlockSizeConfigured the manually configured arena block size
+ * @param writeBufferManagerCapacity the size of the write buffer manager (bytes)
+ * @return true is sanity check passes, false otherwise
+ */
+ static boolean sanityCheckArenaBlockSize(
+ long writeBufferSize,
+ long arenaBlockSizeConfigured,
+ long writeBufferManagerCapacity) throws IllegalStateException {
+
+ long defaultArenaBlockSize = RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize);
+ long arenaBlockSize = arenaBlockSizeConfigured <= 0 ? defaultArenaBlockSize : arenaBlockSizeConfigured;
+ long mutableLimit = RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(writeBufferManagerCapacity);
+ if (RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, mutableLimit)) {
+ return true;
+ } else {
+ LOG.warn("RocksDBStateBackend performance will be poor
because of the current Flink memory configuration! " +
+ "RocksDB will flush memtable
constantly, causing high IO and CPU. " +
+ "Typically the easiest fix is to
increase task manager managed memory size. " +
+ "If running locally, see the parameter
taskmanager.memory.managed.size. " +
+ "Details: arenaBlockSize {} <
mutableLimit {} (writeBufferSize = {}, arenaBlockSizeConfigured = {}," +
Review comment:
```suggestion
"Details: arenaBlockSize {} >
mutableLimit {} (writeBufferSize = {}, arenaBlockSizeConfigured = {}," +
```