Repository: tez Updated Branches: refs/heads/master e02e4f721 -> 9930011b0
TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. (Harish Jaiprakash via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9930011b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9930011b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9930011b Branch: refs/heads/master Commit: 9930011b0f05d56ece049867a71fb7eebfe6442e Parents: e02e4f7 Author: Hitesh Shah <[email protected]> Authored: Thu Jul 21 11:23:33 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Jul 21 11:23:33 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 34 +++- .../org/apache/tez/dag/records/TezDAGID.java | 19 +++ .../logging/ats/TimelineCachePluginImpl.java | 110 ++++++++---- .../ats/TestTimelineCachePluginImpl.java | 170 ++++++++++++++++--- 5 files changed, 274 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dd89640..a925104 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat. TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used. TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. @@ -85,6 +86,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat. TEZ-3356. 
Fix initializing of stats when custom ShuffleVertexManager is used. TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 1de2eda..11c50cf 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1229,6 +1229,38 @@ public class TezConfiguration extends Configuration { "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService"; /** + * Comma separated list of Integers. These are the values previously set for the config value + * {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so that + * the plugin continues to generate the groupIds that were produced with them. Each value must be + * greater than 1, as no group id can be generated for 1 (grouping disabled). If an older value is + * not listed, the UI may not show information for DAGs which were created with that grouping value. + * + * Note: Do not add too many values here as it will affect the performance of Yarn Timeline + * Server/Tez UI due to the need to scan for more log files. + */ + @Private + @Unstable + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP = + TEZ_PREFIX + "history.logging.timeline-cache-plugin.old-num-dags-per-group"; + + /** + * Integer value. Number of DAGs to be grouped together. This is used by the history logging + * service to generate groupIds such that each run of numDagsPerGroup consecutive DAGs in a given + * session shares the same groupId. If the value is set to 1 then grouping is disabled.
This config is used to control the + * number of DAGs written into one log file, and hence the number of files created in + * the filesystem used by YARN Timeline. + */ + @Private + @Unstable + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP = + TEZ_PREFIX + "history.timeline.num-dags-per-group"; + public static final int TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT = 1; + + /** * String value. The directory into which history data will be written. This defaults to the * container logging directory. This is relevant only when SimpleHistoryLoggingService is being * used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS} @@ -1237,7 +1269,7 @@ public class TezConfiguration extends Configuration { @ConfigurationProperty public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR = TEZ_PREFIX + "simple.history.logging.dir"; - + /** * Int value. Maximum errors allowed while logging history data. After crossing this limit history * logging gets disabled. The job continues to run after this. http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java index 3828890..58ab509 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java @@ -176,6 +176,25 @@ public class TezDAGID extends TezID { return appendTo(new StringBuilder(DAG)).toString(); } + // The groupId prefix. + private static final String DAG_GROUPID_PREFIX = "daggroup"; + + /** + * Generate the group id shared by a block of numDagsPerGroup consecutive DAG ids in one application. + * + * @param numDagsPerGroup The number of DAGs in one group; must be greater than 1.
+ * @return The group id to be used for grouping numDagsPerGroup into one group. + */ + public String getGroupId(int numDagsPerGroup) { + if (numDagsPerGroup <= 1) { + throw new IllegalArgumentException("numDagsPerGroup has to be more than one. Got: " + numDagsPerGroup); + } + return DAG_GROUPID_PREFIX + SEPARATOR + + getApplicationId().getClusterTimestamp() + SEPARATOR + + tezAppIdFormat.get().format(getApplicationId().getId()) + SEPARATOR + + tezDagIdFormat.get().format(getId() / numDagsPerGroup); + } + public static TezDAGID fromString(String dagId) { int id = 0; int appId = 0; http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java index d81f56a..b4217a1 100644 --- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java +++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java @@ -23,20 +23,26 @@ import java.util.HashSet; import java.util.Set; import java.util.SortedSet; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin; +import org.apache.tez.dag.api.TezConfiguration; import 
org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; -public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin { +public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implements Configurable { + private static final Logger LOG = LoggerFactory.getLogger(TimelineCachePluginImpl.class); private static Set<String> summaryEntityTypes; private static Set<String> knownEntityTypes; @@ -54,11 +60,27 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin { EntityTypes.TEZ_APPLICATION.name()); } + private Configuration conf; + + private Set<Integer> allNumGroupsPerDag; + // Empty public constructor public TimelineCachePluginImpl() { + setConf(new TezConfiguration()); + } + + private Set<TimelineEntityGroupId> createTimelineEntityGroupIds(TezDAGID dagId) { + ApplicationId appId = dagId.getApplicationId(); + HashSet<TimelineEntityGroupId> groupIds = Sets.newHashSet( + TimelineEntityGroupId.newInstance(appId, appId.toString()), + TimelineEntityGroupId.newInstance(appId, dagId.toString())); + for (int numGroupsPerDag : allNumGroupsPerDag) { + groupIds.add(TimelineEntityGroupId.newInstance(appId, dagId.getGroupId(numGroupsPerDag))); + } + return groupIds; } - private TimelineEntityGroupId convertToTimelineEntityGroupId(String entityType, String entityId) { + private Set<TimelineEntityGroupId> convertToTimelineEntityGroupIds(String entityType, String entityId) { if (entityType == null || entityType.isEmpty() || entityId == null || entityId.isEmpty()) { return null; @@ -66,27 +88,23 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin { if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) { TezDAGID dagId = 
TezDAGID.fromString(entityId); if (dagId != null) { - return TimelineEntityGroupId.newInstance(dagId.getApplicationId(), dagId.toString()); + return createTimelineEntityGroupIds(dagId); } } else if (entityType.equals(EntityTypes.TEZ_VERTEX_ID.name())) { TezVertexID vertexID = TezVertexID.fromString(entityId); if (vertexID != null) { - return TimelineEntityGroupId.newInstance(vertexID.getDAGId().getApplicationId(), - vertexID.getDAGId().toString()); + return createTimelineEntityGroupIds(vertexID.getDAGId()); } } else if (entityType.equals(EntityTypes.TEZ_TASK_ID.name())) { TezTaskID taskID = TezTaskID.fromString(entityId); if (taskID != null) { - return TimelineEntityGroupId.newInstance(taskID.getVertexID().getDAGId().getApplicationId(), - taskID.getVertexID().getDAGId().toString()); + return createTimelineEntityGroupIds(taskID.getVertexID().getDAGId()); } } else if (entityType.equals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())) { TezTaskAttemptID taskAttemptID = TezTaskAttemptID.fromString(entityId); if (taskAttemptID != null) { - return TimelineEntityGroupId.newInstance( - taskAttemptID.getTaskID().getVertexID().getDAGId().getApplicationId(), - taskAttemptID.getTaskID().getVertexID().getDAGId().toString()); + return createTimelineEntityGroupIds(taskAttemptID.getTaskID().getVertexID().getDAGId()); } } else if (entityType.equals(EntityTypes.TEZ_CONTAINER_ID.name())) { String cId = entityId; @@ -95,9 +113,9 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin { } ContainerId containerId = ContainerId.fromString(cId); if (containerId != null) { - return TimelineEntityGroupId.newInstance( + return Sets.newHashSet(TimelineEntityGroupId.newInstance( containerId.getApplicationAttemptId().getApplicationId(), - containerId.getApplicationAttemptId().getApplicationId().toString()); + containerId.getApplicationAttemptId().getApplicationId().toString())); } } return null; @@ -113,15 +131,7 @@ public class TimelineCachePluginImpl extends 
TimelineEntityGroupPlugin { || summaryEntityTypes.contains(entityType)) { return null; } - TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(primaryFilter.getName(), - primaryFilter.getValue().toString()); - if (groupId != null) { - TimelineEntityGroupId appGroupId = - TimelineEntityGroupId.newInstance(groupId.getApplicationId(), - groupId.getApplicationId().toString()); - return Sets.newHashSet(groupId, appGroupId); - } - return null; + return convertToTimelineEntityGroupIds(primaryFilter.getName(), primaryFilter.getValue().toString()); } @Override @@ -129,14 +139,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin { if (!knownEntityTypes.contains(entityType) || summaryEntityTypes.contains(entityType)) { return null; } - TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(entityType, entityId); - if (groupId != null) { - TimelineEntityGroupId appGroupId = - TimelineEntityGroupId.newInstance(groupId.getApplicationId(), - groupId.getApplicationId().toString()); - return Sets.newHashSet(groupId, appGroupId); - } - return null; + return convertToTimelineEntityGroupIds(entityType, entityId); } @Override @@ -147,20 +150,53 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin { || entityIds == null || entityIds.isEmpty()) { return null; } - Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>(); - Set<ApplicationId> appIdSet = new HashSet<ApplicationId>(); + Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>(); for (String entityId : entityIds) { - TimelineEntityGroupId groupId = convertToTimelineEntityGroupId(entityType, entityId); + Set<TimelineEntityGroupId> groupId = convertToTimelineEntityGroupIds(entityType, entityId); if (groupId != null) { - groupIds.add(groupId); - appIdSet.add(groupId.getApplicationId()); + groupIds.addAll(groupId); } } - for (ApplicationId appId : appIdSet) { - groupIds.add(TimelineEntityGroupId.newInstance(appId, appId.toString())); 
- } return groupIds; } + @Override + public void setConf(Configuration conf) { + this.conf = conf instanceof TezConfiguration ? conf : new TezConfiguration(conf); + + this.allNumGroupsPerDag = loadAllNumDagsPerGroup(); + } + + private Set<Integer> loadAllNumDagsPerGroup() { + Set<Integer> allNumDagsPerGroup = new HashSet<Integer>(); + + int numDagsPerGroup = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, + TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT); + if (numDagsPerGroup > 1) { + // Add current numDagsPerGroup from config. + allNumDagsPerGroup.add(numDagsPerGroup); + } + + // Add the older values from config. + int [] usedNumGroups = conf.getInts(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP); + if (usedNumGroups != null) { + for (int i = 0; i < usedNumGroups.length; ++i) { + allNumDagsPerGroup.add(usedNumGroups[i]); + } + } + + // Warn for performance impact + if (allNumDagsPerGroup.size() > 3) { + LOG.warn("Too many entries in " + TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP + + ", this can result in slower lookup from Yarn Timeline server or slower load times in TezUI."); + } + return allNumDagsPerGroup; + } + + @Override + public Configuration getConf() { + return conf; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/9930011b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java index 562a66e..6f819ba 100644 --- 
a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java +++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java @@ -27,11 +27,14 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -43,6 +46,7 @@ import org.junit.Test; import com.google.common.collect.Sets; + public class TestTimelineCachePluginImpl { static ApplicationId appId1; @@ -61,8 +65,20 @@ public class TestTimelineCachePluginImpl { static Map<String, String> typeIdMap1; static Map<String, String> typeIdMap2; - TimelineCachePluginImpl plugin = - new TimelineCachePluginImpl(); + private static TimelineCachePluginImpl createPlugin(int numDagsPerGroup, String usedNumDagsPerGroup) { + Configuration conf = new Configuration(false); + if (numDagsPerGroup > 0) { + conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, numDagsPerGroup); + } + if (usedNumDagsPerGroup != null) { + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_CACHE_PLUGIN_OLD_NUM_DAGS_PER_GROUP, usedNumDagsPerGroup); + } + if (numDagsPerGroup > 0 || usedNumDagsPerGroup != null) { + return ReflectionUtils.newInstance(TimelineCachePluginImpl.class, conf); + } else { + return new TimelineCachePluginImpl(); + } + } @BeforeClass public static void 
beforeClass() { @@ -94,11 +110,11 @@ public class TestTimelineCachePluginImpl { typeIdMap2.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID2.toString()); typeIdMap2.put(EntityTypes.TEZ_TASK_ID.name(), taskID2.toString()); typeIdMap2.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID2.toString()); - } @Test public void testGetTimelineEntityGroupIdByPrimaryFilter() { + TimelineCachePluginImpl plugin = createPlugin(100, null); for (Entry<String, String> entry : typeIdMap1.entrySet()) { NameValuePair primaryFilter = new NameValuePair(entry.getKey(), entry.getValue()); Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(), @@ -108,19 +124,38 @@ public class TestTimelineCachePluginImpl { Assert.assertNull(groupIds); continue; } + Assert.assertEquals(3, groupIds.size()); + Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); + while (iter.hasNext()) { + TimelineEntityGroupId groupId = iter.next(); + Assert.assertEquals(appId1, groupId.getApplicationId()); + Assert.assertTrue(getGroupIds(dagID1, appId1, 100).contains(groupId.getTimelineEntityGroupId())); + } + } + } + + @Test + public void testGetTimelineEntityGroupIdByIdDefaultConfig() { + TimelineCachePluginImpl plugin = createPlugin(-1, null); + for (Entry<String, String> entry : typeIdMap1.entrySet()) { + Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey()); + if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) { + Assert.assertNull(groupIds); + continue; + } Assert.assertEquals(2, groupIds.size()); Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); while (iter.hasNext()) { TimelineEntityGroupId groupId = iter.next(); Assert.assertEquals(appId1, groupId.getApplicationId()); - Assert.assertTrue((dagID1.toString().equals(groupId.getTimelineEntityGroupId())) - || (appId1.toString().equals(groupId.getTimelineEntityGroupId()))); + Assert.assertTrue(getGroupIds(dagID1, 
appId1).contains(groupId.getTimelineEntityGroupId())); } } } @Test - public void testGetTimelineEntityGroupIdById() { + public void testGetTimelineEntityGroupIdByIdNoGroupingConf() { + TimelineCachePluginImpl plugin = createPlugin(1, null); for (Entry<String, String> entry : typeIdMap1.entrySet()) { Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey()); if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) { @@ -132,14 +167,90 @@ public class TestTimelineCachePluginImpl { while (iter.hasNext()) { TimelineEntityGroupId groupId = iter.next(); Assert.assertEquals(appId1, groupId.getApplicationId()); - Assert.assertTrue((dagID1.toString().equals(groupId.getTimelineEntityGroupId())) - || (appId1.toString().equals(groupId.getTimelineEntityGroupId()))); + Assert.assertTrue(getGroupIds(dagID1, appId1).contains(groupId.getTimelineEntityGroupId())); + } + } + } + + @Test + public void testGetTimelineEntityGroupIdById() { + TimelineCachePluginImpl plugin = createPlugin(100, null); + for (Entry<String, String> entry : typeIdMap1.entrySet()) { + Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey()); + if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) { + Assert.assertNull(groupIds); + continue; + } + Assert.assertEquals(3, groupIds.size()); + Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); + while (iter.hasNext()) { + TimelineEntityGroupId groupId = iter.next(); + Assert.assertEquals(appId1, groupId.getApplicationId()); + Assert.assertTrue(getGroupIds(dagID1, appId1, 100).contains(groupId.getTimelineEntityGroupId())); + } + } + } + + @Test + public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsSingle() { + TimelineCachePluginImpl plugin = createPlugin(100, "50"); + for (Entry<String, String> entry : typeIdMap2.entrySet()) { + Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey()); + if 
(entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) { + Assert.assertNull(groupIds); + continue; + } + Assert.assertEquals(4, groupIds.size()); + Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); + while (iter.hasNext()) { + TimelineEntityGroupId groupId = iter.next(); + Assert.assertEquals(appId2, groupId.getApplicationId()); + Assert.assertTrue(getGroupIds(dagID2, appId2, 100, 50).contains(groupId.getTimelineEntityGroupId())); + } + } + } + + @Test + public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsMultiple() { + TimelineCachePluginImpl plugin = createPlugin(100, "25, 50"); + for (Entry<String, String> entry : typeIdMap2.entrySet()) { + Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey()); + if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) { + Assert.assertNull(groupIds); + continue; + } + Assert.assertEquals(5, groupIds.size()); + Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); + while (iter.hasNext()) { + TimelineEntityGroupId groupId = iter.next(); + Assert.assertEquals(appId2, groupId.getApplicationId()); + Assert.assertTrue(getGroupIds(dagID2, appId2, 100, 25, 50).contains(groupId.getTimelineEntityGroupId())); + } + } + } + + @Test + public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsEmpty() { + TimelineCachePluginImpl plugin = createPlugin(100, ""); + for (Entry<String, String> entry : typeIdMap2.entrySet()) { + Set<TimelineEntityGroupId> groupIds = plugin.getTimelineEntityGroupId(entry.getValue(), entry.getKey()); + if (entry.getKey().equals(EntityTypes.TEZ_DAG_ID.name())) { + Assert.assertNull(groupIds); + continue; + } + Assert.assertEquals(3, groupIds.size()); + Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); + while (iter.hasNext()) { + TimelineEntityGroupId groupId = iter.next(); + Assert.assertEquals(appId2, groupId.getApplicationId()); + Assert.assertTrue(getGroupIds(dagID2, appId2, 100).contains(groupId.getTimelineEntityGroupId())); } 
} } @Test public void testGetTimelineEntityGroupIdByIds() { + TimelineCachePluginImpl plugin = createPlugin(100, null); for (Entry<String, String> entry : typeIdMap1.entrySet()) { SortedSet<String> entityIds = new TreeSet<String>(); entityIds.add(entry.getValue()); @@ -150,31 +261,36 @@ public class TestTimelineCachePluginImpl { Assert.assertNull(groupIds); continue; } - Assert.assertEquals(4, groupIds.size()); + Assert.assertEquals(6, groupIds.size()); int found = 0; Iterator<TimelineEntityGroupId> iter = groupIds.iterator(); while (iter.hasNext()) { TimelineEntityGroupId groupId = iter.next(); - if (groupId.getApplicationId().equals(appId1) - && groupId.getTimelineEntityGroupId().equals(dagID1.toString())) { - ++found; - } else if (groupId.getApplicationId().equals(appId2) - && groupId.getTimelineEntityGroupId().equals(dagID2.toString())) { - ++found; - } else if (groupId.getApplicationId().equals(appId1) - && groupId.getTimelineEntityGroupId().equals(appId1.toString())) { - ++found; - } else if (groupId.getApplicationId().equals(appId2) - && groupId.getTimelineEntityGroupId().equals(appId2.toString())) { - ++found; + if (groupId.getApplicationId().equals(appId1)) { + String entityGroupId = groupId.getTimelineEntityGroupId(); + if (getGroupIds(dagID1, appId1, 100).contains(entityGroupId)) { + ++found; + } else { + Assert.fail("Unexpected group id: " + entityGroupId); + } + } else if (groupId.getApplicationId().equals(appId2)) { + String entityGroupId = groupId.getTimelineEntityGroupId(); + if (getGroupIds(dagID2, appId2, 100).contains(entityGroupId)) { + ++found; + } else { + Assert.fail("Unexpected group id: " + entityGroupId); + } + } else { + Assert.fail("Unexpected appId: " + groupId.getApplicationId()); } } - Assert.assertEquals("All groupIds not returned", 4, found); + Assert.assertEquals("All groupIds not returned", 6, found); } } @Test public void testInvalidIds() { + TimelineCachePluginImpl plugin = createPlugin(-1, null); 
Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_DAG_ID.name(), vertexID1.toString())); Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_VERTEX_ID.name(), @@ -190,6 +306,7 @@ public class TestTimelineCachePluginImpl { @Test public void testInvalidTypeRequests() { + TimelineCachePluginImpl plugin = createPlugin(-1, null); Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION.name(), appId1.toString())); Assert.assertNull(plugin.getTimelineEntityGroupId(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), @@ -206,7 +323,7 @@ public class TestTimelineCachePluginImpl { @Test public void testContainerIdConversion() { - + TimelineCachePluginImpl plugin = createPlugin(-1, null); String entityType = EntityTypes.TEZ_CONTAINER_ID.name(); SortedSet<String> entityIds = new TreeSet<String>(); entityIds.add("tez_" + cId1.toString()); @@ -255,6 +372,13 @@ public class TestTimelineCachePluginImpl { } } Assert.assertEquals("All groupIds not returned", 1, found); + } + private Set<String> getGroupIds(TezDAGID dagId, ApplicationId appId, int ... allNumDagsPerGroup) { + HashSet<String> groupIds = Sets.newHashSet(dagId.toString(), appId.toString()); + for (int numDagsPerGroup : allNumDagsPerGroup) { + groupIds.add(dagId.getGroupId(numDagsPerGroup)); + } + return groupIds; } }
