This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new eb0a56b  [FLINK-18772] Disable web submission for per-job/application mode deployments
eb0a56b is described below

commit eb0a56bafa241c476d1b69a7ae0e1dc269305c4d
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jul 30 14:14:24 2020 +0200

    [FLINK-18772] Disable web submission for per-job/application mode deployments
    
    When running Flink in per-job/application mode, it will instantiate a MiniDispatcherRestEndpoint.
    This endpoint does not instantiate the web submission REST handlers. However, it still displayed
    the submit job link in the web ui. This commit changes the behaviour so that we no longer display
    this link when running Flink in per-job/application mode.
    
    This closes #13030.
---
 .../runtime/dispatcher/DispatcherRestEndpoint.java     | 18 +++++++++++++-----
 .../flink/runtime/webmonitor/WebMonitorEndpoint.java   | 11 ++++++++++-
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 870cb5a..c48ac7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -41,6 +41,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -96,8 +98,17 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        executor,
                        clusterConfiguration);
 
+               handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), 
jobSubmitHandler));
+
+               return handlers;
+       }
+
+       @Override
+       protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> 
initializeWebSubmissionHandlers(CompletableFuture<String> localAddressFuture) {
                if (restConfiguration.isWebSubmitEnabled()) {
                        try {
+                               final Time timeout = 
restConfiguration.getTimeout();
+
                                webSubmissionExtension = 
WebMonitorUtils.loadWebSubmissionExtension(
                                        leaderRetriever,
                                        timeout,
@@ -107,8 +118,7 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                                        executor,
                                        clusterConfiguration);
 
-                               // register extension handlers
-                               
handlers.addAll(webSubmissionExtension.getHandlers());
+                               return webSubmissionExtension.getHandlers();
                        } catch (FlinkException e) {
                                if (log.isDebugEnabled()) {
                                        log.debug("Failed to load web based job 
submission extension.", e);
@@ -121,9 +131,7 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        log.info("Web-based job submission is not enabled.");
                }
 
-               handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), 
jobSubmitHandler));
-
-               return handlers;
+               return Collections.emptyList();
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ad98cc8..e00364e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -133,6 +133,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -207,6 +208,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
        protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> 
initializeHandlers(final CompletableFuture<String> localAddressFuture) {
                ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>(30);
 
+               final Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> webSubmissionHandlers = 
initializeWebSubmissionHandlers(localAddressFuture);
+               handlers.addAll(webSubmissionHandlers);
+               final boolean hasWebSubmissionHandlers = 
!webSubmissionHandlers.isEmpty();
+
                final Time timeout = restConfiguration.getTimeout();
 
                ClusterOverviewHandler clusterOverviewHandler = new 
ClusterOverviewHandler(
@@ -221,7 +226,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        responseHeaders,
                        DashboardConfigurationHeaders.getInstance(),
                        restConfiguration.getRefreshInterval(),
-                       restConfiguration.isWebSubmitEnabled());
+                       hasWebSubmissionHandlers);
 
                JobIdsHandler jobIdsHandler = new JobIdsHandler(
                        leaderRetriever,
@@ -664,6 +669,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                return handlers;
        }
 
+       protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeWebSubmissionHandlers(final 
CompletableFuture<String> localAddressFuture) {
+               return Collections.emptyList();
+       }
+
        @Nonnull
        private ChannelInboundHandler createStaticFileHandler(
                        Time timeout,

Reply via email to