This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new de69316 [FLINK-23492][runtime] Harden
testCachedStatsCleanedAfterCleanupInterval
de69316 is described below
commit de6931625ea28a3ab774558faed02cd02060543e
Author: Nicolaus Weidner <[email protected]>
AuthorDate: Thu Aug 5 08:57:17 2021 +0200
[FLINK-23492][runtime] Harden testCachedStatsCleanedAfterCleanupInterval
---
.../threadinfo/JobVertexThreadInfoTracker.java | 14 +-
.../JobVertexThreadInfoTrackerBuilder.java | 36 ++++-
.../threadinfo/JobVertexThreadInfoTrackerTest.java | 148 +++++++++++++++------
3 files changed, 148 insertions(+), 50 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 1f39d05..e195ab3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -35,7 +35,6 @@ import
org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
import org.apache.flink.runtime.webmonitor.stats.Statistics;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +112,8 @@ public class JobVertexThreadInfoTracker<T extends
Statistics> implements JobVert
Duration statsRefreshInterval,
Duration delayBetweenSamples,
int maxStackTraceDepth,
- Time rpcTimeout) {
+ Time rpcTimeout,
+ Cache<Key, T> vertexStatsCache) {
this.coordinator = checkNotNull(coordinator, "Thread info samples
coordinator");
this.resourceManagerGatewayRetriever =
@@ -138,11 +138,7 @@ public class JobVertexThreadInfoTracker<T extends
Statistics> implements JobVert
checkArgument(maxStackTraceDepth > 0, "Max stack trace depth must be
greater than 0");
this.maxThreadInfoDepth = maxStackTraceDepth;
- this.vertexStatsCache =
- CacheBuilder.newBuilder()
- .concurrencyLevel(1)
- .expireAfterAccess(cleanUpInterval.toMillis(),
TimeUnit.MILLISECONDS)
- .build();
+ this.vertexStatsCache = checkNotNull(vertexStatsCache, "Vertex stats
cache");
executor.scheduleWithFixedDelay(
this::cleanUpVertexStatsCache,
@@ -268,7 +264,7 @@ public class JobVertexThreadInfoTracker<T extends
Statistics> implements JobVert
return new Key(jobId, vertex.getJobVertexId());
}
- private static class Key {
+ static class Key {
private final JobID jobId;
private final JobVertexID jobVertexId;
@@ -315,8 +311,8 @@ public class JobVertexThreadInfoTracker<T extends
Statistics> implements JobVert
return;
}
if (threadInfoStats != null) {
- resultAvailableFuture.complete(null);
vertexStatsCache.put(key,
createStatsFn.apply(threadInfoStats));
+ resultAvailableFuture.complete(null);
} else {
LOG.debug(
"Failed to gather a thread info sample for {}",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
index 6ba961d..a0b07ae 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
@@ -17,15 +17,23 @@
package org.apache.flink.runtime.webmonitor.threadinfo;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.stats.Statistics;
+import
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker.Key;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Builder for {@link JobVertexThreadInfoTracker}.
*
@@ -44,6 +52,7 @@ public class JobVertexThreadInfoTrackerBuilder<T extends
Statistics> {
private Duration statsRefreshInterval;
private Duration delayBetweenSamples;
private int maxThreadInfoDepth;
+ private Cache<Key, T> vertexStatsCache;
JobVertexThreadInfoTrackerBuilder(
GatewayRetriever<ResourceManagerGateway>
resourceManagerGatewayRetriever,
@@ -128,11 +137,27 @@ public class JobVertexThreadInfoTrackerBuilder<T extends
Statistics> {
}
/**
+ * Sets {@code vertexStatsCache}. This is currently only used for testing.
+ *
+ * @param vertexStatsCache The Cache instance to use for caching
statistics. Will use the
+ * default defined in {@link
JobVertexThreadInfoTrackerBuilder#defaultCache()} if not set.
+ * @return Builder.
+ */
+ @VisibleForTesting
+ JobVertexThreadInfoTrackerBuilder<T> setVertexStatsCache(Cache<Key, T>
vertexStatsCache) {
+ this.vertexStatsCache = vertexStatsCache;
+ return this;
+ }
+
+ /**
* Constructs a new {@link JobVertexThreadInfoTracker}.
*
* @return a new {@link JobVertexThreadInfoTracker} instance.
*/
public JobVertexThreadInfoTracker<T> build() {
+ if (vertexStatsCache == null) {
+ vertexStatsCache = defaultCache();
+ }
return new JobVertexThreadInfoTracker<>(
coordinator,
resourceManagerGatewayRetriever,
@@ -143,7 +168,16 @@ public class JobVertexThreadInfoTrackerBuilder<T extends
Statistics> {
statsRefreshInterval,
delayBetweenSamples,
maxThreadInfoDepth,
- restTimeout);
+ restTimeout,
+ vertexStatsCache);
+ }
+
+ private Cache<Key, T> defaultCache() {
+ checkArgument(cleanUpInterval.toMillis() > 0, "Clean up interval must
be greater than 0");
+ return CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval.toMillis(),
TimeUnit.MILLISECONDS)
+ .build();
}
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index ba88339..bcffda2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -34,19 +34,29 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+import
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import javax.annotation.Nonnull;
+
import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -57,7 +67,6 @@ import java.util.function.Function;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -117,71 +126,88 @@ public class JobVertexThreadInfoTrackerTest extends
TestLogger {
/** Tests that cached result is reused within refresh interval. */
@Test
public void testCachedStatsNotUpdatedWithinRefreshInterval() throws
Exception {
- final int requestId2 = 1;
-
- final JobVertexThreadInfoStats threadInfoStats2 =
- createThreadInfoStats(requestId2, TIME_GAP, null);
+ final JobVertexThreadInfoStats unusedThreadInfoStats =
+ createThreadInfoStats(1, TIME_GAP, null);
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
- CLEAN_UP_INTERVAL,
STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample,
- threadInfoStats2);
+ unusedThreadInfoStats);
// stores threadInfoStatsDefaultSample in cache
doInitialRequestAndVerifyResult(tracker);
Optional<JobVertexThreadInfoStats> result =
tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
- // cached result is returned instead of threadInfoStats2
+ // cached result is returned instead of unusedThreadInfoStats
assertEquals(threadInfoStatsDefaultSample, result.get());
}
/** Tests that cached result is NOT reused after refresh interval. */
@Test
public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
- final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
- final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() +
10;
+ final Duration shortRefreshInterval = Duration.ofMillis(1);
- final int requestId2 = 1;
- final JobVertexThreadInfoStats threadInfoStats2 =
+ // first entry is in the past, so refresh is triggered immediately
upon fetching it
+ final JobVertexThreadInfoStats initialThreadInfoStats =
createThreadInfoStats(
- requestId2, TIME_GAP,
Collections.singletonList(threadInfoSample));
-
+ Instant.now().minus(10, ChronoUnit.SECONDS),
+ REQUEST_ID,
+ Duration.ofMillis(5),
+ Collections.singletonList(threadInfoSample));
+ final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+ createThreadInfoStats(1, TIME_GAP,
Collections.singletonList(threadInfoSample));
+
+ // register a CountDownLatch with the cache so we can await refresh of
the entry
+ CountDownLatch cacheRefreshed = new CountDownLatch(1);
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache =
+ createCache(CLEAN_UP_INTERVAL, new
LatchRemovalListener<>(cacheRefreshed));
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
CLEAN_UP_INTERVAL,
- threadInfoStatsRefreshInterval2,
- threadInfoStatsDefaultSample,
- threadInfoStats2);
- doInitialRequestAndVerifyResult(tracker);
+ shortRefreshInterval,
+ vertexStatsCache,
+ initialThreadInfoStats,
+ threadInfoStatsAfterRefresh);
- // ensure that the previous request "expires"
- Thread.sleep(waitingTime);
+ // no stats yet, but the request triggers async collection of stats
+ assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ // block until the async call completes and the first result is
available
+ tracker.getResultAvailableFuture().get();
- Optional<JobVertexThreadInfoStats> result =
- tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+ // retrieve the entry, triggering the refresh as side effect
+ assertExpectedEqualsReceived(
+ initialThreadInfoStats, tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX));
- assertExpectedEqualsReceived(threadInfoStats2, result);
+ // wait until the entry is refreshed
+ cacheRefreshed.await();
- assertNotSame(result.get(), threadInfoStatsDefaultSample);
+ // verify that we get the second result on the next request
+ Optional<JobVertexThreadInfoStats> result =
+ tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+ assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
}
/** Tests that cached results are removed within the cleanup interval. */
@Test
public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
- final Duration cleanUpInterval2 = Duration.ofMillis(10);
- final long waitingTime = cleanUpInterval2.toMillis() + 10;
+ final Duration shortCleanUpInterval = Duration.ofMillis(1);
+ // register a CountDownLatch with the cache so we can await expiry of
the entry
+ CountDownLatch cacheExpired = new CountDownLatch(1);
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache =
+ createCache(shortCleanUpInterval, new
LatchRemovalListener<>(cacheExpired));
final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
createThreadInfoTracker(
- cleanUpInterval2, STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample);
- doInitialRequestAndVerifyResult(tracker);
+ shortCleanUpInterval,
+ STATS_REFRESH_INTERVAL,
+ vertexStatsCache,
+ threadInfoStatsDefaultSample);
- // wait until we are ready to cleanup
- Thread.sleep(waitingTime);
+ // no stats yet, but the request triggers async collection of stats
+ assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
+ // wait until one eviction was registered
+ cacheExpired.await();
- // cleanup the cached thread info stats
- tracker.cleanUpVertexStatsCache();
assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
}
@@ -215,9 +241,21 @@ public class JobVertexThreadInfoTrackerTest extends
TestLogger {
assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
}
+ private Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
createCache(
+ Duration cleanUpInterval,
+ RemovalListener<JobVertexThreadInfoTracker.Key,
JobVertexThreadInfoStats>
+ removalListener) {
+ return CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval.toMillis(),
TimeUnit.MILLISECONDS)
+ .removalListener(removalListener)
+ .build();
+ }
+
private void doInitialRequestAndVerifyResult(
JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker)
throws InterruptedException, ExecutionException {
+ // no stats yet, but the request triggers async collection of stats
assertFalse(tracker.getVertexStats(JOB_ID,
EXECUTION_JOB_VERTEX).isPresent());
// block until the async call completes and the first result is
available
tracker.getResultAvailableFuture().get();
@@ -242,15 +280,19 @@ public class JobVertexThreadInfoTrackerTest extends
TestLogger {
}
private JobVertexThreadInfoTracker<JobVertexThreadInfoStats>
createThreadInfoTracker() {
- return createThreadInfoTracker(
- CLEAN_UP_INTERVAL, STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample);
+ return createThreadInfoTracker(STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample);
+ }
+
+ private JobVertexThreadInfoTracker<JobVertexThreadInfoStats>
createThreadInfoTracker(
+ Duration statsRefreshInterval, JobVertexThreadInfoStats... stats) {
+ return createThreadInfoTracker(CLEAN_UP_INTERVAL,
statsRefreshInterval, null, stats);
}
private JobVertexThreadInfoTracker<JobVertexThreadInfoStats>
createThreadInfoTracker(
Duration cleanUpInterval,
- Duration threadInfoStatsRefreshInterval,
+ Duration statsRefreshInterval,
+ Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
vertexStatsCache,
JobVertexThreadInfoStats... stats) {
-
final ThreadInfoRequestCoordinator coordinator =
new TestingThreadInfoRequestCoordinator(Runnable::run,
REQUEST_TIMEOUT, stats);
@@ -262,16 +304,24 @@ public class JobVertexThreadInfoTrackerTest extends
TestLogger {
.setCoordinator(coordinator)
.setCleanUpInterval(cleanUpInterval)
.setNumSamples(NUMBER_OF_SAMPLES)
- .setStatsRefreshInterval(threadInfoStatsRefreshInterval)
+ .setStatsRefreshInterval(statsRefreshInterval)
.setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES)
.setMaxThreadInfoDepth(MAX_STACK_TRACE_DEPTH)
+ .setVertexStatsCache(vertexStatsCache)
.build();
}
private static JobVertexThreadInfoStats createThreadInfoStats(
int requestId, Duration timeGap, List<ThreadInfoSample>
threadInfoSamples) {
- long startTime = System.currentTimeMillis();
- long endTime = startTime + timeGap.toMillis();
+ return createThreadInfoStats(Instant.now(), requestId, timeGap,
threadInfoSamples);
+ }
+
+ private static JobVertexThreadInfoStats createThreadInfoStats(
+ Instant startTime,
+ int requestId,
+ Duration timeGap,
+ List<ThreadInfoSample> threadInfoSamples) {
+ Instant endTime = startTime.plus(timeGap);
final Map<ExecutionAttemptID, List<ThreadInfoSample>>
threadInfoRatiosByTask =
new HashMap<>();
@@ -281,7 +331,11 @@ public class JobVertexThreadInfoTrackerTest extends
TestLogger {
vertex.getCurrentExecutionAttempt().getAttemptId(),
threadInfoSamples);
}
- return new JobVertexThreadInfoStats(requestId, startTime, endTime,
threadInfoRatiosByTask);
+ return new JobVertexThreadInfoStats(
+ requestId,
+ startTime.toEpochMilli(),
+ endTime.toEpochMilli(),
+ threadInfoRatiosByTask);
}
private static ExecutionJobVertex createExecutionJobVertex() {
@@ -332,4 +386,18 @@ public class JobVertexThreadInfoTrackerTest extends
TestLogger {
jobVertexThreadInfoStats[(counter++) %
jobVertexThreadInfoStats.length]);
}
}
+
+ private static class LatchRemovalListener<K, V> implements
RemovalListener<K, V> {
+
+ private final CountDownLatch latch;
+
+ private LatchRemovalListener(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<K, V>
removalNotification) {
+ latch.countDown();
+ }
+ }
}