AlanConfluent commented on code in PR #21778:
URL: https://github.com/apache/flink/pull/21778#discussion_r1091448435
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java:
##########
@@ -168,4 +170,26 @@ static long calculateRocksDBMutableLimit(long bufferSize) {
static boolean validateArenaBlockSize(long arenaBlockSize, long
mutableLimit) {
return arenaBlockSize <= mutableLimit;
}
+
+ /** Factory for Write Buffer Manager and Bock Cache. */
+ public interface RocksDBMemoryFactory extends Serializable {
+ Cache createCache(long cacheCapacity, double highPriorityPoolRatio);
+
+ WriteBufferManager createWriteBufferManager(long
writeBufferManagerCapacity, Cache cache);
+
+ RocksDBMemoryFactory DEFAULT =
+ new RocksDBMemoryFactory() {
+ @Override
+ public Cache createCache(long cacheCapacity, double
highPriorityPoolRatio) {
+ // TODO use strict capacity limit until FLINK-15532
resolved
+ return new LRUCache(cacheCapacity, -1, false,
highPriorityPoolRatio);
Review Comment:
Any reason not to just call the existing static method
`RocksDBMemoryControllerUtils.createCache`?
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java:
##########
@@ -175,6 +176,9 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
* key-group range.
*/
private double overlapFractionThreshold;
+
+ /** Factory for Write Buffer Manager and Bock Cache. */
Review Comment:
Nit: "Block Cache"
##########
flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java:
##########
@@ -100,116 +81,48 @@ public void init() throws Exception {
@After
public void destroy() {
cluster.after();
- metricsReporter.close();
}
- @Ignore
@Test
public void testBlockCache() throws Exception {
+ List<Cache> createdCaches = new CopyOnWriteArrayList<>();
+ List<WriteBufferManager> createdWriteBufferManagers = new
CopyOnWriteArrayList<>();
+ TestingRocksDBMemoryFactory memoryFactory =
+ new TestingRocksDBMemoryFactory(
+ sharedObjects.add(createdCaches),
+ sharedObjects.add(createdWriteBufferManagers));
List<JobID> jobIDs = new ArrayList<>(NUMBER_OF_JOBS);
try {
- // launch jobs
for (int i = 0; i < NUMBER_OF_JOBS; i++) {
-
jobIDs.add(cluster.getRestClusterClient().submitJob(dag()).get());
+
jobIDs.add(cluster.getRestClusterClient().submitJob(dag(memoryFactory)).get());
}
-
- // wait for init
- Deadline initDeadline = Deadline.fromNow(Duration.ofMinutes(1));
for (JobID jid : jobIDs) {
waitForAllTaskRunning(cluster.getMiniCluster(), jid, false);
- waitForAllMetricsReported(jid, initDeadline);
}
+ Assert.assertEquals(1, createdCaches.size());
Review Comment:
   Presumably, if they were not sharing the cache as intended, you would have
more than one in this list, right?
##########
flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java:
##########
@@ -100,116 +81,48 @@ public void init() throws Exception {
@After
public void destroy() {
cluster.after();
- metricsReporter.close();
}
- @Ignore
@Test
public void testBlockCache() throws Exception {
+ List<Cache> createdCaches = new CopyOnWriteArrayList<>();
+ List<WriteBufferManager> createdWriteBufferManagers = new
CopyOnWriteArrayList<>();
+ TestingRocksDBMemoryFactory memoryFactory =
+ new TestingRocksDBMemoryFactory(
+ sharedObjects.add(createdCaches),
+ sharedObjects.add(createdWriteBufferManagers));
List<JobID> jobIDs = new ArrayList<>(NUMBER_OF_JOBS);
try {
- // launch jobs
for (int i = 0; i < NUMBER_OF_JOBS; i++) {
-
jobIDs.add(cluster.getRestClusterClient().submitJob(dag()).get());
+
jobIDs.add(cluster.getRestClusterClient().submitJob(dag(memoryFactory)).get());
}
-
- // wait for init
- Deadline initDeadline = Deadline.fromNow(Duration.ofMinutes(1));
for (JobID jid : jobIDs) {
waitForAllTaskRunning(cluster.getMiniCluster(), jid, false);
- waitForAllMetricsReported(jid, initDeadline);
}
+ Assert.assertEquals(1, createdCaches.size());
Review Comment:
Nice, this seems like a much more reliable way to check the cache directly.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java:
##########
@@ -168,4 +170,26 @@ static long calculateRocksDBMutableLimit(long bufferSize) {
static boolean validateArenaBlockSize(long arenaBlockSize, long
mutableLimit) {
return arenaBlockSize <= mutableLimit;
}
+
+ /** Factory for Write Buffer Manager and Bock Cache. */
+ public interface RocksDBMemoryFactory extends Serializable {
+ Cache createCache(long cacheCapacity, double highPriorityPoolRatio);
+
+ WriteBufferManager createWriteBufferManager(long
writeBufferManagerCapacity, Cache cache);
+
+ RocksDBMemoryFactory DEFAULT =
+ new RocksDBMemoryFactory() {
+ @Override
+ public Cache createCache(long cacheCapacity, double
highPriorityPoolRatio) {
+ // TODO use strict capacity limit until FLINK-15532
resolved
+ return new LRUCache(cacheCapacity, -1, false,
highPriorityPoolRatio);
+ }
+
+ @Override
+ public WriteBufferManager createWriteBufferManager(
Review Comment:
Any reason not to just call the existing static method
`RocksDBMemoryControllerUtils.createWriteBufferManager`?
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java:
##########
@@ -298,6 +303,11 @@ private EmbeddedRocksDBStateBackend(
checkArgument(
overlapFractionThreshold >= 0 && this.overlapFractionThreshold
<= 1,
"Overlap fraction threshold of restoring should be between 0
and 1");
+
+ this.rocksDBMemoryFactory =
Review Comment:
If the default is `RocksDBMemoryFactory.DEFAULT`, can it ever be null?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]