http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java new file mode 100644 index 0000000..71e26cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; + +import java.util.Collection; +import java.util.Set; +import java.util.SortedSet; + +class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin { + + private static TimelineEntityGroupId timelineEntityGroupId + = TimelineEntityGroupId.newInstance( + TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test"); + + @Override + public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType, + NameValuePair primaryFilter, + Collection<NameValuePair> secondaryFilters) { + return Sets.newHashSet(timelineEntityGroupId); + } + + @Override + public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId, + String entityType) { + return Sets.newHashSet(timelineEntityGroupId); + } + + @Override + public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType, + SortedSet<String> entityIds, + Set<String> eventTypes) { + return Sets.newHashSet(timelineEntityGroupId); + } + + static TimelineEntityGroupId getStandardTimelineGroupId() { + return timelineEntityGroupId; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java new file mode 100644 index 0000000..e0379b1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.util.MinimalPrettyPrinter; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +class PluginStoreTestUtils { + + static FSDataOutputStream createLogFile(Path logPath, FileSystem fs) + throws IOException { + FSDataOutputStream stream; + stream = fs.create(logPath, true); + return stream; + } + + static ObjectMapper createObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector()); + mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false); + return mapper; + } + + /** + * Create sample entities for testing + * @return two timeline entities in a {@link TimelineEntities} object + */ + static TimelineEntities generateTestEntities() { + TimelineEntities entities = new TimelineEntities(); + Map<String, Set<Object>> primaryFilters = + new HashMap<String, Set<Object>>(); + Set<Object> l1 = new HashSet<Object>(); + l1.add("username"); + Set<Object> l2 = new HashSet<Object>(); + l2.add(Integer.MAX_VALUE); + Set<Object> l3 = new HashSet<Object>(); + l3.add("123abc"); + Set<Object> l4 = new HashSet<Object>(); + l4.add((long)Integer.MAX_VALUE + 1l); + primaryFilters.put("user", l1); + primaryFilters.put("appname", l2); + primaryFilters.put("other", l3); + primaryFilters.put("long", l4); + Map<String, Object> secondaryFilters = new HashMap<String, Object>(); + secondaryFilters.put("startTime", 123456); + secondaryFilters.put("status", "RUNNING"); + Map<String, Object> otherInfo1 = new HashMap<String, Object>(); + otherInfo1.put("info1", "val1"); + otherInfo1.putAll(secondaryFilters); + + String entityId1 = "id_1"; + String entityType1 = "type_1"; + String entityId2 = "id_2"; + String entityType2 = "type_2"; + + Map<String, Set<String>> relatedEntities = + new HashMap<String, Set<String>>(); + relatedEntities.put(entityType2, Collections.singleton(entityId2)); + + TimelineEvent ev3 = createEvent(789l, "launch_event", null); + TimelineEvent ev4 = createEvent(0l, "init_event", null); + List<TimelineEvent> events = new ArrayList<TimelineEvent>(); + events.add(ev3); + events.add(ev4); + entities.addEntity(createEntity(entityId2, entityType2, 456l, events, null, + null, null, "domain_id_1")); + + TimelineEvent ev1 = createEvent(123l, "start_event", null); + entities.addEntity(createEntity(entityId1, entityType1, 123l, + Collections.singletonList(ev1), relatedEntities, primaryFilters, + otherInfo1, "domain_id_1")); + return entities; + } + + static void verifyTestEntities(TimelineDataManager tdm) + throws YarnException, IOException { + TimelineEntity entity1 = tdm.getEntity("type_1", "id_1", + EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + TimelineEntity entity2 = tdm.getEntity("type_2", "id_2", + EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertNotNull(entity1); + assertNotNull(entity2); + assertEquals("Failed to read out entity 1", + (Long) 123l, entity1.getStartTime()); + assertEquals("Failed to read out entity 2", + (Long) 456l, entity2.getStartTime()); + } + + /** + * Create a test entity + */ + static TimelineEntity createEntity(String entityId, String entityType, + Long startTime, List<TimelineEvent> events, + Map<String, Set<String>> relatedEntities, + Map<String, Set<Object>> primaryFilters, + Map<String, Object> otherInfo, String domainId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(startTime); + entity.setEvents(events); + if (relatedEntities != null) { + for (Map.Entry<String, Set<String>> e : relatedEntities.entrySet()) { + for (String v : e.getValue()) { + entity.addRelatedEntity(e.getKey(), v); + } + } + } else { + entity.setRelatedEntities(null); + } + entity.setPrimaryFilters(primaryFilters); + entity.setOtherInfo(otherInfo); + entity.setDomainId(domainId); + return entity; + } + + /** + * Create a test event + */ + static TimelineEvent createEvent(long timestamp, String type, Map<String, + Object> info) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(timestamp); + event.setEventType(type); + event.setEventInfo(info); + return event; + } + + /** + * Write timeline entities to a file system + * @param entities + * @param logPath + * @param fs + * @throws IOException + */ + static void writeEntities(TimelineEntities entities, Path logPath, + FileSystem fs) throws IOException { + FSDataOutputStream outStream = createLogFile(logPath, fs); + JsonGenerator jsonGenerator + = (new JsonFactory()).createJsonGenerator(outStream); + jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + ObjectMapper objMapper = createObjectMapper(); + for (TimelineEntity entity : entities.getEntities()) { + objMapper.writeValue(jsonGenerator, entity); + } + outStream.close(); + } + + static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore store) { + TimelineACLsManager aclManager = new TimelineACLsManager(config); + TimelineDataManager tdm = new TimelineDataManager(store, aclManager); + tdm.init(config); + return tdm; + } + + static TimelineDataManager getTdmWithMemStore(Configuration config) { + TimelineStore store = new MemoryTimelineStore(); + TimelineDataManager tdm = getTdmWithStore(config, store); + return tdm; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java new file mode 100644 index 0000000..e43b705 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; + +import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { + + private static final String SAMPLE_APP_NAME = "1234_5678"; + + static final ApplicationId TEST_APPLICATION_ID + = ConverterUtils.toApplicationId( + ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME); + + private static final String TEST_APP_DIR_NAME + = TEST_APPLICATION_ID.toString(); + private static final String TEST_ATTEMPT_DIR_NAME + = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1"; + private static final String TEST_SUMMARY_LOG_FILE_NAME + = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test"; + private static final String TEST_ENTITY_LOG_FILE_NAME + = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX + + EntityGroupPlugInForTest.getStandardTimelineGroupId(); + private static final String TEST_DOMAIN_LOG_FILE_NAME + = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test"; + + private static final Path TEST_ROOT_DIR + = new Path(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestEntityGroupFSTimelineStore.class.getSimpleName()); + private static final Path TEST_APP_DIR_PATH + = new Path(TEST_ROOT_DIR, TEST_APP_DIR_NAME); + private static final Path TEST_ATTEMPT_DIR_PATH + = new Path(TEST_APP_DIR_PATH, TEST_ATTEMPT_DIR_NAME); + private static final Path TEST_DONE_DIR_PATH + = new Path(TEST_ROOT_DIR, "done"); + + private static Configuration config = new YarnConfiguration(); + private static MiniDFSCluster hdfsCluster; + private static FileSystem fs; + private EntityGroupFSTimelineStore store; + private TimelineEntity entityNew; + + @Rule + public TestName currTestName = new TestName(); + + @BeforeClass + public static void setupClass() throws Exception { + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + config.set( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, + "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER"); + config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, + TEST_DONE_DIR_PATH.toString()); + config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster + = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build(); + fs = hdfsCluster.getFileSystem(); + } + + @Before + public void setup() throws Exception { + createTestFiles(); + store = new EntityGroupFSTimelineStore(); + if (currTestName.getMethodName().contains("Plugin")) { + config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, + EntityGroupPlugInForTest.class.getName()); + } + store.init(config); + store.start(); + store.setFs(fs); + } + + @After + public void tearDown() throws Exception { + fs.delete(TEST_APP_DIR_PATH, true); + store.stop(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + hdfsCluster.shutdown(); + FileContext fileContext = FileContext.getLocalFSFileContext(); + fileContext.delete(new Path( + config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true); + } + + @Test + public void testAppLogsScanLogs() throws Exception { + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + appLogs.scanForLogs(); + List<LogInfo> summaryLogs = appLogs.getSummaryLogs(); + List<LogInfo> detailLogs = appLogs.getDetailLogs(); + assertEquals(2, summaryLogs.size()); + assertEquals(1, detailLogs.size()); + + for (LogInfo log : summaryLogs) { + String fileName = log.getFilename(); + assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME) + || fileName.equals(TEST_DOMAIN_LOG_FILE_NAME)); + } + + for (LogInfo log : detailLogs) { + String fileName = log.getFilename(); + assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME); + } + } + + @Test + public void testMoveToDone() throws Exception { + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + Path pathBefore = appLogs.getAppDirPath(); + appLogs.moveToDone(); + Path pathAfter = appLogs.getAppDirPath(); + assertNotEquals(pathBefore, pathAfter); + assertTrue(pathAfter.toString().contains(TEST_DONE_DIR_PATH.toString())); + } + + @Test + public void testParseSummaryLogs() throws Exception { + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + appLogs.scanForLogs(); + appLogs.parseSummaryLogs(tdm); + PluginStoreTestUtils.verifyTestEntities(tdm); + } + + @Test + public void testCleanLogs() throws Exception { + // Create test dirs and files + // Irrelevant file, should not be reclaimed + Path irrelevantFilePath = new Path( + TEST_DONE_DIR_PATH, "irrelevant.log"); + FSDataOutputStream stream = fs.create(irrelevantFilePath); + stream.close(); + // Irrelevant directory, should not be reclaimed + Path irrelevantDirPath = new Path(TEST_DONE_DIR_PATH, "irrelevant"); + fs.mkdirs(irrelevantDirPath); + + Path doneAppHomeDir = new Path(new Path(TEST_DONE_DIR_PATH, "0000"), "001"); + // First application, untouched after creation + Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME); + Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirClean); + Path filePath = new Path(attemptDirClean, "test.log"); + stream = fs.create(filePath); + stream.close(); + // Second application, one file touched after creation + Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1"); + Path attemptDirHoldByFile + = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirHoldByFile); + Path filePathHold = new Path(attemptDirHoldByFile, "test1.log"); + stream = fs.create(filePathHold); + stream.close(); + // Third application, one dir touched after creation + Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2"); + Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirHoldByDir); + Path dirPathHold = new Path(attemptDirHoldByDir, "hold"); + fs.mkdirs(dirPathHold); + // Fourth application, empty dirs + Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3"); + Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirEmpty); + Path dirPathEmpty = new Path(attemptDirEmpty, "empty"); + fs.mkdirs(dirPathEmpty); + + // Should retain all logs after this run + EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000); + assertTrue(fs.exists(irrelevantDirPath)); + assertTrue(fs.exists(irrelevantFilePath)); + assertTrue(fs.exists(filePath)); + assertTrue(fs.exists(filePathHold)); + assertTrue(fs.exists(dirPathHold)); + assertTrue(fs.exists(dirPathEmpty)); + + // Make sure the created dir is old enough + Thread.sleep(2000); + // Touch the second application + stream = fs.append(filePathHold); + stream.writeBytes("append"); + stream.close(); + // Touch the third application by creating a new dir + fs.mkdirs(new Path(dirPathHold, "holdByMe")); + + EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000); + + // Verification after the second cleaner call + assertTrue(fs.exists(irrelevantDirPath)); + assertTrue(fs.exists(irrelevantFilePath)); + assertTrue(fs.exists(filePathHold)); + assertTrue(fs.exists(dirPathHold)); + assertTrue(fs.exists(doneAppHomeDir)); + + // appDirClean and appDirEmpty should be cleaned up + assertFalse(fs.exists(appDirClean)); + assertFalse(fs.exists(appDirEmpty)); + } + + @Test + public void testPluginRead() throws Exception { + // Verify precondition + assertEquals(EntityGroupPlugInForTest.class.getName(), + store.getConfig().get( + YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES)); + // Load data and cache item, prepare timeline store by making a cache item + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + EntityCacheItem cacheItem = new EntityCacheItem(config, fs); + cacheItem.setAppLogs(appLogs); + store.setCachedLogs( + EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem); + // Generate TDM + TimelineDataManager tdm + = PluginStoreTestUtils.getTdmWithStore(config, store); + + // Verify single entity read + TimelineEntity entity3 = tdm.getEntity("type_3", "id_3", + EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertNotNull(entity3); + assertEquals(entityNew.getStartTime(), entity3.getStartTime()); + // Verify multiple entities read + TimelineEntities entities = tdm.getEntities("type_3", null, null, null, + null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertEquals(entities.getEntities().size(), 1); + for (TimelineEntity entity : entities.getEntities()) { + assertEquals(entityNew.getStartTime(), entity.getStartTime()); + } + } + + @Test + public void testSummaryRead() throws Exception { + // Load data + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + TimelineDataManager tdm + = PluginStoreTestUtils.getTdmWithStore(config, store); + appLogs.scanForLogs(); + appLogs.parseSummaryLogs(tdm); + + // Verify single entity read + PluginStoreTestUtils.verifyTestEntities(tdm); + // Verify multiple entities read + TimelineEntities entities = tdm.getEntities("type_1", null, null, null, + null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertEquals(entities.getEntities().size(), 1); + for (TimelineEntity entity : entities.getEntities()) { + assertEquals((Long) 123l, entity.getStartTime()); + } + + } + + private void createTestFiles() throws IOException { + TimelineEntities entities = PluginStoreTestUtils.generateTestEntities(); + PluginStoreTestUtils.writeEntities(entities, + new Path(TEST_ATTEMPT_DIR_PATH, TEST_SUMMARY_LOG_FILE_NAME), fs); + + entityNew = PluginStoreTestUtils + .createEntity("id_3", "type_3", 789l, null, null, + null, null, "domain_id_1"); + TimelineEntities entityList = new TimelineEntities(); + entityList.addEntity(entityNew); + PluginStoreTestUtils.writeEntities(entityList, + new Path(TEST_ATTEMPT_DIR_PATH, TEST_ENTITY_LOG_FILE_NAME), fs); + + FSDataOutputStream out = fs.create( + new Path(TEST_ATTEMPT_DIR_PATH, TEST_DOMAIN_LOG_FILE_NAME)); + out.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java new file mode 100644 index 0000000..fa6fcc7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.util.MinimalPrettyPrinter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.EnumSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestLogInfo { + + private static final Path TEST_ROOT_DIR = new Path( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestLogInfo.class.getSimpleName()); + + private static final String TEST_ATTEMPT_DIR_NAME = "test_app"; + private static final String TEST_ENTITY_FILE_NAME = "test_entity"; + private static final String TEST_DOMAIN_FILE_NAME = "test_domain"; + private static final String TEST_BROKEN_FILE_NAME = "test_broken"; + + private Configuration config = new YarnConfiguration(); + private MiniDFSCluster hdfsCluster; + private FileSystem fs; + private ObjectMapper objMapper; + + private JsonFactory jsonFactory = new JsonFactory(); + private JsonGenerator jsonGenerator; + private FSDataOutputStream outStream = null; + private FSDataOutputStream outStreamDomain = null; + + private TimelineDomain testDomain; + + private static final short FILE_LOG_DIR_PERMISSIONS = 0770; + + @Before + public void setup() throws Exception { + config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build(); + fs = hdfsCluster.getFileSystem(); + Path testAppDirPath = new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS)); + objMapper = PluginStoreTestUtils.createObjectMapper(); + + TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities(); + writeEntitiesLeaveOpen(testEntities, + new Path(testAppDirPath, TEST_ENTITY_FILE_NAME)); + + testDomain = new TimelineDomain(); + testDomain.setId("domain_1"); + testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName()); + testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName()); + testDomain.setDescription("description"); + writeDomainLeaveOpen(testDomain, + new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME)); + + writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME)); + } + + @After + public void tearDown() throws Exception { + jsonGenerator.close(); + outStream.close(); + outStreamDomain.close(); + hdfsCluster.shutdown(); + } + + @Test + public void testMatchesGroupId() throws Exception { + String testGroupId = "app1_group1"; + // Match + EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + "app1_group1", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + "test_app1_group1", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + // Unmatch + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + // Check delimiters + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + // Check file names shorter than group id + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + } + + @Test + public void testParseEntity() throws Exception { + // Load test data + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_ENTITY_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + // Verify for the first batch + PluginStoreTestUtils.verifyTestEntities(tdm); + // Load new data + TimelineEntity entityNew = PluginStoreTestUtils + .createEntity("id_3", "type_3", 789l, null, null, + null, null, "domain_id_1"); + TimelineEntities entityList = new TimelineEntities(); + entityList.addEntity(entityNew); + writeEntitiesLeaveOpen(entityList, + new Path(new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME), + TEST_ENTITY_FILE_NAME)); + testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + // Verify the newly added data + TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(), + entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertNotNull(entity3); + assertEquals("Failed to read out entity new", + entityNew.getStartTime(), entity3.getStartTime()); + tdm.close(); + } + + @Test + public void testParseBrokenEntity() throws Exception { + // Load test data + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_BROKEN_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_BROKEN_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + // Try parse, should not fail + testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + tdm.close(); + } + + @Test + public void testParseDomain() throws Exception { + // Load test data + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_DOMAIN_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + // Verify domain data + TimelineDomain resultDomain = tdm.getDomain("domain_1", + UserGroupInformation.getLoginUser()); + assertNotNull(resultDomain); + assertEquals(testDomain.getReaders(), resultDomain.getReaders()); + assertEquals(testDomain.getOwner(), resultDomain.getOwner()); + assertEquals(testDomain.getDescription(), resultDomain.getDescription()); + } + + private void writeBrokenFile(Path logPath) throws IOException { + FSDataOutputStream out = null; + try { + String broken = "{ broken { [[]} broken"; + out = PluginStoreTestUtils.createLogFile(logPath, fs); + out.write(broken.getBytes(Charset.forName("UTF-8"))); + out.close(); + out = null; + } finally { + if (out != null) { + out.close(); + } + } + } + + // TestLogInfo needs to maintain opened hdfs files so we have to build our own + // write methods + private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath) + throws IOException { + if (outStream == null) { + outStream = PluginStoreTestUtils.createLogFile(logPath, fs); + jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream); + jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + } + for (TimelineEntity entity : entities.getEntities()) { + objMapper.writeValue(jsonGenerator, entity); + } + outStream.hflush(); + } + + private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath) + throws IOException { + if (outStreamDomain == null) { + outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs); + } + // Write domain uses its own json generator to isolate from entity writers + JsonGenerator jsonGeneratorLocal + = (new JsonFactory()).createJsonGenerator(outStreamDomain); + jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + objMapper.writeValue(jsonGeneratorLocal, domain); + outStreamDomain.hflush(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index 0043115..b217ca4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -42,5 +42,6 @@ <module>hadoop-yarn-server-sharedcachemanager</module> <module>hadoop-yarn-server-tests</module> <module>hadoop-yarn-server-applicationhistoryservice</module> + <module>hadoop-yarn-server-timeline-pluginstorage</module> </modules> </project>
