AlanConfluent commented on code in PR #21733:
URL: https://github.com/apache/flink/pull/21733#discussion_r1084601474
##########
flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java:
##########
@@ -125,36 +129,45 @@ public void testBlockCache() throws Exception {
                                 0));
             // do some work and check the actual usage of memory
-            for (int i = 0; i < 10; i++) {
+            double[] deviations = new double[NUM_MEASUREMENTS];
+            for (int i = 0; i < NUM_MEASUREMENTS; i++) {
                 Thread.sleep(50L);
-                DoubleSummaryStatistics stats =
+                double[] blockCacheUsages =
                         collectGaugeValues(jobIDs, "rocksdb.block-cache-usage")
-                                .collect(Collectors.summarizingDouble((Double::doubleValue)));
-                assertEquals(
-                        String.format(
-                                "Block cache usage reported by different tasks varies too much: %s\n"
-                                        + "That likely mean that they use different cache objects",
-                                stats),
-                        stats.getMax(),
-                        stats.getMin(),
-                        // some deviation is possible because:
-                        // 1. records are being processed in parallel with requesting metrics
-                        // 2. reporting metrics is not synchronized
-                        500_000d);
+                                .mapToDouble(value -> value)
+                                .toArray();
                 assertTrue(
                         String.format(
                                 "total block cache usage is too high: %s (limit: %s, effective limit: %s)",
-                                stats, EXPECTED_BLOCK_CACHE_SIZE, EFFECTIVE_LIMIT),
-                        stats.getMax() <= EFFECTIVE_LIMIT);
+                                Arrays.toString(blockCacheUsages),
+                                EXPECTED_BLOCK_CACHE_SIZE,
+                                EFFECTIVE_LIMIT),
+                        Arrays.stream(blockCacheUsages).max().getAsDouble() <= EFFECTIVE_LIMIT);
+                deviations[i] = new StandardDeviation().evaluate(blockCacheUsages);
             }
-
+            validateDeviations(deviations);
         } finally {
             for (JobID jobID : jobIDs) {
                 cluster.getRestClusterClient().cancel(jobID).get();
             }
         }
     }
+    private static void validateDeviations(double[] deviations) {
+        DescriptiveStatisticsHistogramStatistics percentile =
+                new DescriptiveStatisticsHistogramStatistics(deviations);
+        assertTrue(
+                String.format(
+                        "Block cache usage reported by different tasks varies too much: %s\n"
+                                + "That likely mean that they use different cache objects",
+                        Arrays.toString(deviations)),
+                // some deviation is possible because:
+                // 1. records are being processed in parallel with requesting metrics
+                // 2. reporting metrics is not synchronized
+                percentile.getQuantile(.50d) <= 10_000d
Review Comment:
It's hard to know how these empirical values are calibrated. It would be
nice to verify that this check actually fails when the cache is not shared.
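
For example, the threshold could be sanity-checked offline. Below is a
minimal sketch (not part of the PR; NUM_MEASUREMENTS, NUM_TASKS, and the
synthetic usage numbers are made-up assumptions, and Commons Math's
Percentile stands in for DescriptiveStatisticsHistogramStatistics) that
feeds the same median-of-deviations check with synthetic "shared" vs.
"per-task" samples:

import java.util.Random;
import java.util.function.IntFunction;
import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import org.apache.commons.math3.stat.descriptive.rank.Percentile;

public class DeviationThresholdSketch {
    // Assumed values, standing in for the test's constants.
    private static final int NUM_MEASUREMENTS = 10;
    private static final int NUM_TASKS = 4;

    public static void main(String[] args) {
        Random rnd = new Random(42);

        // Shared cache: every task reports (almost) the same usage; the
        // small jitter stands in for unsynchronized metric reporting.
        double[] shared =
                measure(i -> {
                    double[] usages = new double[NUM_TASKS];
                    for (int t = 0; t < NUM_TASKS; t++) {
                        usages[t] = 50_000_000d + rnd.nextGaussian() * 2_000d;
                    }
                    return usages;
                });

        // Per-task caches: each task fills its own cache at its own pace.
        double[] unshared =
                measure(i -> {
                    double[] usages = new double[NUM_TASKS];
                    for (int t = 0; t < NUM_TASKS; t++) {
                        usages[t] = 10_000_000d + rnd.nextDouble() * 40_000_000d;
                    }
                    return usages;
                });

        Percentile median = new Percentile();
        // Expected: the shared scenario stays well below 10_000d while the
        // unshared one lands in the tens of millions, i.e. the assertion
        // really does fail when the cache is not shared.
        System.out.println("shared   p50 deviation: " + median.evaluate(shared, 50d));
        System.out.println("unshared p50 deviation: " + median.evaluate(unshared, 50d));
    }

    // Computes the per-measurement standard deviation across tasks, like
    // the loop body in the test does.
    private static double[] measure(IntFunction<double[]> sampler) {
        StandardDeviation stddev = new StandardDeviation();
        double[] deviations = new double[NUM_MEASUREMENTS];
        for (int i = 0; i < NUM_MEASUREMENTS; i++) {
            deviations[i] = stddev.evaluate(sampler.apply(i));
        }
        return deviations;
    }
}

If the unshared scenario did not clear the threshold by a wide margin,
that would be a sign the constants need recalibrating.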
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]