Repository: flink
Updated Branches:
  refs/heads/master e24a866bf -> 95765b6d8


[FLINK-5153] Add test for YARN application tags


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95765b6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95765b6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95765b6d

Branch: refs/heads/master
Commit: 95765b6d8fdda9c601ad61dc39ce02043ecefa05
Parents: d9c116e
Author: Patrick Lucas <m...@patricklucas.com>
Authored: Tue Feb 7 12:47:21 2017 -0500
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Feb 8 18:39:47 2017 +0100

----------------------------------------------------------------------
 .../YARNSessionCapacitySchedulerITCase.java     | 43 ++++++++++++++++----
 1 file changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95765b6d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ec66eb2..2a3b6c6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -53,13 +54,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.util.EnumSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Arrays;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -413,11 +410,14 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                        throw new RuntimeException(e);
                }
 
-               Runner runner = startWithArgs(new String[]{"run", "-m", 
"yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
+               Runner runner = startWithArgs(new String[]{
+                               "run", "-m", "yarn-cluster",
+                               "-yj", flinkUberjar.getAbsolutePath(),
                                "-yt", flinkLibFolder.getAbsolutePath(),
                                "-yn", "1",
                                "-yjm", "768",
                                "-yD", "yarn.heap-cutoff-ratio=0.5", // test if 
the cutoff is passed correctly
+                               "-yD", "yarn.tags=test-tag",
                                "-ytm", "1024",
                                "-ys", "2", // test requesting slots from YARN.
                                "--yarndetached", job,
@@ -516,6 +516,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                                LOG.info("Got report {}", rep);
                        } while(rep.getYarnApplicationState() == 
YarnApplicationState.RUNNING);
 
+                       verifyApplicationTags(rep);
                } catch(Throwable t) {
                        LOG.warn("Error while detached yarn session was 
running", t);
                        Assert.fail(t.getMessage());
@@ -543,6 +544,32 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                }
        }
 
+       /**
+        * Ensures that the YARN application tags were set properly, i.e. equal the single tag {@code test-tag}.
+        *
+        * <p>Application tags were only added in Hadoop 2.4, but Flink still supports Hadoop 2.3, so the getter is
+        * looked up reflectively. If the method does not exist, the check is skipped and this test passes.
+        *
+        * @param report application report whose tags are inspected
+        * @throws InvocationTargetException if the reflectively invoked getter itself throws
+        * @throws IllegalAccessException if the getter cannot be accessed reflectively
+        */
+       private void verifyApplicationTags(final ApplicationReport report) throws InvocationTargetException,
+               IllegalAccessException {
+               final Method applicationTagsMethod;
+               try {
+                       // this method is only supported by Hadoop 2.4.0 onwards
+                       applicationTagsMethod = ApplicationReport.class.getMethod("getApplicationTags");
+               } catch (NoSuchMethodException e) {
+                       // only verify the tags if the method exists
+                       return;
+               }
+
+               @SuppressWarnings("unchecked")
+               Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
+               Assert.assertEquals(Sets.newHashSet("test-tag"), applicationTags);
+       }
+
        @After
        public void checkForProhibitedLogContents() {
                ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, 
WHITELISTED_STRINGS);

Reply via email to