[FLINK-8614] [flip6] Activate Flip-6 mode per default

This commit enables the Flip-6 mode per default. Additionally, it disables
some of the Yarn tests which no longer apply to Flip-6 (tests which wait for
a number of started TM containers without a job submission).

This closes #5437.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab8316f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab8316f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab8316f3

Branch: refs/heads/master
Commit: ab8316f31a4de0648ab3ffa9f19fcd419aaa3bb9
Parents: ca9ee35
Author: Till Rohrmann <[email protected]>
Authored: Tue Jan 30 09:22:03 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 18 10:12:54 2018 +0100

----------------------------------------------------------------------
 .../program/rest/RestClusterClientTest.java     |  2 +-
 .../apache/flink/configuration/CoreOptions.java | 12 ++--
 .../flink/configuration/JobManagerOptions.java  |  2 +-
 flink-dist/src/main/flink-bin/bin/config.sh     |  2 +-
 .../flink/docs/rest/RestAPIDocGenerator.java    |  2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  3 +-
 .../runtime/jobmaster/JobManagerRunner.java     |  2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 27 ++++----
 .../jobmaster/MiniDispatcherRestEndpoint.java   |  3 +-
 .../flink/runtime/rest/FileUploadHandler.java   | 44 +------------
 .../apache/flink/runtime/rest/RestClient.java   |  2 +-
 .../flink/runtime/rest/RestServerEndpoint.java  | 42 +++++++++++-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  2 +-
 .../runtime/rest/FileUploadHandlerTest.java     | 67 --------------------
 .../runtime/rest/RestServerEndpointITCase.java  |  2 +-
 .../runtime/rest/RestServerEndpointTest.java    | 37 +++++++++++
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  3 +
 .../YARNSessionCapacitySchedulerITCase.java     | 12 ++--
 .../flink/yarn/YARNSessionFIFOITCase.java       | 10 +--
 .../org/apache/flink/yarn/YarnTestBase.java     |  4 ++
 .../apache/flink/yarn/YarnResourceManager.java  |  2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  2 +-
 22 files changed, 135 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 9e75eea..f54d9d2 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -578,7 +578,7 @@ public class RestClusterClientTest extends TestLogger {
 
                private final AbstractRestHandler<?, ?, ?, ?>[] 
abstractRestHandlers;
 
-               TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... 
abstractRestHandlers) {
+               TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... 
abstractRestHandlers) throws IOException {
                        super(restServerEndpointConfiguration);
                        this.abstractRestHandlers = abstractRestHandlers;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index c44219f..9bd7fab 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -230,15 +230,15 @@ public class CoreOptions {
        // 
------------------------------------------------------------------------
 
        /**
+        * Constant value for the Flip-6 execution mode.
+        */
+       public static final String FLIP6_MODE = "flip6";
+
+       /**
         * Switch to select the execution mode. Possible values are 'flip6' and 
'old'.
         */
        public static final ConfigOption<String> MODE = ConfigOptions
                .key("mode")
-               .defaultValue("old")
+               .defaultValue(FLIP6_MODE)
                .withDescription("Switch to select the execution mode. Possible 
values are 'flip6' and 'old'.");
-
-       /**
-        * Constant value for the Flip-6 execution mode.
-        */
-       public static final String FLIP6_MODE = "flip6";
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 74ec5b7..ade3958 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -140,7 +140,7 @@ public class JobManagerOptions {
 
        public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
                key("slot.idle.timeout")
-                       .defaultValue(20L * 1000L)
+                       .defaultValue(10L * 1000L)
                        .withDescription("The timeout in milliseconds for a 
idle slot in Slot Pool.");
 
        // 
---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 807bf33..645e156 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -267,7 +267,7 @@ fi
 
 # Define FLIP if it is not already set
 if [ -z "${FLINK_MODE}" ]; then
-    FLINK_MODE=$(readFromConfig ${KEY_FLINK_MODE} "old" "${YAML_CONF}")
+    FLINK_MODE=$(readFromConfig ${KEY_FLINK_MODE} "flip6" "${YAML_CONF}")
 fi
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index f4e1c70..b3a5def 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -288,7 +288,7 @@ public class RestAPIDocGenerator {
                        metricQueryServiceRetriever = path -> null;
                }
 
-               private DocumentingDispatcherRestEndpoint() {
+               private DocumentingDispatcherRestEndpoint() throws IOException {
                        super(
                                restConfig,
                                dispatcherGatewayRetriever,

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index b84e287..f8245c2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +62,7 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        Executor executor,
                        MetricQueryServiceRetriever metricQueryServiceRetriever,
                        LeaderElectionService leaderElectionService,
-                       FatalErrorHandler fatalErrorHandler) {
+                       FatalErrorHandler fatalErrorHandler) throws IOException 
{
 
                super(
                        endpointConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 285cb4a..5740bd7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -388,7 +388,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        log.info("JobManager for job {} ({}) was revoked 
leadership at {}.",
                                jobGraph.getName(), jobGraph.getJobID(), 
getAddress());
 
-                       CompletableFuture<Acknowledge>  suspendFuture = 
jobManager.suspend(new Exception("JobManager is no longer the leader."), 
rpcTimeout);
+                       CompletableFuture<Acknowledge>  suspendFuture = 
jobManager.suspend(new FlinkException("JobManager is no longer the leader."), 
rpcTimeout);
 
                        suspendFuture.whenCompleteAsync(
                                (Acknowledge ack, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index aeac2df..139c053 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -347,7 +347,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
         * @param timeout for this operation
         * @return Future acknowledge indicating that the job has been 
suspended. Otherwise the future contains an exception
         */
-       public CompletableFuture<Acknowledge> suspend(final Throwable cause, 
final Time timeout) {
+       public CompletableFuture<Acknowledge> suspend(final Exception cause, 
final Time timeout) {
                CompletableFuture<Acknowledge> suspendFuture = 
callAsyncWithoutFencing(() -> suspendExecution(cause), timeout);
 
                stop();
@@ -375,7 +375,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                resourceManagerHeartbeatManager.stop();
 
                // make sure there is a graceful exit
-               suspendExecution(new Exception("JobManager is shutting down."));
+               suspendExecution(new FlinkException("JobManager is shutting 
down."));
 
                // shut down will internally release all registered slots
                slotPool.shutDown();
@@ -595,14 +595,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
 
                if (checkpointCoordinator != null) {
-                       getRpcService().execute(new Runnable() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               
checkpointCoordinator.receiveDeclineMessage(decline);
-                                       } catch (Exception e) {
-                                               log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
-                                       }
+                       getRpcService().execute(() -> {
+                               try {
+                                       
checkpointCoordinator.receiveDeclineMessage(decline);
+                               } catch (Exception e) {
+                                       log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
                                }
                        });
                } else {
@@ -915,7 +912,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
         *
         * @param cause The reason of why this job been suspended.
         */
-       private Acknowledge suspendExecution(final Throwable cause) {
+       private Acknowledge suspendExecution(final Exception cause) {
                validateRunsInMainThread();
 
                if (getFencingToken() == null) {
@@ -939,7 +936,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                slotPoolGateway.suspend();
 
                // disconnect from resource manager:
-               closeResourceManagerConnection(new Exception("Execution was 
suspended.", cause));
+               closeResourceManagerConnection(cause);
 
                return Acknowledge.get();
        }
@@ -1037,7 +1034,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private void closeResourceManagerConnection(Exception cause) {
                if (resourceManagerConnection != null) {
-                       log.info("Close ResourceManager connection {}.", 
resourceManagerConnection.getResourceManagerResourceID(), cause);
+                       if (log.isDebugEnabled()) {
+                               log.debug("Close ResourceManager connection 
{}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
+                       } else {
+                               log.info("Close ResourceManager connection {}: 
{}.", resourceManagerConnection.getResourceManagerResourceID(), 
cause.getMessage());
+                       }
 
                        
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index 54ad526..b31c46a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
+import java.io.IOException;
 import java.util.concurrent.Executor;
 
 /**
@@ -48,7 +49,7 @@ public class MiniDispatcherRestEndpoint extends 
WebMonitorEndpoint<RestfulGatewa
                        Executor executor,
                        MetricQueryServiceRetriever metricQueryServiceRetriever,
                        LeaderElectionService leaderElectionService,
-                       FatalErrorHandler fatalErrorHandler) {
+                       FatalErrorHandler fatalErrorHandler) throws IOException 
{
                super(
                        endpointConfiguration,
                        leaderRetriever,

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index c214b50..8854a1f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.rest;
 
-import org.apache.flink.annotation.VisibleForTesting;
-
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
@@ -39,8 +37,6 @@ import 
org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.UUID;
@@ -66,10 +62,8 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
 
        private HttpRequest currentHttpRequest;
 
-       public FileUploadHandler(final Path uploadDir) throws IOException {
+       public FileUploadHandler(final Path uploadDir) {
                super(false);
-               createUploadDir(uploadDir);
-
                DiskFileUpload.baseDirectory = 
uploadDir.normalize().toAbsolutePath().toString();
                this.uploadDir = requireNonNull(uploadDir);
        }
@@ -89,7 +83,8 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
                                ctx.fireChannelRead(msg);
                        }
                } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
-                       createUploadDir(uploadDir);
+                       // make sure that we still have a upload dir in case 
that it got deleted in the meanwhile
+                       RestServerEndpoint.createUploadDir(uploadDir, LOG);
 
                        final HttpContent httpContent = (HttpContent) msg;
                        currentHttpPostRequestDecoder.offer(httpContent);
@@ -123,37 +118,4 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
                currentHttpPostRequestDecoder = null;
                currentHttpRequest = null;
        }
-
-       /**
-        * Creates the upload dir if needed.
-        */
-       @VisibleForTesting
-       static void createUploadDir(final Path uploadDir) throws IOException {
-               if (!Files.exists(uploadDir)) {
-                       LOG.warn("Upload directory {} does not exist, or has 
been deleted externally. " +
-                               "Previously uploaded files are no longer 
available.", uploadDir);
-                       checkAndCreateUploadDir(uploadDir);
-               }
-       }
-
-       /**
-        * Checks whether the given directory exists and is writable. If it 
doesn't exist, this method
-        * will attempt to create it.
-        *
-        * @param uploadDir directory to check
-        * @throws IOException if the directory does not exist and cannot be 
created, or if the
-        *                     directory isn't writable
-        */
-       private static synchronized void checkAndCreateUploadDir(final Path 
uploadDir) throws IOException {
-               if (Files.exists(uploadDir) && Files.isWritable(uploadDir)) {
-                       LOG.info("Using directory {} for file uploads.", 
uploadDir);
-               } else if 
(Files.isWritable(Files.createDirectories(uploadDir))) {
-                       LOG.info("Created directory {} for file uploads.", 
uploadDir);
-               } else {
-                       LOG.warn("Upload directory {} cannot be created or is 
not writable.", uploadDir);
-                       throw new IOException(
-                               String.format("Upload directory %s cannot be 
created or is not writable.",
-                                       uploadDir));
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 43a9981..42612ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -151,7 +151,7 @@ public class RestClient {
 
                String targetUrl = 
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), 
messageParameters);
 
-               LOG.debug("Sending request of class {} to {}", 
request.getClass(), targetUrl);
+               LOG.debug("Sending request of class {} to {}:{}{}", 
request.getClass(), targetAddress, targetPort, targetUrl);
                // serialize payload
                StringWriter sw = new StringWriter();
                objectMapper.writeValue(sw, request);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 1904235..dbb25a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
@@ -49,6 +50,7 @@ import javax.net.ssl.SSLEngine;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Comparator;
@@ -77,12 +79,14 @@ public abstract class RestServerEndpoint {
 
        private volatile boolean started;
 
-       public RestServerEndpoint(RestServerEndpointConfiguration 
configuration) {
+       public RestServerEndpoint(RestServerEndpointConfiguration 
configuration) throws IOException {
                Preconditions.checkNotNull(configuration);
                this.configuredAddress = configuration.getEndpointBindAddress();
                this.configuredPort = configuration.getEndpointBindPort();
                this.sslEngine = configuration.getSslEngine();
+
                this.uploadDir = configuration.getUploadDir();
+               createUploadDir(uploadDir, log);
 
                this.restAddress = null;
 
@@ -136,7 +140,7 @@ public abstract class RestServerEndpoint {
                        ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
 
                                @Override
-                               protected void initChannel(SocketChannel ch) 
throws IOException {
+                               protected void initChannel(SocketChannel ch) {
                                        Handler handler = new 
RouterHandler(router);
 
                                        // SSL should be the first handler in 
the pipeline
@@ -318,6 +322,40 @@ public abstract class RestServerEndpoint {
        }
 
        /**
+        * Creates the upload dir if needed.
+        */
+       @VisibleForTesting
+       static void createUploadDir(final Path uploadDir, final Logger log) 
throws IOException {
+               if (!Files.exists(uploadDir)) {
+                       log.warn("Upload directory {} does not exist, or has 
been deleted externally. " +
+                               "Previously uploaded files are no longer 
available.", uploadDir);
+                       checkAndCreateUploadDir(uploadDir, log);
+               }
+       }
+
+       /**
+        * Checks whether the given directory exists and is writable. If it 
doesn't exist, this method
+        * will attempt to create it.
+        *
+        * @param uploadDir directory to check
+        * @param log logger used for logging output
+        * @throws IOException if the directory does not exist and cannot be 
created, or if the
+        *                     directory isn't writable
+        */
+       private static synchronized void checkAndCreateUploadDir(final Path 
uploadDir, final Logger log) throws IOException {
+               if (Files.exists(uploadDir) && Files.isWritable(uploadDir)) {
+                       log.info("Using directory {} for file uploads.", 
uploadDir);
+               } else if 
(Files.isWritable(Files.createDirectories(uploadDir))) {
+                       log.info("Created directory {} for file uploads.", 
uploadDir);
+               } else {
+                       log.warn("Upload directory {} cannot be created or is 
not writable.", uploadDir);
+                       throw new IOException(
+                               String.format("Upload directory %s cannot be 
created or is not writable.",
+                                       uploadDir));
+               }
+       }
+
+       /**
         * Comparator for Rest URLs.
         *
         * <p>The comparator orders the Rest URLs such that URLs with path 
parameters are ordered behind

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5e4c72b..10ad344 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -163,7 +163,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        Executor executor,
                        MetricQueryServiceRetriever metricQueryServiceRetriever,
                        LeaderElectionService leaderElectionService,
-                       FatalErrorHandler fatalErrorHandler) {
+                       FatalErrorHandler fatalErrorHandler) throws IOException 
{
                super(endpointConfiguration);
                this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
                this.clusterConfiguration = 
Preconditions.checkNotNull(clusterConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
deleted file mode 100644
index 61277ae..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assume;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link FileUploadHandler}.
- */
-public class FileUploadHandlerTest extends TestLogger {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Test
-       public void testCreateUploadDir() throws Exception {
-               final File file = temporaryFolder.newFolder();
-               final Path testUploadDir = 
file.toPath().resolve("testUploadDir");
-               assertFalse(Files.exists(testUploadDir));
-               FileUploadHandler.createUploadDir(testUploadDir);
-               assertTrue(Files.exists(testUploadDir));
-       }
-
-       @Test
-       public void testCreateUploadDirFails() throws Exception {
-               final File file = temporaryFolder.newFolder();
-               Assume.assumeTrue(file.setWritable(false));
-
-               final Path testUploadDir = 
file.toPath().resolve("testUploadDir");
-               assertFalse(Files.exists(testUploadDir));
-               try {
-                       FileUploadHandler.createUploadDir(testUploadDir);
-                       fail("Expected exception not thrown.");
-               } catch (IOException e) {
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 735b12d..50b26e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -303,7 +303,7 @@ public class RestServerEndpointITCase extends TestLogger {
                TestRestServerEndpoint(
                        RestServerEndpointConfiguration configuration,
                        TestHandler testHandler,
-                       TestUploadHandler testUploadHandler) {
+                       TestUploadHandler testUploadHandler) throws IOException 
{
                        super(configuration);
 
                        this.testHandler = 
Preconditions.checkNotNull(testHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
index 866dfbf..de32883 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
@@ -21,14 +21,25 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assume;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.helpers.NOPLogger;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test cases for the {@link RestServerEndpoint}.
@@ -36,6 +47,9 @@ import static org.junit.Assert.assertEquals;
 @Category(Flip6.class)
 public class RestServerEndpointTest extends TestLogger {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        /**
         * Tests that the REST handler URLs are properly sorted.
         */
@@ -63,4 +77,27 @@ public class RestServerEndpointTest extends TestLogger {
 
                assertEquals(expected, handlerUrls);
        }
+
+       @Test
+       public void testCreateUploadDir() throws Exception {
+               final File file = temporaryFolder.newFolder();
+               final Path testUploadDir = 
file.toPath().resolve("testUploadDir");
+               assertFalse(Files.exists(testUploadDir));
+               RestServerEndpoint.createUploadDir(testUploadDir, 
NOPLogger.NOP_LOGGER);
+               assertTrue(Files.exists(testUploadDir));
+       }
+
+       @Test
+       public void testCreateUploadDirFails() throws Exception {
+               final File file = temporaryFolder.newFolder();
+               Assume.assumeTrue(file.setWritable(false));
+
+               final Path testUploadDir = 
file.toPath().resolve("testUploadDir");
+               assertFalse(Files.exists(testUploadDir));
+               try {
+                       RestServerEndpoint.createUploadDir(testUploadDir, 
NOPLogger.NOP_LOGGER);
+                       fail("Expected exception not thrown.");
+               } catch (IOException e) {
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 5a5fb74..05be03a 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -55,6 +55,8 @@ import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assume.assumeTrue;
+
 /**
  * Tests that verify correct HA behavior.
  */
@@ -104,6 +106,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
         */
        @Test
        public void testMultipleAMKill() throws Exception {
+               assumeTrue("This test only works with the old actor based 
code.", !flip6);
                final int numberKillingAttempts = numberApplicationAttempts - 1;
                String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index c806c5e..3a674ad 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -78,6 +78,7 @@ import java.util.regex.Pattern;
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * This test starts a MiniYARNCluster with a CapacityScheduler.
@@ -101,6 +102,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
         */
        @Test
        public void testClientStartup() throws IOException {
+               assumeTrue("Flip-6 does not start TMs upfront.", !flip6);
                LOG.info("Starting testClientStartup()");
                runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), 
"-t", flinkLibFolder.getAbsolutePath(),
                                                "-n", "1",
@@ -130,7 +132,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                                "-yjm", "768",
                                "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()},
                        /* test succeeded after this string */
-                       "Job execution complete",
+                       "Program execution finished",
                        /* prohibited strings: (to verify the parallelism) */
                        // (we should see "DataSink (...) (1/2)" and "DataSink 
(...) (2/2)" instead)
                        new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to 
FINISHED"},
@@ -177,7 +179,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                                "-yD", "taskmanager.memory.size=" + 
offHeapMemory,
                                "-yD", "taskmanager.memory.preallocate=true", 
exampleJarLocation.getAbsolutePath()},
                        /* test succeeded after this string */
-                       "Job execution complete",
+                       "Program execution finished",
                        /* prohibited strings: (to verify the parallelism) */
                        // (we should see "DataSink (...) (1/2)" and "DataSink 
(...) (2/2)" instead)
                        new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to 
FINISHED"},
@@ -190,6 +192,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
         */
        @Test(timeout = 100000) // timeout after 100 seconds
        public void testTaskManagerFailure() throws Exception {
+               assumeTrue("Flip-6 does not start TMs upfront.", !flip6);
                LOG.info("Starting testTaskManagerFailure()");
                Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
                                "-n", "1",
@@ -402,7 +405,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                                "-yjm", "768",
                                "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()},
                                /* test succeeded after this string */
-                       "Job execution complete",
+                       "Program execution finished",
                        /* prohibited strings: (we want to see "DataSink (...) 
(2/2) switched to FINISHED") */
                        new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to 
FINISHED"},
                        RunTypes.CLI_FRONTEND, 0, true);
@@ -477,7 +480,8 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                                "-yD", "yarn.tags=test-tag",
                                "-ytm", "1024",
                                "-ys", "2", // test requesting slots from YARN.
-                               "--yarndetached", job,
+                               "-p", "2",
+                               "--detached", job,
                                "--input", 
tmpInFile.getAbsoluteFile().toString(),
                                "--output", 
tmpOutFolder.getAbsoluteFile().toString()},
                        "Job has been submitted with JobID",

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 975dd28..e545187 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -98,10 +98,12 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                runner.join();
                checkForLogString("The Flink YARN client has been started in 
detached mode");
 
-               LOG.info("Waiting until two containers are running");
-               // wait until two containers are running
-               while (getRunningContainers() < 2) {
-                       sleep(500);
+               if (!flip6) {
+                       LOG.info("Waiting until two containers are running");
+                       // wait until two containers are running
+                       while (getRunningContainers() < 2) {
+                               sleep(500);
+                       }
                }
 
                //additional sleep for the JM/TM to start and establish 
connection

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index eeda32d..c863e14 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Preconditions;
@@ -187,6 +188,7 @@ public abstract class YarnTestBase extends TestLogger {
 
        private YarnClient yarnClient = null;
        protected org.apache.flink.configuration.Configuration 
flinkConfiguration;
+       protected boolean flip6;
 
        @Before
        public void checkClusterEmpty() throws IOException, YarnException {
@@ -207,6 +209,8 @@ public abstract class YarnTestBase extends TestLogger {
                }
 
                flinkConfiguration = new 
org.apache.flink.configuration.Configuration();
+
+               flip6 = 
CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d07ac5a..87324cb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -432,7 +432,7 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                final ContaineredTaskManagerParameters taskManagerParameters =
                                
ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1);
 
-               log.info("TaskExecutor{} will be started with container size {} 
MB, JVM heap size {} MB, " +
+               log.info("TaskExecutor {} will be started with container size 
{} MB, JVM heap size {} MB, " +
                                "JVM direct memory limit {} MB",
                                containerId,
                                
taskManagerParameters.taskManagerTotalMemoryMB(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8316f3/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index a289b66..8882236 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -365,7 +365,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
        }
 
        private ClusterSpecification createClusterSpecification(Configuration 
configuration, CommandLine cmd) {
-               if (!cmd.hasOption(container.getOpt())) { // number of 
containers is required option!
+               if (!flip6 && !cmd.hasOption(container.getOpt())) { // number 
of containers is required option!
                        LOG.error("Missing required argument {}", 
container.getOpt());
                        printUsage();
                        throw new IllegalArgumentException("Missing required 
argument " + container.getOpt());

Reply via email to