[hotfix] [yarn tests] Fix deadlock between YARN Session CLI tests and Surefire
The Surefire Plugin uses stdin to communicate with forked JVMs for tests. The YARN Session CLI tests also try to read the stdin stream. The tests deadlock since Surefire never releases the stdin locks during the lifetime of a test. This change adds a parameter that controls whether the YARN Session CLI should try to read user console input, and sets it to false in the integration tests. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da23ee38 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da23ee38 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da23ee38 Branch: refs/heads/master Commit: da23ee38e5b36ddf26a6a5a807efbbbcbfe1d517 Parents: 6511557 Author: Stephan Ewen <se...@apache.org> Authored: Mon May 30 19:49:25 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon May 30 19:49:25 2016 +0200 ---------------------------------------------------------------------- .../flink/client/FlinkYarnSessionCli.java | 24 +++++++++++++------- .../flink/client/cli/CliFrontendParser.java | 2 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 2 +- .../org/apache/flink/yarn/YarnTestBase.java | 2 +- .../src/test/resources/log4j-test.properties | 3 ++- 5 files changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 91f8df2..bb61ffb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -82,6 +82,8 @@ public class FlinkYarnSessionCli { */ private 
final Option DYNAMIC_PROPERTIES; + private final boolean acceptInteractiveInput; + //------------------------------------ Internal fields ------------------------- private AbstractFlinkYarnCluster yarnCluster = null; private boolean detachedMode = false; @@ -89,7 +91,9 @@ public class FlinkYarnSessionCli { /** Default yarn application name. */ private String defaultApplicationName = null; - public FlinkYarnSessionCli(String shortPrefix, String longPrefix) { + public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) { + this.acceptInteractiveInput = acceptInteractiveInput; + QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); @@ -292,7 +296,7 @@ public class FlinkYarnSessionCli { propertiesFile.setReadable(true, false); // readable for all. } - public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) { + public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster, boolean readConsoleInput) { final String HELP = "Available commands:\n" + "help - show these commands\n" + "stop - stop the YARN session"; @@ -304,6 +308,8 @@ public class FlinkYarnSessionCli { // ------------------ check if there are updates by the cluster ----------- GetClusterStatusResponse status = yarnCluster.getClusterStatus(); + LOG.debug("Received status message: {}", status); + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { System.err.println("Number of connected TaskManagers changed to " + status.numRegisteredTaskManagers() + ". " + @@ -324,15 +330,16 @@ public class FlinkYarnSessionCli { yarnCluster.shutdown(true); } - // wait until CLIENT_POLLING_INTERVALL is over or the user entered something. 
+ // wait until CLIENT_POLLING_INTERVAL is over or the user entered something. long startTime = System.currentTimeMillis(); while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000 - && !in.ready()) { + && (!readConsoleInput || !in.ready())) + { Thread.sleep(200); } //------------- handle interactive command by user. ---------------------- - - if (in.ready()) { + + if (readConsoleInput && in.ready()) { String command = in.readLine(); switch (command) { case "quit": @@ -347,6 +354,7 @@ public class FlinkYarnSessionCli { break; } } + if (yarnCluster.hasBeenStopped()) { LOG.info("Stopping interactive command line interface, YARN cluster has been stopped."); break; @@ -358,7 +366,7 @@ public class FlinkYarnSessionCli { } public static void main(String[] args) { - FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session System.exit(cli.run(args)); } @@ -458,7 +466,7 @@ public class FlinkYarnSessionCli { "Please also note that the temporary files of the YARN session in {} will not be removed.", flinkYarnClient.getSessionFilesDir()); } else { - runInteractiveCli(yarnCluster); + runInteractiveCli(yarnCluster, acceptInteractiveInput); if (!yarnCluster.hasBeenStopped()) { LOG.info("Command Line Interface requested session shutdown"); http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 2ac53d2..b75952e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -35,7 
+35,7 @@ public class CliFrontendParser { /** command line interface of the YARN session, with a special initialization here * to prefix all options with y/yarn. */ - private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn"); + private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn", true); static final Option HELP_OPTION = new Option("h", "help", false, http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 30116af..7197b64 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -53,7 +53,7 @@ public class FlinkYarnSessionCliTest { map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath()); TestBaseUtils.setEnv(map); Options options = new Options(); - FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); + FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false); cli.getYARNSessionCLIOptions(options); CommandLineParser parser = new PosixParser(); http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index fc1e5bc..03ab647 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -562,7 +562,7 @@ public abstract class 
YarnTestBase extends TestLogger { public void run() { switch(type) { case YARN_SESSION: - yCli = new FlinkYarnSessionCli("", ""); + yCli = new FlinkYarnSessionCli("", "", false); returnValue = yCli.run(args); break; case CLI_FRONTEND: http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-yarn-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties index ebe0d37..e94ca26 100644 --- a/flink-yarn-tests/src/test/resources/log4j-test.properties +++ b/flink-yarn-tests/src/test/resources/log4j-test.properties @@ -30,6 +30,7 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO -log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO +log4j.logger.org.apache.hadoop=OFF log4j.logger.org.apache.flink.runtime.leaderelection=INFO log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO +