zentol commented on a change in pull request #16654:
URL: https://github.com/apache/flink/pull/16654#discussion_r681645843
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -117,71 +126,88 @@ public void testGetThreadInfoStats() throws Exception {
/** Tests that cached result is reused within refresh interval. */
@Test
public void testCachedStatsNotUpdatedWithinRefreshInterval() throws
Exception {
- final int requestId2 = 1;
-
- final JobVertexThreadInfoStats threadInfoStats2 =
- createThreadInfoStats(requestId2, TIME_GAP, null);
+ final JobVertexThreadInfoStats unusedThreadInfoStats =
+ createThreadInfoStats(1, TIME_GAP, null);
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
- CLEAN_UP_INTERVAL,
STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample,
- threadInfoStats2);
+ unusedThreadInfoStats);
// stores threadInfoStatsDefaultSample in cache
doInitialRequestAndVerifyResult(tracker);
Optional<JobVertexThreadInfoStats> result =
tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
- // cached result is returned instead of threadInfoStats2
+ // cached result is returned instead of unusedThreadInfoStats
assertEquals(threadInfoStatsDefaultSample, result.get());
}
/** Tests that cached result is NOT reused after refresh interval. */
@Test
public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
- final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
- final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() +
10;
+ final Duration shortRefreshInterval = Duration.ofMillis(1);
- final int requestId2 = 1;
- final JobVertexThreadInfoStats threadInfoStats2 =
+ // first entry is in the past, so refresh is triggered immediately
upon fetching it
+ final JobVertexThreadInfoStats initialThreadInfoStats =
createThreadInfoStats(
- requestId2, TIME_GAP,
Collections.singletonList(threadInfoSample));
-
+ Instant.now().minus(10, ChronoUnit.SECONDS),
+ REQUEST_ID,
+ Duration.ofMillis(5),
+ Collections.singletonList(threadInfoSample));
+ final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+ createThreadInfoStats(1, TIME_GAP,
Collections.singletonList(threadInfoSample));
+
+ // register a CountDownLatch with the cache so we can await refresh of
the entry
+ CountDownLatch cacheRefreshed = new CountDownLatch(1);
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache =
+ createCache(CLEAN_UP_INTERVAL, new
LatchRemovalListener<>(cacheRefreshed));
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
CLEAN_UP_INTERVAL,
- threadInfoStatsRefreshInterval2,
- threadInfoStatsDefaultSample,
- threadInfoStats2);
- doInitialRequestAndVerifyResult(tracker);
+ shortRefreshInterval,
+ vertexStatsCache,
+ initialThreadInfoStats,
+ threadInfoStatsAfterRefresh);
- // ensure that the previous request "expires"
- Thread.sleep(waitingTime);
+ // no stats yet, but the request triggers async collection of stats
+ assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ // block until the async call completes and the first result is
available
+ tracker.getResultAvailableFuture().get();
- Optional<JobVertexThreadInfoStats> result =
- tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+ // retrieve the entry, triggering the refresh as side effect
+ assertExpectedEqualsReceived(
+ initialThreadInfoStats, tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX));
- assertExpectedEqualsReceived(threadInfoStats2, result);
+ // wait until the entry is refreshed, with generous buffer
+ assertTrue(cacheRefreshed.await(500, TimeUnit.MILLISECONDS));
Review comment:
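```suggestion
cacheRefreshed.await();
```
Based on recent discussions we no longer use timeouts in tests. Note that the
no-arg `CountDownLatch.await()` returns `void` (only the timed overload returns
a boolean), so the call can no longer be wrapped in `assertTrue`.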
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -117,71 +126,88 @@ public void testGetThreadInfoStats() throws Exception {
/** Tests that cached result is reused within refresh interval. */
@Test
public void testCachedStatsNotUpdatedWithinRefreshInterval() throws
Exception {
- final int requestId2 = 1;
-
- final JobVertexThreadInfoStats threadInfoStats2 =
- createThreadInfoStats(requestId2, TIME_GAP, null);
+ final JobVertexThreadInfoStats unusedThreadInfoStats =
+ createThreadInfoStats(1, TIME_GAP, null);
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
- CLEAN_UP_INTERVAL,
STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample,
- threadInfoStats2);
+ unusedThreadInfoStats);
// stores threadInfoStatsDefaultSample in cache
doInitialRequestAndVerifyResult(tracker);
Optional<JobVertexThreadInfoStats> result =
tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
- // cached result is returned instead of threadInfoStats2
+ // cached result is returned instead of unusedThreadInfoStats
assertEquals(threadInfoStatsDefaultSample, result.get());
}
/** Tests that cached result is NOT reused after refresh interval. */
@Test
public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
- final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
- final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() +
10;
+ final Duration shortRefreshInterval = Duration.ofMillis(1);
- final int requestId2 = 1;
- final JobVertexThreadInfoStats threadInfoStats2 =
+ // first entry is in the past, so refresh is triggered immediately
upon fetching it
+ final JobVertexThreadInfoStats initialThreadInfoStats =
createThreadInfoStats(
- requestId2, TIME_GAP,
Collections.singletonList(threadInfoSample));
-
+ Instant.now().minus(10, ChronoUnit.SECONDS),
+ REQUEST_ID,
+ Duration.ofMillis(5),
+ Collections.singletonList(threadInfoSample));
+ final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+ createThreadInfoStats(1, TIME_GAP,
Collections.singletonList(threadInfoSample));
+
+ // register a CountDownLatch with the cache so we can await refresh of
the entry
+ CountDownLatch cacheRefreshed = new CountDownLatch(1);
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache =
+ createCache(CLEAN_UP_INTERVAL, new
LatchRemovalListener<>(cacheRefreshed));
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
CLEAN_UP_INTERVAL,
- threadInfoStatsRefreshInterval2,
- threadInfoStatsDefaultSample,
- threadInfoStats2);
- doInitialRequestAndVerifyResult(tracker);
+ shortRefreshInterval,
+ vertexStatsCache,
+ initialThreadInfoStats,
+ threadInfoStatsAfterRefresh);
- // ensure that the previous request "expires"
- Thread.sleep(waitingTime);
+ // no stats yet, but the request triggers async collection of stats
+ assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ // block until the async call completes and the first result is
available
+ tracker.getResultAvailableFuture().get();
- Optional<JobVertexThreadInfoStats> result =
- tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+ // retrieve the entry, triggering the refresh as side effect
+ assertExpectedEqualsReceived(
+ initialThreadInfoStats, tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX));
- assertExpectedEqualsReceived(threadInfoStats2, result);
+ // wait until the entry is refreshed, with generous buffer
+ assertTrue(cacheRefreshed.await(500, TimeUnit.MILLISECONDS));
- assertNotSame(result.get(), threadInfoStatsDefaultSample);
+ // verify that we get the second result on the next request
+ Optional<JobVertexThreadInfoStats> result =
+ tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+ assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
}
/** Tests that cached results are removed within the cleanup interval. */
@Test
public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
- final Duration cleanUpInterval2 = Duration.ofMillis(10);
- final long waitingTime = cleanUpInterval2.toMillis() + 10;
+ final Duration shortCleanUpInterval = Duration.ofMillis(1);
+ // register a CountDownLatch with the cache so we can await expiry of
the entry
+ CountDownLatch cacheExpired = new CountDownLatch(1);
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache =
+ createCache(shortCleanUpInterval, new
LatchRemovalListener<>(cacheExpired));
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
- cleanUpInterval2, STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample);
- doInitialRequestAndVerifyResult(tracker);
+ shortCleanUpInterval,
+ STATS_REFRESH_INTERVAL,
+ vertexStatsCache,
+ threadInfoStatsDefaultSample);
- // wait until we are ready to cleanup
- Thread.sleep(waitingTime);
+ // no stats yet, but the request triggers async collection of stats
+ assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ // wait until one eviction was registered, with generous buffer
+ assertTrue(cacheExpired.await(1000, TimeUnit.MILLISECONDS));
Review comment:
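```suggestion
cacheExpired.await();
```
Same as the earlier comment on `cacheRefreshed`: we no longer use timeouts in
tests. The no-arg `CountDownLatch.await()` returns `void`, so the `assertTrue`
wrapper has to be dropped together with the timeout.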
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]