carp84 commented on a change in pull request #13688:
URL: https://github.com/apache/flink/pull/13688#discussion_r511004271
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +160,49 @@ public static ColumnFamilyDescriptor
createColumnFamilyDescriptor(
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY,
nameBytes),
"The chosen state name 'default' collides with the name
of the default column family!");
+ if (writeBufferManagerCapacity != null) {
+ // It'd be great to perform the check earlier, e.g.
when creating write buffer manager.
+ // Unfortunately the check needs write buffer size that
was just calculated.
+ sanityCheckArenaBlockSize(options.writeBufferSize(),
options.arenaBlockSize(), writeBufferManagerCapacity);
+ }
+
return new ColumnFamilyDescriptor(nameBytes, options);
}
+ /**
+ * Logs a warning ff the arena block size is too high causing RocksDB
to flush constantly.
+ * Essentially, the condition here
+ * <a
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+ * will always be true.
Review comment:
```suggestion
* Essentially, the condition
* <a
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47">
* here</a> will always be true.
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java
##########
@@ -91,10 +93,33 @@ public void testCreateSharedResourcesWithExpectedCapacity()
{
long totalMemorySize = 2048L;
double writeBufferRatio = 0.5;
double highPriPoolRatio = 0.1;
-
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize,
writeBufferRatio, highPriPoolRatio);
+ RocksDBSharedResources rocksDBSharedResources =
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize,
writeBufferRatio, highPriPoolRatio);
long expectedCacheCapacity =
RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize,
writeBufferRatio);
long expectedWbmCapacity =
RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize,
writeBufferRatio);
+
assertThat(actualCacheCapacity.get(),
is(expectedCacheCapacity));
assertThat(actualWbmCapacity.get(), is(expectedWbmCapacity));
+
assertThat(rocksDBSharedResources.getWriteBufferManagerCapacity(),
is(expectedWbmCapacity));
+ }
+
+ @Test
+ public void testCalculateRocksDBDefaultArenaBlockSize() {
+ long writeBufferSize = 64 * 1024 * 1024;
+ long expectArenaBlockSize = writeBufferSize / 8;
+
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize),
is(expectArenaBlockSize));
+ }
+
+ @Test
+ public void testCalculateRocksDBMutableLimit() {
+ long bufferSize = 64 * 1024 * 1024;
+ long limit = bufferSize * 7 / 8;
+
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(bufferSize),
is(limit));
+ }
+
+ @Test
+ public void testValidateArenaBlockSize() {
+ long arenaBlockSize = 8 * 1024 * 1024;
+
assertFalse(RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize,
(long) (arenaBlockSize * 0.5)));
+
assertTrue(RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize,
(long) (arenaBlockSize * 1.5)));
}
Review comment:
I'm hesitating on adding these tests since they're testing against the
implementation instead of any contract. Once the calculation formula changes,
these tests will fail and need to be adjusted accordingly.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +81,60 @@ public void testPathExceptionOnWindows() throws Exception {
}
}
+ @Test
+ public void testSanityCheckArenaBlockSize() {
+ long testWriteBufferSize = 56 * 1024 * 1024L;
+ long testDefaultArenaSize = testWriteBufferSize / 8;
+ long testValidArenaSize = testWriteBufferSize / 7;
+ long testInvalidArenaSize = testWriteBufferSize / 7 - 8L;
+ List<TestData> tests = Arrays.asList(
+ new TestData(testWriteBufferSize, 0,
testInvalidArenaSize, false),
+ new TestData(testWriteBufferSize,
testDefaultArenaSize, testInvalidArenaSize, false),
+ new TestData(testWriteBufferSize, 0,
testValidArenaSize, true),
+ new TestData(testWriteBufferSize,
testDefaultArenaSize, testValidArenaSize, true)
+ );
+
+ for (TestData test : tests) {
+ long writeBufferSize = test.getWriteBufferSize();
+ long arenaBlockSizeConfigured =
test.getArenaBlockSizeConfigured();
+ long writeBufferManagerCapacity =
test.getWriteBufferManagerCapacity();
+ boolean expected = test.getExpectedResult();
+
+ boolean sanityCheckResult =
RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize,
arenaBlockSizeConfigured, writeBufferManagerCapacity);
+ assertThat(sanityCheckResult, is(expected));
+ }
+ }
+
+ private static class TestData {
+ private final long writeBufferSize;
+ private final long arenaBlockSizeConfigured;
+ private final long writeBufferManagerCapacity;
+ private final boolean expectedResult;
+
+ public TestData(long writeBufferSize, long
arenaBlockSizeConfigured, long writeBufferManagerCapacity, boolean
expectedResult) {
+ this.writeBufferSize = writeBufferSize;
+ this.arenaBlockSizeConfigured =
arenaBlockSizeConfigured;
+ this.writeBufferManagerCapacity =
writeBufferManagerCapacity;
+ this.expectedResult = expectedResult;
+ }
+
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ public long getArenaBlockSizeConfigured() {
+ return arenaBlockSizeConfigured;
+ }
+
+ public long getWriteBufferManagerCapacity() {
+ return writeBufferManagerCapacity;
+ }
+
+ public boolean getExpectedResult() {
+ return expectedResult;
+ }
+ }
Review comment:
```suggestion
public void testSanityCheckArenaBlockSize() {
long testWriteBufferSize = 56 * 1024 * 1024L;
long testDefaultArenaSize =
RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(testWriteBufferSize);
long testWriteBufferCapacityBoundary = testDefaultArenaSize * 8
/ 7;
assertThat("The sanity check result is incorrect with default
arena block size",
RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, 0,
testWriteBufferCapacityBoundary),
is(true));
assertThat("The sanity check should pass when the configured
arena block size is small enough.",
RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize,
testDefaultArenaSize - 1, testWriteBufferCapacityBoundary),
is(true));
assertThat("The sanity check should fail when the configured
arena block size is too big.",
RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize,
testDefaultArenaSize + 1, testWriteBufferCapacityBoundary),
is(false));
}
```
Sorry but I'm still not satisfied with this test and suggest to further
simplify it and adding some hints if any of the test fails.
And although this test is also implementation bounded, it checks/guards the
result of multiple calculations, so I think we should keep it.
We will also need to remove useless imports if the suggestion is accepted.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]