This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new de69316  [FLINK-23492][runtime] Harden testCachedStatsCleanedAfterCleanupInterval
de69316 is described below

commit de6931625ea28a3ab774558faed02cd02060543e
Author: Nicolaus Weidner <[email protected]>
AuthorDate: Thu Aug 5 08:57:17 2021 +0200

    [FLINK-23492][runtime] Harden testCachedStatsCleanedAfterCleanupInterval
---
 .../threadinfo/JobVertexThreadInfoTracker.java     |  14 +-
 .../JobVertexThreadInfoTrackerBuilder.java         |  36 ++++-
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 148 +++++++++++++++------
 3 files changed, 148 insertions(+), 50 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 1f39d05..e195ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
 import org.apache.flink.runtime.webmonitor.stats.Statistics;
 
 import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,7 +112,8 @@ public class JobVertexThreadInfoTracker<T extends 
Statistics> implements JobVert
             Duration statsRefreshInterval,
             Duration delayBetweenSamples,
             int maxStackTraceDepth,
-            Time rpcTimeout) {
+            Time rpcTimeout,
+            Cache<Key, T> vertexStatsCache) {
 
         this.coordinator = checkNotNull(coordinator, "Thread info samples 
coordinator");
         this.resourceManagerGatewayRetriever =
@@ -138,11 +138,7 @@ public class JobVertexThreadInfoTracker<T extends 
Statistics> implements JobVert
         checkArgument(maxStackTraceDepth > 0, "Max stack trace depth must be 
greater than 0");
         this.maxThreadInfoDepth = maxStackTraceDepth;
 
-        this.vertexStatsCache =
-                CacheBuilder.newBuilder()
-                        .concurrencyLevel(1)
-                        .expireAfterAccess(cleanUpInterval.toMillis(), 
TimeUnit.MILLISECONDS)
-                        .build();
+        this.vertexStatsCache = checkNotNull(vertexStatsCache, "Vertex stats 
cache");
 
         executor.scheduleWithFixedDelay(
                 this::cleanUpVertexStatsCache,
@@ -268,7 +264,7 @@ public class JobVertexThreadInfoTracker<T extends 
Statistics> implements JobVert
         return new Key(jobId, vertex.getJobVertexId());
     }
 
-    private static class Key {
+    static class Key {
         private final JobID jobId;
         private final JobVertexID jobVertexId;
 
@@ -315,8 +311,8 @@ public class JobVertexThreadInfoTracker<T extends 
Statistics> implements JobVert
                         return;
                     }
                     if (threadInfoStats != null) {
-                        resultAvailableFuture.complete(null);
                         vertexStatsCache.put(key, 
createStatsFn.apply(threadInfoStats));
+                        resultAvailableFuture.complete(null);
                     } else {
                         LOG.debug(
                                 "Failed to gather a thread info sample for {}",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
index 6ba961d..a0b07ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
@@ -17,15 +17,23 @@
 
 package org.apache.flink.runtime.webmonitor.threadinfo;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.stats.Statistics;
+import 
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker.Key;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
 
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Builder for {@link JobVertexThreadInfoTracker}.
  *
@@ -44,6 +52,7 @@ public class JobVertexThreadInfoTrackerBuilder<T extends 
Statistics> {
     private Duration statsRefreshInterval;
     private Duration delayBetweenSamples;
     private int maxThreadInfoDepth;
+    private Cache<Key, T> vertexStatsCache;
 
     JobVertexThreadInfoTrackerBuilder(
             GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
@@ -128,11 +137,27 @@ public class JobVertexThreadInfoTrackerBuilder<T extends 
Statistics> {
     }
 
     /**
+     * Sets {@code vertexStatsCache}. This is currently only used for testing.
+     *
+     * @param vertexStatsCache The Cache instance to use for caching 
statistics. Will use the
+     *     default defined in {@link 
JobVertexThreadInfoTrackerBuilder#defaultCache()} if not set.
+     * @return Builder.
+     */
+    @VisibleForTesting
+    JobVertexThreadInfoTrackerBuilder<T> setVertexStatsCache(Cache<Key, T> 
vertexStatsCache) {
+        this.vertexStatsCache = vertexStatsCache;
+        return this;
+    }
+
+    /**
      * Constructs a new {@link JobVertexThreadInfoTracker}.
      *
      * @return a new {@link JobVertexThreadInfoTracker} instance.
      */
     public JobVertexThreadInfoTracker<T> build() {
+        if (vertexStatsCache == null) {
+            vertexStatsCache = defaultCache();
+        }
         return new JobVertexThreadInfoTracker<>(
                 coordinator,
                 resourceManagerGatewayRetriever,
@@ -143,7 +168,16 @@ public class JobVertexThreadInfoTrackerBuilder<T extends 
Statistics> {
                 statsRefreshInterval,
                 delayBetweenSamples,
                 maxThreadInfoDepth,
-                restTimeout);
+                restTimeout,
+                vertexStatsCache);
+    }
+
+    private Cache<Key, T> defaultCache() {
+        checkArgument(cleanUpInterval.toMillis() > 0, "Clean up interval must 
be greater than 0");
+        return CacheBuilder.newBuilder()
+                .concurrencyLevel(1)
+                .expireAfterAccess(cleanUpInterval.toMillis(), 
TimeUnit.MILLISECONDS)
+                .build();
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index ba88339..bcffda2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -34,19 +34,29 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.JvmUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import javax.annotation.Nonnull;
+
 import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -57,7 +67,6 @@ import java.util.function.Function;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -117,71 +126,88 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
     /** Tests that cached result is reused within refresh interval. */
     @Test
     public void testCachedStatsNotUpdatedWithinRefreshInterval() throws 
Exception {
-        final int requestId2 = 1;
-
-        final JobVertexThreadInfoStats threadInfoStats2 =
-                createThreadInfoStats(requestId2, TIME_GAP, null);
+        final JobVertexThreadInfoStats unusedThreadInfoStats =
+                createThreadInfoStats(1, TIME_GAP, null);
 
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
-                        CLEAN_UP_INTERVAL,
                         STATS_REFRESH_INTERVAL,
                         threadInfoStatsDefaultSample,
-                        threadInfoStats2);
+                        unusedThreadInfoStats);
         // stores threadInfoStatsDefaultSample in cache
         doInitialRequestAndVerifyResult(tracker);
         Optional<JobVertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
-        // cached result is returned instead of threadInfoStats2
+        // cached result is returned instead of unusedThreadInfoStats
         assertEquals(threadInfoStatsDefaultSample, result.get());
     }
 
     /** Tests that cached result is NOT reused after refresh interval. */
     @Test
     public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
-        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
-        final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() + 
10;
+        final Duration shortRefreshInterval = Duration.ofMillis(1);
 
-        final int requestId2 = 1;
-        final JobVertexThreadInfoStats threadInfoStats2 =
+        // first entry is in the past, so refresh is triggered immediately 
upon fetching it
+        final JobVertexThreadInfoStats initialThreadInfoStats =
                 createThreadInfoStats(
-                        requestId2, TIME_GAP, 
Collections.singletonList(threadInfoSample));
-
+                        Instant.now().minus(10, ChronoUnit.SECONDS),
+                        REQUEST_ID,
+                        Duration.ofMillis(5),
+                        Collections.singletonList(threadInfoSample));
+        final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+                createThreadInfoStats(1, TIME_GAP, 
Collections.singletonList(threadInfoSample));
+
+        // register a CountDownLatch with the cache so we can await refresh of 
the entry
+        CountDownLatch cacheRefreshed = new CountDownLatch(1);
+        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache =
+                createCache(CLEAN_UP_INTERVAL, new 
LatchRemovalListener<>(cacheRefreshed));
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         CLEAN_UP_INTERVAL,
-                        threadInfoStatsRefreshInterval2,
-                        threadInfoStatsDefaultSample,
-                        threadInfoStats2);
-        doInitialRequestAndVerifyResult(tracker);
+                        shortRefreshInterval,
+                        vertexStatsCache,
+                        initialThreadInfoStats,
+                        threadInfoStatsAfterRefresh);
 
-        // ensure that the previous request "expires"
-        Thread.sleep(waitingTime);
+        // no stats yet, but the request triggers async collection of stats
+        assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
+        // block until the async call completes and the first result is 
available
+        tracker.getResultAvailableFuture().get();
 
-        Optional<JobVertexThreadInfoStats> result =
-                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+        // retrieve the entry, triggering the refresh as side effect
+        assertExpectedEqualsReceived(
+                initialThreadInfoStats, tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX));
 
-        assertExpectedEqualsReceived(threadInfoStats2, result);
+        // wait until the entry is refreshed
+        cacheRefreshed.await();
 
-        assertNotSame(result.get(), threadInfoStatsDefaultSample);
+        // verify that we get the second result on the next request
+        Optional<JobVertexThreadInfoStats> result =
+                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+        assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
     }
 
     /** Tests that cached results are removed within the cleanup interval. */
     @Test
     public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
-        final Duration cleanUpInterval2 = Duration.ofMillis(10);
-        final long waitingTime = cleanUpInterval2.toMillis() + 10;
+        final Duration shortCleanUpInterval = Duration.ofMillis(1);
 
+        // register a CountDownLatch with the cache so we can await expiry of 
the entry
+        CountDownLatch cacheExpired = new CountDownLatch(1);
+        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache =
+                createCache(shortCleanUpInterval, new 
LatchRemovalListener<>(cacheExpired));
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
-                        cleanUpInterval2, STATS_REFRESH_INTERVAL, 
threadInfoStatsDefaultSample);
-        doInitialRequestAndVerifyResult(tracker);
+                        shortCleanUpInterval,
+                        STATS_REFRESH_INTERVAL,
+                        vertexStatsCache,
+                        threadInfoStatsDefaultSample);
 
-        // wait until we are ready to cleanup
-        Thread.sleep(waitingTime);
+        // no stats yet, but the request triggers async collection of stats
+        assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
+        // wait until one eviction was registered
+        cacheExpired.await();
 
-        // cleanup the cached thread info stats
-        tracker.cleanUpVertexStatsCache();
         assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
     }
 
@@ -215,9 +241,21 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
         assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
     }
 
+    private Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
createCache(
+            Duration cleanUpInterval,
+            RemovalListener<JobVertexThreadInfoTracker.Key, 
JobVertexThreadInfoStats>
+                    removalListener) {
+        return CacheBuilder.newBuilder()
+                .concurrencyLevel(1)
+                .expireAfterAccess(cleanUpInterval.toMillis(), 
TimeUnit.MILLISECONDS)
+                .removalListener(removalListener)
+                .build();
+    }
+
     private void doInitialRequestAndVerifyResult(
             JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker)
             throws InterruptedException, ExecutionException {
+        // no stats yet, but the request triggers async collection of stats
         assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
         // block until the async call completes and the first result is 
available
         tracker.getResultAvailableFuture().get();
@@ -242,15 +280,19 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
     }
 
     private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> 
createThreadInfoTracker() {
-        return createThreadInfoTracker(
-                CLEAN_UP_INTERVAL, STATS_REFRESH_INTERVAL, 
threadInfoStatsDefaultSample);
+        return createThreadInfoTracker(STATS_REFRESH_INTERVAL, 
threadInfoStatsDefaultSample);
+    }
+
+    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> 
createThreadInfoTracker(
+            Duration statsRefreshInterval, JobVertexThreadInfoStats... stats) {
+        return createThreadInfoTracker(CLEAN_UP_INTERVAL, 
statsRefreshInterval, null, stats);
     }
 
     private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> 
createThreadInfoTracker(
             Duration cleanUpInterval,
-            Duration threadInfoStatsRefreshInterval,
+            Duration statsRefreshInterval,
+            Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache,
             JobVertexThreadInfoStats... stats) {
-
         final ThreadInfoRequestCoordinator coordinator =
                 new TestingThreadInfoRequestCoordinator(Runnable::run, 
REQUEST_TIMEOUT, stats);
 
@@ -262,16 +304,24 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
                 .setCoordinator(coordinator)
                 .setCleanUpInterval(cleanUpInterval)
                 .setNumSamples(NUMBER_OF_SAMPLES)
-                .setStatsRefreshInterval(threadInfoStatsRefreshInterval)
+                .setStatsRefreshInterval(statsRefreshInterval)
                 .setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES)
                 .setMaxThreadInfoDepth(MAX_STACK_TRACE_DEPTH)
+                .setVertexStatsCache(vertexStatsCache)
                 .build();
     }
 
     private static JobVertexThreadInfoStats createThreadInfoStats(
             int requestId, Duration timeGap, List<ThreadInfoSample> 
threadInfoSamples) {
-        long startTime = System.currentTimeMillis();
-        long endTime = startTime + timeGap.toMillis();
+        return createThreadInfoStats(Instant.now(), requestId, timeGap, 
threadInfoSamples);
+    }
+
+    private static JobVertexThreadInfoStats createThreadInfoStats(
+            Instant startTime,
+            int requestId,
+            Duration timeGap,
+            List<ThreadInfoSample> threadInfoSamples) {
+        Instant endTime = startTime.plus(timeGap);
 
         final Map<ExecutionAttemptID, List<ThreadInfoSample>> 
threadInfoRatiosByTask =
                 new HashMap<>();
@@ -281,7 +331,11 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
                     vertex.getCurrentExecutionAttempt().getAttemptId(), 
threadInfoSamples);
         }
 
-        return new JobVertexThreadInfoStats(requestId, startTime, endTime, 
threadInfoRatiosByTask);
+        return new JobVertexThreadInfoStats(
+                requestId,
+                startTime.toEpochMilli(),
+                endTime.toEpochMilli(),
+                threadInfoRatiosByTask);
     }
 
     private static ExecutionJobVertex createExecutionJobVertex() {
@@ -332,4 +386,18 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
                     jobVertexThreadInfoStats[(counter++) % 
jobVertexThreadInfoStats.length]);
         }
     }
+
+    private static class LatchRemovalListener<K, V> implements 
RemovalListener<K, V> {
+
+        private final CountDownLatch latch;
+
+        private LatchRemovalListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void onRemoval(@Nonnull RemovalNotification<K, V> 
removalNotification) {
+            latch.countDown();
+        }
+    }
 }

Reply via email to