[FLINK-8562] [tests] Fix YARNSessionFIFOSecuredITCase

Before this change, the YARNSessionFIFOSecuredITCase also passed without
Kerberos being active.

This closes #5416.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b66514f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b66514f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b66514f

Branch: refs/heads/release-1.5
Commit: 4b66514f2c31d5ea29493baf9d022a0115faf82d
Parents: 69e5d14
Author: Shuyi Chen <[email protected]>
Authored: Tue Feb 6 00:50:21 2018 -0800
Committer: Till Rohrmann <[email protected]>
Committed: Thu Mar 22 18:33:01 2018 +0100

----------------------------------------------------------------------
 .../flink/yarn/YARNSessionFIFOITCase.java       | 13 +++-
 .../yarn/YARNSessionFIFOSecuredITCase.java      | 13 ++++
 .../flink/yarn/YarnConfigurationITCase.java     |  2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     | 79 +++++++++++++++-----
 4 files changed, 86 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index b3dcaca..464e73c 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
@@ -106,8 +107,16 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                        }
                }
 
-               //additional sleep for the JM/TM to start and establish 
connection
-               sleep(2000);
+               // additional sleep for the JM/TM to start and establish 
connection
+               long startTime = System.nanoTime();
+               while (System.nanoTime() - startTime < 
TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS) &&
+                               !(verifyStringsInNamedLogFiles(
+                                               new String[]{"YARN Application 
Master started"}, "jobmanager.log") &&
+                                               verifyStringsInNamedLogFiles(
+                                                               new 
String[]{"Starting TaskManager actor"}, "taskmanager.log"))) {
+                       LOG.info("Still waiting for JM/TM to initialize...");
+                       sleep(500);
+               }
                LOG.info("Two containers are running. Killing the application");
 
                // kill application "externally".

http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 3954f8a..18e1c3a 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -30,10 +30,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.Callable;
 
@@ -97,6 +99,17 @@ public class YARNSessionFIFOSecuredITCase extends 
YARNSessionFIFOITCase {
                SecureTestEnvironment.cleanup();
        }
 
+       @Override
+       public void testDetachedMode() throws InterruptedException, IOException 
{
+               super.testDetachedMode();
+               if (!verifyStringsInNamedLogFiles(
+                               new String[]{"Login successful for user", 
"using keytab file"}, "jobmanager.log") ||
+                               !verifyStringsInNamedLogFiles(
+                                               new String[]{"Login successful 
for user", "using keytab file"}, "taskmanager.log")) {
+                       Assert.fail("Can not find expected strings in log 
files.");
+               }
+       }
+
        /* For secure cluster testing, it is enough to run only one test and 
override below test methods
         * to keep the overall build time minimal
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2a1b099..635fdf3 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -78,7 +78,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
        @Test(timeout = 60000)
        public void testFlinkContainerMemory() throws Exception {
                final YarnClient yarnClient = getYarnClient();
-               final Configuration configuration = new 
Configuration(flinkConfiguration);
+               final Configuration configuration = new 
Configuration(flinkConfiguration.clone());
 
                final int masterMemory = 64;
                final int taskManagerMemory = 128;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b66514f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3ec805e..803f89c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,9 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -57,7 +59,6 @@ import org.slf4j.MarkerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -68,17 +69,20 @@ import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PrintStream;
-import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.configuration.CoreOptions.OLD_MODE;
+
 /**
  * This base class allows to use the MiniYARNCluster.
  * The cluster is re-used for all tests.
@@ -145,7 +149,7 @@ public abstract class YarnTestBase extends TestLogger {
 
        private YarnClient yarnClient = null;
 
-       protected org.apache.flink.configuration.Configuration 
flinkConfiguration;
+       protected static org.apache.flink.configuration.Configuration 
flinkConfiguration;
 
        protected boolean flip6;
 
@@ -213,8 +217,6 @@ public abstract class YarnTestBase extends TestLogger {
                        }
                }
 
-               flinkConfiguration = new 
org.apache.flink.configuration.Configuration();
-
                flip6 = 
CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
        }
 
@@ -397,6 +399,51 @@ public abstract class YarnTestBase extends TestLogger {
                }
        }
 
+       public static boolean verifyStringsInNamedLogFiles(
+                       final String[] mustHave, final String fileName) {
+               List<String> mustHaveList = Arrays.asList(mustHave);
+               File cwd = new File("target/" + 
YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+               if (!cwd.exists() || !cwd.isDirectory()) {
+                       return false;
+               }
+
+               File foundFile = findFile(cwd.getAbsolutePath(), new 
FilenameFilter() {
+                       @Override
+                       public boolean accept(File dir, String name) {
+                               if (fileName != null && !name.equals(fileName)) 
{
+                                       return false;
+                               }
+                               File f = new File(dir.getAbsolutePath() + "/" + 
name);
+                               LOG.info("Searching in {}", 
f.getAbsolutePath());
+                               try {
+                                       Set<String> foundSet = new 
HashSet<>(mustHave.length);
+                                       Scanner scanner = new Scanner(f);
+                                       while (scanner.hasNextLine()) {
+                                               final String lineFromFile = 
scanner.nextLine();
+                                               for (String str : mustHave) {
+                                                       if 
(lineFromFile.contains(str)) {
+                                                               
foundSet.add(str);
+                                                       }
+                                               }
+                                               if 
(foundSet.containsAll(mustHaveList)) {
+                                                       return true;
+                                               }
+                                       }
+                               } catch (FileNotFoundException e) {
+                                       LOG.warn("Unable to locate file: " + 
e.getMessage() + " file: " + f.getAbsolutePath());
+                               }
+                               return false;
+                       }
+               });
+
+               if (foundFile != null) {
+                       LOG.info("Found string {} in {}.", 
Arrays.toString(mustHave), foundFile.getAbsolutePath());
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
        public static void sleep(int time) {
                try {
                        Thread.sleep(time);
@@ -465,27 +512,23 @@ public abstract class YarnTestBase extends TestLogger {
 
                        File flinkConfDirPath = findFile(flinkDistRootDir, new 
ContainsName(new String[]{"flink-conf.yaml"}));
                        Assert.assertNotNull(flinkConfDirPath);
+                       flinkConfiguration =
+                                       GlobalConfiguration.loadConfiguration();
 
                        if (!StringUtils.isBlank(principal) && 
!StringUtils.isBlank(keytab)) {
+
                                //copy conf dir to test temporary workspace 
location
                                tempConfPathForSecureRun = 
tmp.newFolder("conf");
 
                                String confDirPath = 
flinkConfDirPath.getParentFile().getAbsolutePath();
                                FileUtils.copyDirectory(new File(confDirPath), 
tempConfPathForSecureRun);
 
-                               try (FileWriter fw = new FileWriter(new 
File(tempConfPathForSecureRun, "flink-conf.yaml"), true);
-                                       BufferedWriter bw = new 
BufferedWriter(fw);
-                                       PrintWriter out = new PrintWriter(bw)) {
-
-                                       LOG.info("writing keytab: " + keytab + 
" and principal: " + principal + " to config file");
-                                       out.println("");
-                                       out.println("#Security Configurations 
Auto Populated ");
-                                       
out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab);
-                                       
out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal);
-                                       out.println("");
-                               } catch (IOException e) {
-                                       throw new RuntimeException("Exception 
occured while trying to append the security configurations.", e);
-                               }
+                               
flinkConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), 
keytab);
+                               
flinkConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), 
principal);
+                               
flinkConfiguration.setString(CoreOptions.MODE.key(), OLD_MODE);
+
+                               
BootstrapTools.writeConfiguration(flinkConfiguration,
+                                               new 
File(tempConfPathForSecureRun, "flink-conf.yaml"));
 
                                String configDir = 
tempConfPathForSecureRun.getAbsolutePath();
 

Reply via email to