dmvk commented on a change in pull request #16654:
URL: https://github.com/apache/flink/pull/16654#discussion_r680881047
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -169,20 +168,32 @@ public void testCachedStatsUpdatedAfterRefreshInterval()
throws Exception {
/** Tests that cached results are removed within the cleanup interval. */
@Test
public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
Review comment:
I'm not convinced that stressing guava internals makes sense here.
Invalidating cache entries manually should do the same job, while being easy to
synchronize.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -169,20 +168,32 @@ public void testCachedStatsUpdatedAfterRefreshInterval()
throws Exception {
/** Tests that cached results are removed within the cleanup interval. */
@Test
public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
- final Duration cleanUpInterval2 = Duration.ofMillis(10);
+ final Duration cleanUpInterval2 = Duration.ofMillis(1);
final long waitingTime = cleanUpInterval2.toMillis() + 10;
+ // use cache recording stats so we can check later that it evicted the
entry
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval2.toMillis(),
TimeUnit.MILLISECONDS)
+ .recordStats()
+ .build();
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
- cleanUpInterval2, STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample);
- doInitialRequestAndVerifyResult(tracker);
+ cleanUpInterval2,
+ STATS_REFRESH_INTERVAL,
+ vertexStatsCache,
+ threadInfoStatsDefaultSample);
- // wait until we are ready to cleanup
+ // no stats yet, but the request triggers async collection of stats
+ assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ // wait until the result is available
+ tracker.getResultAvailableFuture().get();
+ // wait for the cleanup time plus a little buffer
Thread.sleep(waitingTime);
- // cleanup the cached thread info stats
- tracker.cleanUpVertexStatsCache();
assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ assertEquals(1, vertexStatsCache.stats().evictionCount());
Review comment:
This check becomes obsolete with removal listener.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -139,7 +139,7 @@ public void
testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
/** Tests that cached result is NOT reused after refresh interval. */
@Test
public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
Review comment:
This test still seems flaky on my box.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
##########
@@ -127,12 +135,28 @@
return this;
}
+ /**
+ * Sets {@code vertexStatsCache}.
+ *
+ * @param vertexStatsCache The Cache instance to use for caching
statistics. Will use the
+ * default defined in {@link
JobVertexThreadInfoTrackerBuilder#defaultCache()} if not set.
+ * @return Builder.
+ */
+ public JobVertexThreadInfoTrackerBuilder<T> setVertexStatsCache(
Review comment:
Maybe it'd make sense to make this package private and
`@VisibleForTesting`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]