Repository: flink Updated Branches: refs/heads/master e24a866bf -> 95765b6d8
[FLINK-5153] Add test for YARN application tags Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95765b6d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95765b6d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95765b6d Branch: refs/heads/master Commit: 95765b6d8fdda9c601ad61dc39ce02043ecefa05 Parents: d9c116e Author: Patrick Lucas <m...@patricklucas.com> Authored: Tue Feb 7 12:47:21 2017 -0500 Committer: Robert Metzger <rmetz...@apache.org> Committed: Wed Feb 8 18:39:47 2017 +0100 ---------------------------------------------------------------------- .../YARNSessionCapacitySchedulerITCase.java | 43 ++++++++++++++++---- 1 file changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/95765b6d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index ec66eb2..2a3b6c6 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; @@ -53,13 +54,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.util.EnumSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Collections; -import java.util.Comparator; -import java.util.Arrays; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -413,11 +410,14 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { throw new RuntimeException(e); } - Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), + Runner runner = startWithArgs(new String[]{ + "run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-yjm", "768", "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly + "-yD", "yarn.tags=test-tag", "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. "--yarndetached", job, @@ -516,6 +516,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { LOG.info("Got report {}", rep); } while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING); + verifyApplicationTags(rep); } catch(Throwable t) { LOG.warn("Error while detached yarn session was running", t); Assert.fail(t.getMessage()); @@ -543,6 +544,32 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { } } + /** + * Ensures that the YARN application tags were set properly. + * + * Since YARN application tags were only added in Hadoop 2.4, but Flink still supports Hadoop 2.3, reflection is + * required to invoke the methods. If the method does not exist, this test passes. + */ + private void verifyApplicationTags(final ApplicationReport report) throws InvocationTargetException, + IllegalAccessException { + + final Method applicationTagsMethod; + + Class<ApplicationReport> clazz = ApplicationReport.class; + try { + // this method is only supported by Hadoop 2.4.0 onwards + applicationTagsMethod = clazz.getMethod("getApplicationTags"); + } catch (NoSuchMethodException e) { + // only verify the tags if the method exists + return; + } + + @SuppressWarnings("unchecked") + Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report); + + Assert.assertEquals(applicationTags, Sets.newHashSet("test-tag")); + } + @After public void checkForProhibitedLogContents() { ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);