This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 851420014584fdfd91308937cd913917c11cf4bd
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Apr 23 10:53:27 2020 +0200

    [FLINK-17308] Add regular cleanup task for ExecutionGraphCache
    
    The WebMonitorEndpoint now schedules a regular cleanup task which
    runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up
    expired ExecutionGraphCache entries. This ensures that we will remove
    unused entries.
    
    This closes #11879.
---
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  21 ++++
 .../webmonitor/TestingExecutionGraphCache.java     | 113 +++++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpointTest.java |  79 ++++++++++++++
 3 files changed, 213 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7ccf12e..a41926b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -136,6 +136,8 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -146,6 +148,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -175,6 +178,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
        private final Collection<JsonArchivist> archivingHandlers = new 
ArrayList<>(16);
 
+       @Nullable
+       private ScheduledFuture<?> executionGraphCleanupTask;
+
        public WebMonitorEndpoint(
                        RestServerEndpointConfiguration endpointConfiguration,
                        GatewayRetriever<? extends T> leaderRetriever,
@@ -725,13 +731,28 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
        @Override
        public void startInternal() throws Exception {
                leaderElectionService.start(this);
+               startExecutionGraphCacheCleanupTask();
+
                if (hasWebUI) {
                        log.info("Web frontend listening at {}.", 
getRestBaseUrl());
                }
        }
 
+       private void startExecutionGraphCacheCleanupTask() {
+               final long cleanupInterval = 2 * 
restConfiguration.getRefreshInterval();
+               executionGraphCleanupTask = executor.scheduleWithFixedDelay(
+                       executionGraphCache::cleanup,
+                       cleanupInterval,
+                       cleanupInterval,
+                       TimeUnit.MILLISECONDS);
+       }
+
        @Override
        protected CompletableFuture<Void> shutDownInternal() {
+               if (executionGraphCleanupTask != null) {
+                       executionGraphCleanupTask.cancel(false);
+               }
+
                executionGraphCache.close();
 
                final CompletableFuture<Void> shutdownFuture = 
FutureUtils.runAfterwards(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
new file mode 100644
index 0000000..a61bdc3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.IntSupplier;
+
+/**
+ * Testing implementation of {@link ExecutionGraphCache}.
+ */
+public class TestingExecutionGraphCache implements ExecutionGraphCache {
+       private final IntSupplier sizeSupplier;
+
+       private final BiFunction<JobID, RestfulGateway, 
CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction;
+
+       private final Runnable cleanupRunnable;
+
+       private final Runnable closeRunnable;
+
+       private TestingExecutionGraphCache(
+                       IntSupplier sizeSupplier,
+                       BiFunction<JobID, RestfulGateway, 
CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction,
+                       Runnable cleanupRunnable,
+                       Runnable closeRunnable) {
+               this.sizeSupplier = sizeSupplier;
+               this.getExecutionGraphFunction = getExecutionGraphFunction;
+               this.cleanupRunnable = cleanupRunnable;
+               this.closeRunnable = closeRunnable;
+       }
+
+       @Override
+       public int size() {
+               return sizeSupplier.getAsInt();
+       }
+
+       @Override
+       public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID 
jobId, RestfulGateway restfulGateway) {
+               return getExecutionGraphFunction.apply(jobId, restfulGateway);
+       }
+
+       @Override
+       public void cleanup() {
+               cleanupRunnable.run();
+       }
+
+       @Override
+       public void close() {
+               closeRunnable.run();
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * Builder for the {@link TestingExecutionGraphCache}.
+        */
+       public static final class Builder {
+
+               private IntSupplier sizeSupplier = () -> 0;
+               private BiFunction<JobID, RestfulGateway, 
CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction = (ignoredA, 
ignoredB) -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
+               private Runnable cleanupRunnable = () -> {};
+               private Runnable closeRunnable = () -> {};
+
+               private Builder() {}
+
+               public Builder setSizeSupplier(IntSupplier sizeSupplier) {
+                       this.sizeSupplier = sizeSupplier;
+                       return this;
+               }
+
+               public Builder setGetExecutionGraphFunction(BiFunction<JobID, 
RestfulGateway, CompletableFuture<AccessExecutionGraph>> 
getExecutionGraphFunction) {
+                       this.getExecutionGraphFunction = 
getExecutionGraphFunction;
+                       return this;
+               }
+
+               public Builder setCleanupRunnable(Runnable cleanupRunnable) {
+                       this.cleanupRunnable = cleanupRunnable;
+                       return this;
+               }
+
+               public Builder setCloseRunnable(Runnable closeRunnable) {
+                       this.closeRunnable = closeRunnable;
+                       return this;
+               }
+
+               public TestingExecutionGraphCache build() {
+                       return new TestingExecutionGraphCache(sizeSupplier, 
getExecutionGraphFunction, cleanupRunnable, closeRunnable);
+               }
+       }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
new file mode 100644
index 0000000..e03138e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.NoOpTransientBlobService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for the {@link WebMonitorEndpoint}.
+ */
+public class WebMonitorEndpointTest extends TestLogger {
+
+       @Test
+       public void cleansUpExpiredExecutionGraphs() throws Exception {
+               final Configuration configuration = new Configuration();
+               configuration.setString(RestOptions.ADDRESS, "localhost");
+               configuration.setLong(WebOptions.REFRESH_INTERVAL, 5L);
+               final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(1);
+               final long timeout = 10000L;
+
+               final OneShotLatch cleanupLatch = new OneShotLatch();
+               final TestingExecutionGraphCache executionGraphCache = 
TestingExecutionGraphCache.newBuilder()
+                       .setCleanupRunnable(cleanupLatch::trigger)
+                       .build();
+               try (final WebMonitorEndpoint<RestfulGateway> 
webMonitorEndpoint = new WebMonitorEndpoint<>(
+                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
+                       CompletableFuture::new,
+                       configuration,
+                       
RestHandlerConfiguration.fromConfiguration(configuration),
+                       CompletableFuture::new,
+                       NoOpTransientBlobService.INSTANCE,
+                       executor,
+                       VoidMetricFetcher.INSTANCE,
+                       new TestingLeaderElectionService(),
+                       executionGraphCache,
+                       new TestingFatalErrorHandler())) {
+
+                       webMonitorEndpoint.start();
+
+                       // check that the cleanup will be triggered
+                       cleanupLatch.await(timeout, TimeUnit.MILLISECONDS);
+               } finally {
+                       ExecutorUtils.gracefulShutdown(timeout, 
TimeUnit.MILLISECONDS, executor);
+               }
+       }
+}

Reply via email to