This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 851420014584fdfd91308937cd913917c11cf4bd Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Apr 23 10:53:27 2020 +0200 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache The WebMonitorEndpoint now schedules a regular cleanup task which runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up expired ExecutionGraphCache entries. This ensures that unused entries are eventually removed. This closes #11879. --- .../runtime/webmonitor/WebMonitorEndpoint.java | 21 ++++ .../webmonitor/TestingExecutionGraphCache.java | 113 +++++++++++++++++++++ .../runtime/webmonitor/WebMonitorEndpointTest.java | 79 ++++++++++++++ 3 files changed, 213 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 7ccf12e..a41926b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -136,6 +136,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -146,6 +148,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -175,6 +178,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16); + @Nullable + private ScheduledFuture<?> executionGraphCleanupTask; + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever<? 
extends T> leaderRetriever, @@ -725,13 +731,28 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp @Override public void startInternal() throws Exception { leaderElectionService.start(this); + startExecutionGraphCacheCleanupTask(); + if (hasWebUI) { log.info("Web frontend listening at {}.", getRestBaseUrl()); } } + private void startExecutionGraphCacheCleanupTask() { + final long cleanupInterval = 2 * restConfiguration.getRefreshInterval(); + executionGraphCleanupTask = executor.scheduleWithFixedDelay( + executionGraphCache::cleanup, + cleanupInterval, + cleanupInterval, + TimeUnit.MILLISECONDS); + } + @Override protected CompletableFuture<Void> shutDownInternal() { + if (executionGraphCleanupTask != null) { + executionGraphCleanupTask.cancel(false); + } + executionGraphCache.close(); final CompletableFuture<Void> shutdownFuture = FutureUtils.runAfterwards( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java new file mode 100644 index 0000000..a61bdc3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.IntSupplier; + +/** + * Testing implementation of {@link ExecutionGraphCache}. + */ +public class TestingExecutionGraphCache implements ExecutionGraphCache { + private final IntSupplier sizeSupplier; + + private final BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction; + + private final Runnable cleanupRunnable; + + private final Runnable closeRunnable; + + private TestingExecutionGraphCache( + IntSupplier sizeSupplier, + BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction, + Runnable cleanupRunnable, + Runnable closeRunnable) { + this.sizeSupplier = sizeSupplier; + this.getExecutionGraphFunction = getExecutionGraphFunction; + this.cleanupRunnable = cleanupRunnable; + this.closeRunnable = closeRunnable; + } + + @Override + public int size() { + return sizeSupplier.getAsInt(); + } + + @Override + public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { + return getExecutionGraphFunction.apply(jobId, restfulGateway); + } + + @Override + public void cleanup() { + cleanupRunnable.run(); + } + + @Override + public void close() { + closeRunnable.run(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for the {@link TestingExecutionGraphCache}. 
+ */ + public static final class Builder { + + private IntSupplier sizeSupplier = () -> 0; + private BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction = (ignoredA, ignoredB) -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); + private Runnable cleanupRunnable = () -> {}; + private Runnable closeRunnable = () -> {}; + + private Builder() {} + + public Builder setSizeSupplier(IntSupplier sizeSupplier) { + this.sizeSupplier = sizeSupplier; + return this; + } + + public Builder setGetExecutionGraphFunction(BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction) { + this.getExecutionGraphFunction = getExecutionGraphFunction; + return this; + } + + public Builder setCleanupRunnable(Runnable cleanupRunnable) { + this.cleanupRunnable = cleanupRunnable; + return this; + } + + public Builder setCloseRunnable(Runnable closeRunnable) { + this.closeRunnable = closeRunnable; + return this; + } + + public TestingExecutionGraphCache build() { + return new TestingExecutionGraphCache(sizeSupplier, getExecutionGraphFunction, cleanupRunnable, closeRunnable); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java new file mode 100644 index 0000000..e03138e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.NoOpTransientBlobService; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Tests for the {@link WebMonitorEndpoint}. 
+ */ +public class WebMonitorEndpointTest extends TestLogger { + + @Test + public void cleansUpExpiredExecutionGraphs() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setString(RestOptions.ADDRESS, "localhost"); + configuration.setLong(WebOptions.REFRESH_INTERVAL, 5L); + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + final long timeout = 10000L; + + final OneShotLatch cleanupLatch = new OneShotLatch(); + final TestingExecutionGraphCache executionGraphCache = TestingExecutionGraphCache.newBuilder() + .setCleanupRunnable(cleanupLatch::trigger) + .build(); + try (final WebMonitorEndpoint<RestfulGateway> webMonitorEndpoint = new WebMonitorEndpoint<>( + RestServerEndpointConfiguration.fromConfiguration(configuration), + CompletableFuture::new, + configuration, + RestHandlerConfiguration.fromConfiguration(configuration), + CompletableFuture::new, + NoOpTransientBlobService.INSTANCE, + executor, + VoidMetricFetcher.INSTANCE, + new TestingLeaderElectionService(), + executionGraphCache, + new TestingFatalErrorHandler())) { + + webMonitorEndpoint.start(); + + // check that the cleanup will be triggered + cleanupLatch.await(timeout, TimeUnit.MILLISECONDS); + } finally { + ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, executor); + } + } +}