[FLINK-8614] [flip6] Activate Flip-6 mode per default This commit enables the Flip-6 mode by default. Additionally, it disables some of the Yarn tests which no longer apply to Flip-6 (tests which wait for a number of started TM containers without a job submission).
This closes #5437. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab8316f3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab8316f3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab8316f3 Branch: refs/heads/master Commit: ab8316f31a4de0648ab3ffa9f19fcd419aaa3bb9 Parents: ca9ee35 Author: Till Rohrmann <[email protected]> Authored: Tue Jan 30 09:22:03 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Feb 18 10:12:54 2018 +0100 ---------------------------------------------------------------------- .../program/rest/RestClusterClientTest.java | 2 +- .../apache/flink/configuration/CoreOptions.java | 12 ++-- .../flink/configuration/JobManagerOptions.java | 2 +- flink-dist/src/main/flink-bin/bin/config.sh | 2 +- .../flink/docs/rest/RestAPIDocGenerator.java | 2 +- .../dispatcher/DispatcherRestEndpoint.java | 3 +- .../runtime/jobmaster/JobManagerRunner.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 27 ++++---- .../jobmaster/MiniDispatcherRestEndpoint.java | 3 +- .../flink/runtime/rest/FileUploadHandler.java | 44 +------------ .../apache/flink/runtime/rest/RestClient.java | 2 +- .../flink/runtime/rest/RestServerEndpoint.java | 42 +++++++++++- .../runtime/webmonitor/WebMonitorEndpoint.java | 2 +- .../runtime/rest/FileUploadHandlerTest.java | 67 -------------------- .../runtime/rest/RestServerEndpointITCase.java | 2 +- .../runtime/rest/RestServerEndpointTest.java | 37 +++++++++++ .../flink/yarn/YARNHighAvailabilityITCase.java | 3 + .../YARNSessionCapacitySchedulerITCase.java | 12 ++-- .../flink/yarn/YARNSessionFIFOITCase.java | 10 +-- .../org/apache/flink/yarn/YarnTestBase.java | 4 ++ .../apache/flink/yarn/YarnResourceManager.java | 2 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- 22 files changed, 135 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 9e75eea..f54d9d2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -578,7 +578,7 @@ public class RestClusterClientTest extends TestLogger { private final AbstractRestHandler<?, ?, ?, ?>[] abstractRestHandlers; - TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) { + TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws IOException { super(restServerEndpointConfiguration); this.abstractRestHandlers = abstractRestHandlers; } http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index c44219f..9bd7fab 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -230,15 +230,15 @@ public class CoreOptions { // ------------------------------------------------------------------------ /** + * Constant value for the Flip-6 execution mode. + */ + public static final String FLIP6_MODE = "flip6"; + + /** * Switch to select the execution mode. Possible values are 'flip6' and 'old'. 
*/ public static final ConfigOption<String> MODE = ConfigOptions .key("mode") - .defaultValue("old") + .defaultValue(FLIP6_MODE) .withDescription("Switch to select the execution mode. Possible values are 'flip6' and 'old'."); - - /** - * Constant value for the Flip-6 execution mode. - */ - public static final String FLIP6_MODE = "flip6"; } http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 74ec5b7..ade3958 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -140,7 +140,7 @@ public class JobManagerOptions { public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT = key("slot.idle.timeout") - .defaultValue(20L * 1000L) + .defaultValue(10L * 1000L) .withDescription("The timeout in milliseconds for a idle slot in Slot Pool."); // --------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-dist/src/main/flink-bin/bin/config.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index 807bf33..645e156 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -267,7 +267,7 @@ fi # Define FLIP if it is not already set if [ -z "${FLINK_MODE}" ]; then - FLINK_MODE=$(readFromConfig ${KEY_FLINK_MODE} "old" "${YAML_CONF}") + FLINK_MODE=$(readFromConfig ${KEY_FLINK_MODE} "flip6" "${YAML_CONF}") fi 
http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---------------------------------------------------------------------- diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java index f4e1c70..b3a5def 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java @@ -288,7 +288,7 @@ public class RestAPIDocGenerator { metricQueryServiceRetriever = path -> null; } - private DocumentingDispatcherRestEndpoint() { + private DocumentingDispatcherRestEndpoint() throws IOException { super( restConfig, dispatcherGatewayRetriever, http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index b84e287..f8245c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import java.io.IOException; import java.nio.file.Path; import java.util.List; import java.util.Map; @@ -61,7 +62,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway Executor executor, MetricQueryServiceRetriever metricQueryServiceRetriever, LeaderElectionService leaderElectionService, - FatalErrorHandler 
fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler) throws IOException { super( endpointConfiguration, http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 285cb4a..5740bd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -388,7 +388,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F log.info("JobManager for job {} ({}) was revoked leadership at {}.", jobGraph.getName(), jobGraph.getJobID(), getAddress()); - CompletableFuture<Acknowledge> suspendFuture = jobManager.suspend(new Exception("JobManager is no longer the leader."), rpcTimeout); + CompletableFuture<Acknowledge> suspendFuture = jobManager.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout); suspendFuture.whenCompleteAsync( (Acknowledge ack, Throwable throwable) -> { http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index aeac2df..139c053 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -347,7 +347,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast * @param timeout for 
this operation * @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception */ - public CompletableFuture<Acknowledge> suspend(final Throwable cause, final Time timeout) { + public CompletableFuture<Acknowledge> suspend(final Exception cause, final Time timeout) { CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), timeout); stop(); @@ -375,7 +375,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast resourceManagerHeartbeatManager.stop(); // make sure there is a graceful exit - suspendExecution(new Exception("JobManager is shutting down.")); + suspendExecution(new FlinkException("JobManager is shutting down.")); // shut down will internally release all registered slots slotPool.shutDown(); @@ -595,14 +595,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); if (checkpointCoordinator != null) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - try { - checkpointCoordinator.receiveDeclineMessage(decline); - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", decline, e); - } + getRpcService().execute(() -> { + try { + checkpointCoordinator.receiveDeclineMessage(decline); + } catch (Exception e) { + log.error("Error in CheckpointCoordinator while processing {}", decline, e); } }); } else { @@ -915,7 +912,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast * * @param cause The reason of why this job been suspended. 
*/ - private Acknowledge suspendExecution(final Throwable cause) { + private Acknowledge suspendExecution(final Exception cause) { validateRunsInMainThread(); if (getFencingToken() == null) { @@ -939,7 +936,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast slotPoolGateway.suspend(); // disconnect from resource manager: - closeResourceManagerConnection(new Exception("Execution was suspended.", cause)); + closeResourceManagerConnection(cause); return Acknowledge.get(); } @@ -1037,7 +1034,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private void closeResourceManagerConnection(Exception cause) { if (resourceManagerConnection != null) { - log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause); + if (log.isDebugEnabled()) { + log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause); + } else { + log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage()); + } resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID()); http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java index 54ad526..b31c46a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import java.io.IOException; import java.util.concurrent.Executor; /** @@ -48,7 +49,7 @@ public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint<RestfulGatewa Executor executor, MetricQueryServiceRetriever metricQueryServiceRetriever, LeaderElectionService leaderElectionService, - FatalErrorHandler fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler) throws IOException { super( endpointConfiguration, leaderRetriever, http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index c214b50..8854a1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.rest; -import org.apache.flink.annotation.VisibleForTesting; - import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; @@ -39,8 +37,6 @@ import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.UUID; @@ -66,10 +62,8 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { private HttpRequest currentHttpRequest; - public FileUploadHandler(final Path uploadDir) throws 
IOException { + public FileUploadHandler(final Path uploadDir) { super(false); - createUploadDir(uploadDir); - DiskFileUpload.baseDirectory = uploadDir.normalize().toAbsolutePath().toString(); this.uploadDir = requireNonNull(uploadDir); } @@ -89,7 +83,8 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { ctx.fireChannelRead(msg); } } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { - createUploadDir(uploadDir); + // make sure that we still have a upload dir in case that it got deleted in the meanwhile + RestServerEndpoint.createUploadDir(uploadDir, LOG); final HttpContent httpContent = (HttpContent) msg; currentHttpPostRequestDecoder.offer(httpContent); @@ -123,37 +118,4 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { currentHttpPostRequestDecoder = null; currentHttpRequest = null; } - - /** - * Creates the upload dir if needed. - */ - @VisibleForTesting - static void createUploadDir(final Path uploadDir) throws IOException { - if (!Files.exists(uploadDir)) { - LOG.warn("Upload directory {} does not exist, or has been deleted externally. " + - "Previously uploaded files are no longer available.", uploadDir); - checkAndCreateUploadDir(uploadDir); - } - } - - /** - * Checks whether the given directory exists and is writable. If it doesn't exist, this method - * will attempt to create it. 
- * - * @param uploadDir directory to check - * @throws IOException if the directory does not exist and cannot be created, or if the - * directory isn't writable - */ - private static synchronized void checkAndCreateUploadDir(final Path uploadDir) throws IOException { - if (Files.exists(uploadDir) && Files.isWritable(uploadDir)) { - LOG.info("Using directory {} for file uploads.", uploadDir); - } else if (Files.isWritable(Files.createDirectories(uploadDir))) { - LOG.info("Created directory {} for file uploads.", uploadDir); - } else { - LOG.warn("Upload directory {} cannot be created or is not writable.", uploadDir); - throw new IOException( - String.format("Upload directory %s cannot be created or is not writable.", - uploadDir)); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 43a9981..42612ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -151,7 +151,7 @@ public class RestClient { String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters); - LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl); + LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl); // serialize payload StringWriter sw = new StringWriter(); objectMapper.writeValue(sw, request); http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index 1904235..dbb25a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; @@ -49,6 +50,7 @@ import javax.net.ssl.SSLEngine; import java.io.IOException; import java.io.Serializable; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.Comparator; @@ -77,12 +79,14 @@ public abstract class RestServerEndpoint { private volatile boolean started; - public RestServerEndpoint(RestServerEndpointConfiguration configuration) { + public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException { Preconditions.checkNotNull(configuration); this.configuredAddress = configuration.getEndpointBindAddress(); this.configuredPort = configuration.getEndpointBindPort(); this.sslEngine = configuration.getSslEngine(); + this.uploadDir = configuration.getUploadDir(); + createUploadDir(uploadDir, log); this.restAddress = null; @@ -136,7 +140,7 @@ public abstract class RestServerEndpoint { ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override - protected void initChannel(SocketChannel ch) throws IOException { + protected void initChannel(SocketChannel ch) { Handler handler = new RouterHandler(router); // SSL should be the first handler in the pipeline @@ -318,6 +322,40 @@ public abstract class RestServerEndpoint { } /** + * Creates the upload dir if needed. 
+ */ + @VisibleForTesting + static void createUploadDir(final Path uploadDir, final Logger log) throws IOException { + if (!Files.exists(uploadDir)) { + log.warn("Upload directory {} does not exist, or has been deleted externally. " + + "Previously uploaded files are no longer available.", uploadDir); + checkAndCreateUploadDir(uploadDir, log); + } + } + + /** + * Checks whether the given directory exists and is writable. If it doesn't exist, this method + * will attempt to create it. + * + * @param uploadDir directory to check + * @param log logger used for logging output + * @throws IOException if the directory does not exist and cannot be created, or if the + * directory isn't writable + */ + private static synchronized void checkAndCreateUploadDir(final Path uploadDir, final Logger log) throws IOException { + if (Files.exists(uploadDir) && Files.isWritable(uploadDir)) { + log.info("Using directory {} for file uploads.", uploadDir); + } else if (Files.isWritable(Files.createDirectories(uploadDir))) { + log.info("Created directory {} for file uploads.", uploadDir); + } else { + log.warn("Upload directory {} cannot be created or is not writable.", uploadDir); + throw new IOException( + String.format("Upload directory %s cannot be created or is not writable.", + uploadDir)); + } + } + + /** * Comparator for Rest URLs. 
* * <p>The comparator orders the Rest URLs such that URLs with path parameters are ordered behind http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 5e4c72b..10ad344 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -163,7 +163,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp Executor executor, MetricQueryServiceRetriever metricQueryServiceRetriever, LeaderElectionService leaderElectionService, - FatalErrorHandler fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler) throws IOException { super(endpointConfiguration); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java deleted file mode 100644 index 61277ae..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest; - -import org.apache.flink.util.TestLogger; - -import org.junit.Assume; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link FileUploadHandler}. 
- */ -public class FileUploadHandlerTest extends TestLogger { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Test - public void testCreateUploadDir() throws Exception { - final File file = temporaryFolder.newFolder(); - final Path testUploadDir = file.toPath().resolve("testUploadDir"); - assertFalse(Files.exists(testUploadDir)); - FileUploadHandler.createUploadDir(testUploadDir); - assertTrue(Files.exists(testUploadDir)); - } - - @Test - public void testCreateUploadDirFails() throws Exception { - final File file = temporaryFolder.newFolder(); - Assume.assumeTrue(file.setWritable(false)); - - final Path testUploadDir = file.toPath().resolve("testUploadDir"); - assertFalse(Files.exists(testUploadDir)); - try { - FileUploadHandler.createUploadDir(testUploadDir); - fail("Expected exception not thrown."); - } catch (IOException e) { - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 735b12d..50b26e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -303,7 +303,7 @@ public class RestServerEndpointITCase extends TestLogger { TestRestServerEndpoint( RestServerEndpointConfiguration configuration, TestHandler testHandler, - TestUploadHandler testUploadHandler) { + TestUploadHandler testUploadHandler) throws IOException { super(configuration); this.testHandler = Preconditions.checkNotNull(testHandler); 
http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java index 866dfbf..de32883 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java @@ -21,14 +21,25 @@ package org.apache.flink.runtime.rest; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.slf4j.helpers.NOPLogger; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test cases for the {@link RestServerEndpoint}. @@ -36,6 +47,9 @@ import static org.junit.Assert.assertEquals; @Category(Flip6.class) public class RestServerEndpointTest extends TestLogger { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Tests that the REST handler URLs are properly sorted. 
*/ @@ -63,4 +77,27 @@ public class RestServerEndpointTest extends TestLogger { assertEquals(expected, handlerUrls); } + + @Test + public void testCreateUploadDir() throws Exception { + final File file = temporaryFolder.newFolder(); + final Path testUploadDir = file.toPath().resolve("testUploadDir"); + assertFalse(Files.exists(testUploadDir)); + RestServerEndpoint.createUploadDir(testUploadDir, NOPLogger.NOP_LOGGER); + assertTrue(Files.exists(testUploadDir)); + } + + @Test + public void testCreateUploadDirFails() throws Exception { + final File file = temporaryFolder.newFolder(); + Assume.assumeTrue(file.setWritable(false)); + + final Path testUploadDir = file.toPath().resolve("testUploadDir"); + assertFalse(Files.exists(testUploadDir)); + try { + RestServerEndpoint.createUploadDir(testUploadDir, NOPLogger.NOP_LOGGER); + fail("Expected exception not thrown."); + } catch (IOException e) { + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 5a5fb74..05be03a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -55,6 +55,8 @@ import java.util.concurrent.TimeUnit; import scala.concurrent.duration.FiniteDuration; +import static org.junit.Assume.assumeTrue; + /** * Tests that verify correct HA behavior. 
*/ @@ -104,6 +106,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { */ @Test public void testMultipleAMKill() throws Exception { + assumeTrue("This test only works with the old actor based code.", !flip6); final int numberKillingAttempts = numberApplicationAttempts - 1; String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); final Configuration configuration = GlobalConfiguration.loadConfiguration(); http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index c806c5e..3a674ad 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -78,6 +78,7 @@ import java.util.regex.Pattern; import static junit.framework.TestCase.assertTrue; import static org.apache.flink.yarn.UtilsTest.addTestAppender; import static org.apache.flink.yarn.UtilsTest.checkForLogString; +import static org.junit.Assume.assumeTrue; /** * This test starts a MiniYARNCluster with a CapacityScheduler. 
@@ -101,6 +102,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { */ @Test public void testClientStartup() throws IOException { + assumeTrue("Flip-6 does not start TMs upfront.", !flip6); LOG.info("Starting testClientStartup()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", @@ -130,7 +132,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ - "Job execution complete", + "Program execution finished", /* prohibited strings: (to verify the parallelism) */ // (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)" instead) new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, @@ -177,7 +179,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-yD", "taskmanager.memory.size=" + offHeapMemory, "-yD", "taskmanager.memory.preallocate=true", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ - "Job execution complete", + "Program execution finished", /* prohibited strings: (to verify the parallelism) */ // (we should see "DataSink (...) (1/2)" and "DataSink (...) 
(2/2)" instead) new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, @@ -190,6 +192,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { */ @Test(timeout = 100000) // timeout after 100 seconds public void testTaskManagerFailure() throws Exception { + assumeTrue("Flip-6 does not start TMs upfront.", !flip6); LOG.info("Starting testTaskManagerFailure()"); Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", @@ -402,7 +405,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ - "Job execution complete", + "Program execution finished", /* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */ new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, RunTypes.CLI_FRONTEND, 0, true); @@ -477,7 +480,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-yD", "yarn.tags=test-tag", "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. 
- "--yarndetached", job, + "-p", "2", + "--detached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()}, "Job has been submitted with JobID", http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 975dd28..e545187 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -98,10 +98,12 @@ public class YARNSessionFIFOITCase extends YarnTestBase { runner.join(); checkForLogString("The Flink YARN client has been started in detached mode"); - LOG.info("Waiting until two containers are running"); - // wait until two containers are running - while (getRunningContainers() < 2) { - sleep(500); + if (!flip6) { + LOG.info("Waiting until two containers are running"); + // wait until two containers are running + while (getRunningContainers() < 2) { + sleep(500); + } } //additional sleep for the JM/TM to start and establish connection http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index eeda32d..c863e14 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.yarn; import org.apache.flink.client.cli.CliFrontend; import 
org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Preconditions; @@ -187,6 +188,7 @@ public abstract class YarnTestBase extends TestLogger { private YarnClient yarnClient = null; protected org.apache.flink.configuration.Configuration flinkConfiguration; + protected boolean flip6; @Before public void checkClusterEmpty() throws IOException, YarnException { @@ -207,6 +209,8 @@ public abstract class YarnTestBase extends TestLogger { } flinkConfiguration = new org.apache.flink.configuration.Configuration(); + + flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index d07ac5a..87324cb 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -432,7 +432,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme final ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1); - log.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " + + log.info("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " + "JVM direct memory limit {} MB", containerId, taskManagerParameters.taskManagerTotalMemoryMB(), 
http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index a289b66..8882236 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -365,7 +365,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId } private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) { - if (!cmd.hasOption(container.getOpt())) { // number of containers is required option! + if (!flip6 && !cmd.hasOption(container.getOpt())) { // number of containers is required option! LOG.error("Missing required argument {}", container.getOpt()); printUsage(); throw new IllegalArgumentException("Missing required argument " + container.getOpt());
