[FLINK-8562] [tests] Fix YARNSessionFIFOSecuredITCase

Previously, the YARNSessionFIFOSecuredITCase also passed without Kerberos being active.
This closes #5416. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b66514f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b66514f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b66514f Branch: refs/heads/release-1.5 Commit: 4b66514f2c31d5ea29493baf9d022a0115faf82d Parents: 69e5d14 Author: Shuyi Chen <[email protected]> Authored: Tue Feb 6 00:50:21 2018 -0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Mar 22 18:33:01 2018 +0100 ---------------------------------------------------------------------- .../flink/yarn/YARNSessionFIFOITCase.java | 13 +++- .../yarn/YARNSessionFIFOSecuredITCase.java | 13 ++++ .../flink/yarn/YarnConfigurationITCase.java | 2 +- .../org/apache/flink/yarn/YarnTestBase.java | 79 +++++++++++++++----- 4 files changed, 86 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index b3dcaca..464e73c 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -49,6 +49,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.flink.yarn.UtilsTest.addTestAppender; import static org.apache.flink.yarn.UtilsTest.checkForLogString; @@ -106,8 +107,16 @@ public class YARNSessionFIFOITCase extends YarnTestBase { } } - //additional sleep for the JM/TM to start and establish connection - sleep(2000); 
+ // additional sleep for the JM/TM to start and establish connection + long startTime = System.nanoTime(); + while (System.nanoTime() - startTime < TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS) && + !(verifyStringsInNamedLogFiles( + new String[]{"YARN Application Master started"}, "jobmanager.log") && + verifyStringsInNamedLogFiles( + new String[]{"Starting TaskManager actor"}, "taskmanager.log"))) { + LOG.info("Still waiting for JM/TM to initialize..."); + sleep(500); + } LOG.info("Two containers are running. Killing the application"); // kill application "externally". http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java index 3954f8a..18e1c3a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java @@ -30,10 +30,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.Callable; @@ -97,6 +99,17 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase { SecureTestEnvironment.cleanup(); } + @Override + public void testDetachedMode() throws InterruptedException, IOException { + super.testDetachedMode(); + if (!verifyStringsInNamedLogFiles( + new String[]{"Login successful 
for user", "using keytab file"}, "jobmanager.log") || + !verifyStringsInNamedLogFiles( + new String[]{"Login successful for user", "using keytab file"}, "taskmanager.log")) { + Assert.fail("Can not find expected strings in log files."); + } + } + /* For secure cluster testing, it is enough to run only one test and override below test methods * to keep the overall build time minimal */ http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index 2a1b099..635fdf3 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -78,7 +78,7 @@ public class YarnConfigurationITCase extends YarnTestBase { @Test(timeout = 60000) public void testFlinkContainerMemory() throws Exception { final YarnClient yarnClient = getYarnClient(); - final Configuration configuration = new Configuration(flinkConfiguration); + final Configuration configuration = new Configuration(flinkConfiguration.clone()); final int masterMemory = 64; final int taskManagerMemory = 128; http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 3ec805e..803f89c 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -21,7 +21,9 @@ package 
org.apache.flink.yarn; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -57,7 +59,6 @@ import org.slf4j.MarkerFactory; import javax.annotation.Nullable; -import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileNotFoundException; @@ -68,17 +69,20 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.PrintStream; -import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Scanner; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.regex.Pattern; +import static org.apache.flink.configuration.CoreOptions.OLD_MODE; + /** * This base class allows to use the MiniYARNCluster. * The cluster is re-used for all tests. 
@@ -145,7 +149,7 @@ public abstract class YarnTestBase extends TestLogger { private YarnClient yarnClient = null; - protected org.apache.flink.configuration.Configuration flinkConfiguration; + protected static org.apache.flink.configuration.Configuration flinkConfiguration; protected boolean flip6; @@ -213,8 +217,6 @@ public abstract class YarnTestBase extends TestLogger { } } - flinkConfiguration = new org.apache.flink.configuration.Configuration(); - flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE)); } @@ -397,6 +399,51 @@ public abstract class YarnTestBase extends TestLogger { } } + public static boolean verifyStringsInNamedLogFiles( + final String[] mustHave, final String fileName) { + List<String> mustHaveList = Arrays.asList(mustHave); + File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); + if (!cwd.exists() || !cwd.isDirectory()) { + return false; + } + + File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (fileName != null && !name.equals(fileName)) { + return false; + } + File f = new File(dir.getAbsolutePath() + "/" + name); + LOG.info("Searching in {}", f.getAbsolutePath()); + try { + Set<String> foundSet = new HashSet<>(mustHave.length); + Scanner scanner = new Scanner(f); + while (scanner.hasNextLine()) { + final String lineFromFile = scanner.nextLine(); + for (String str : mustHave) { + if (lineFromFile.contains(str)) { + foundSet.add(str); + } + } + if (foundSet.containsAll(mustHaveList)) { + return true; + } + } + } catch (FileNotFoundException e) { + LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath()); + } + return false; + } + }); + + if (foundFile != null) { + LOG.info("Found string {} in {}.", Arrays.toString(mustHave), foundFile.getAbsolutePath()); + return true; + } else { + return false; + } + } + public static void sleep(int time) { try { 
Thread.sleep(time); @@ -465,27 +512,23 @@ public abstract class YarnTestBase extends TestLogger { File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"})); Assert.assertNotNull(flinkConfDirPath); + flinkConfiguration = + GlobalConfiguration.loadConfiguration(); if (!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) { + //copy conf dir to test temporary workspace location tempConfPathForSecureRun = tmp.newFolder("conf"); String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath(); FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); - try (FileWriter fw = new FileWriter(new File(tempConfPathForSecureRun, "flink-conf.yaml"), true); - BufferedWriter bw = new BufferedWriter(fw); - PrintWriter out = new PrintWriter(bw)) { - - LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file"); - out.println(""); - out.println("#Security Configurations Auto Populated "); - out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab); - out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal); - out.println(""); - } catch (IOException e) { - throw new RuntimeException("Exception occured while trying to append the security configurations.", e); - } + flinkConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), keytab); + flinkConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), principal); + flinkConfiguration.setString(CoreOptions.MODE.key(), OLD_MODE); + + BootstrapTools.writeConfiguration(flinkConfiguration, + new File(tempConfPathForSecureRun, "flink-conf.yaml")); String configDir = tempConfPathForSecureRun.getAbsolutePath();
