YARN-5368. Memory leak in timeline server (Jonathan Eagles via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/01aca54a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/01aca54a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/01aca54a

Branch: refs/heads/HDFS-10467
Commit: 01aca54a22c8586d232a8f79fe9977aeb8d09b83
Parents: 6b09336
Author: Varun Saxena <varunsax...@apache.org>
Authored: Wed Mar 29 01:53:20 2017 +0530
Committer: Varun Saxena <varunsax...@apache.org>
Committed: Wed Mar 29 01:53:20 2017 +0530

----------------------------------------------------------------------
 .../timeline/RollingLevelDBTimelineStore.java   | 263 +++++++++----------
 1 file changed, 117 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/01aca54a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
index 4d38008..20e0379 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
@@ -275,9 +275,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
     Path domainDBPath = new Path(dbPath, DOMAIN);
     Path starttimeDBPath = new Path(dbPath, STARTTIME);
     Path ownerDBPath = new Path(dbPath, OWNER);
-    FileSystem localFS = null;
-    try {
-      localFS = FileSystem.getLocal(conf);
+    try (FileSystem localFS = FileSystem.getLocal(conf)) {
       if (!localFS.exists(dbPath)) {
         if (!localFS.mkdirs(dbPath)) {
           throw new IOException("Couldn't create directory for leveldb "
@@ -306,8 +304,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
         }
         localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
       }
-    } finally {
-      IOUtils.cleanup(LOG, localFS);
     }
     options.maxOpenFiles(conf.getInt(
         TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
@@ -408,19 +404,15 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
         .add(writeReverseOrderedLong(revStartTime)).add(entityId)
         .getBytesForLookup();
 
-    DBIterator iterator = null;
-    try {
-      DB db = entitydb.getDBForStartTime(revStartTime);
-      if (db == null) {
-        return null;
-      }
-      iterator = db.iterator();
+    DB db = entitydb.getDBForStartTime(revStartTime);
+    if (db == null) {
+      return null;
+    }
+    try (DBIterator iterator = db.iterator()) {
       iterator.seek(prefix);
 
       return getEntity(entityId, entityType, revStartTime, fields, iterator,
           prefix, prefix.length);
-    } finally {
-      IOUtils.cleanup(LOG, iterator);
     }
   }
 
@@ -533,62 +525,61 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
                 o2.length);
           }
         });
-    DBIterator iterator = null;
-    try {
+
       // look up start times for the specified entities
       // skip entities with no start time
-      for (String entityId : entityIds) {
-        byte[] startTime = getStartTime(entityId, entityType);
-        if (startTime != null) {
-          List<EntityIdentifier> entities = startTimeMap.get(startTime);
-          if (entities == null) {
-            entities = new ArrayList<EntityIdentifier>();
-            startTimeMap.put(startTime, entities);
-          }
-          entities.add(new EntityIdentifier(entityId, entityType));
+    for (String entityId : entityIds) {
+      byte[] startTime = getStartTime(entityId, entityType);
+      if (startTime != null) {
+        List<EntityIdentifier> entities = startTimeMap.get(startTime);
+        if (entities == null) {
+          entities = new ArrayList<EntityIdentifier>();
+          startTimeMap.put(startTime, entities);
         }
+        entities.add(new EntityIdentifier(entityId, entityType));
       }
-      for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
+    }
+    for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
           .entrySet()) {
-        // look up the events matching the given parameters (limit,
-        // start time, end time, event types) for entities whose start times
-        // were found and add the entities to the return list
-        byte[] revStartTime = entry.getKey();
-        for (EntityIdentifier entityIdentifier : entry.getValue()) {
-          EventsOfOneEntity entity = new EventsOfOneEntity();
-          entity.setEntityId(entityIdentifier.getId());
-          entity.setEntityType(entityType);
-          events.addEvent(entity);
-          KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
-              .add(revStartTime).add(entityIdentifier.getId())
-              .add(EVENTS_COLUMN);
-          byte[] prefix = kb.getBytesForLookup();
-          if (windowEnd == null) {
-            windowEnd = Long.MAX_VALUE;
-          }
-          byte[] revts = writeReverseOrderedLong(windowEnd);
-          kb.add(revts);
-          byte[] first = kb.getBytesForLookup();
-          byte[] last = null;
-          if (windowStart != null) {
-            last = KeyBuilder.newInstance().add(prefix)
-                .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
-          }
-          if (limit == null) {
-            limit = DEFAULT_LIMIT;
-          }
-          DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
-              revStartTime, 0));
-          if (db == null) {
-            continue;
-          }
-          iterator = db.iterator();
+      // look up the events matching the given parameters (limit,
+      // start time, end time, event types) for entities whose start times
+      // were found and add the entities to the return list
+      byte[] revStartTime = entry.getKey();
+      for (EntityIdentifier entityIdentifier : entry.getValue()) {
+        EventsOfOneEntity entity = new EventsOfOneEntity();
+        entity.setEntityId(entityIdentifier.getId());
+        entity.setEntityType(entityType);
+        events.addEvent(entity);
+        KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
+            .add(revStartTime).add(entityIdentifier.getId())
+            .add(EVENTS_COLUMN);
+        byte[] prefix = kb.getBytesForLookup();
+        if (windowEnd == null) {
+          windowEnd = Long.MAX_VALUE;
+        }
+        byte[] revts = writeReverseOrderedLong(windowEnd);
+        kb.add(revts);
+        byte[] first = kb.getBytesForLookup();
+        byte[] last = null;
+        if (windowStart != null) {
+          last = KeyBuilder.newInstance().add(prefix)
+              .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+        }
+        if (limit == null) {
+          limit = DEFAULT_LIMIT;
+        }
+        DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
+            revStartTime, 0));
+        if (db == null) {
+          continue;
+        }
+        try (DBIterator iterator = db.iterator()) {
           for (iterator.seek(first); entity.getEvents().size() < limit
               && iterator.hasNext(); iterator.next()) {
             byte[] key = iterator.peekNext().getKey();
             if (!prefixMatches(prefix, prefix.length, key)
                 || (last != null && WritableComparator.compareBytes(key, 0,
-                    key.length, last, 0, last.length) > 0)) {
+                key.length, last, 0, last.length) > 0)) {
               break;
             }
             TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
@@ -599,8 +590,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
           }
         }
       }
-    } finally {
-      IOUtils.cleanup(LOG, iterator);
     }
     return events;
   }
@@ -657,66 +646,64 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
       Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
       Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
       CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
-    DBIterator iterator = null;
-    try {
-      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
-      // only db keys matching the prefix (base + entity type) will be parsed
-      byte[] prefix = kb.getBytesForLookup();
-      if (endtime == null) {
-        // if end time is null, place no restriction on end time
-        endtime = Long.MAX_VALUE;
-      }
+    KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+    // only db keys matching the prefix (base + entity type) will be parsed
+    byte[] prefix = kb.getBytesForLookup();
+    if (endtime == null) {
+      // if end time is null, place no restriction on end time
+      endtime = Long.MAX_VALUE;
+    }
 
-      // Sanitize the fields parameter
-      if (fields == null) {
-        fields = EnumSet.allOf(Field.class);
-      }
+    // Sanitize the fields parameter
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
 
-      // construct a first key that will be seeked to using end time or fromId
-      long firstStartTime = Long.MAX_VALUE;
-      byte[] first = null;
-      if (fromId != null) {
-        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
-        if (fromIdStartTime == null) {
-          // no start time for provided id, so return empty entities
-          return new TimelineEntities();
-        }
-        if (fromIdStartTime <= endtime) {
-          // if provided id's start time falls before the end of the window,
-          // use it to construct the seek key
-          firstStartTime = fromIdStartTime;
-          first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
-              .getBytesForLookup();
-        }
-      }
-      // if seek key wasn't constructed using fromId, construct it using end ts
-      if (first == null) {
-        firstStartTime = endtime;
-        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+    // construct a first key that will be seeked to using end time or fromId
+    long firstStartTime = Long.MAX_VALUE;
+    byte[] first = null;
+    if (fromId != null) {
+      Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+      if (fromIdStartTime == null) {
+        // no start time for provided id, so return empty entities
+        return new TimelineEntities();
       }
-      byte[] last = null;
-      if (starttime != null) {
-        // if start time is not null, set a last key that will not be
-        // iterated past
-        last = KeyBuilder.newInstance().add(base).add(entityType)
-            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
-      }
-      if (limit == null) {
-        // if limit is not specified, use the default
-        limit = DEFAULT_LIMIT;
+      if (fromIdStartTime <= endtime) {
+        // if provided id's start time falls before the end of the window,
+        // use it to construct the seek key
+        firstStartTime = fromIdStartTime;
+        first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
+            .getBytesForLookup();
       }
+    }
+    // if seek key wasn't constructed using fromId, construct it using end ts
+    if (first == null) {
+      firstStartTime = endtime;
+      first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+    }
+    byte[] last = null;
+    if (starttime != null) {
+      // if start time is not null, set a last key that will not be
+      // iterated past
+      last = KeyBuilder.newInstance().add(base).add(entityType)
+          .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+    }
+    if (limit == null) {
+      // if limit is not specified, use the default
+      limit = DEFAULT_LIMIT;
+    }
 
-      TimelineEntities entities = new TimelineEntities();
-      RollingLevelDB rollingdb = null;
-      if (usingPrimaryFilter) {
-        rollingdb = indexdb;
-      } else {
-        rollingdb = entitydb;
-      }
+    TimelineEntities entities = new TimelineEntities();
+    RollingLevelDB rollingdb = null;
+    if (usingPrimaryFilter) {
+      rollingdb = indexdb;
+    } else {
+      rollingdb = entitydb;
+    }
 
-      DB db = rollingdb.getDBForStartTime(firstStartTime);
-      while (entities.getEntities().size() < limit && db != null) {
-        iterator = db.iterator();
+    DB db = rollingdb.getDBForStartTime(firstStartTime);
+    while (entities.getEntities().size() < limit && db != null) {
+      try (DBIterator iterator = db.iterator()) {
         iterator.seek(first);
 
         // iterate until one of the following conditions is met: limit is
@@ -726,7 +713,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
           byte[] key = iterator.peekNext().getKey();
           if (!prefixMatches(prefix, prefix.length, key)
               || (last != null && WritableComparator.compareBytes(key, 0,
-                  key.length, last, 0, last.length) > 0)) {
+              key.length, last, 0, last.length) > 0)) {
             break;
           }
           // read the start time and entity id from the current key
@@ -814,10 +801,8 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
         }
         db = rollingdb.getPreviousDB(db);
       }
-      return entities;
-    } finally {
-      IOUtils.cleanup(LOG, iterator);
     }
+    return entities;
   }
 
   /**
@@ -1459,15 +1444,14 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
     long startTimesCount = 0;
 
     WriteBatch writeBatch = null;
-    DBIterator iterator = null;
 
-    try {
-      writeBatch = starttimedb.createWriteBatch();
-      ReadOptions readOptions = new ReadOptions();
-      readOptions.fillCache(false);
-      iterator = starttimedb.iterator(readOptions);
+    ReadOptions readOptions = new ReadOptions();
+    readOptions.fillCache(false);
+    try (DBIterator iterator = starttimedb.iterator(readOptions)) {
+
       // seek to the first start time entry
       iterator.seekToFirst();
+      writeBatch = starttimedb.createWriteBatch();
 
       // evaluate each start time entry to see if it needs to be evicted or not
       while (iterator.hasNext()) {
@@ -1513,7 +1497,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
           + " start time entities earlier than " + minStartTime);
     } finally {
       IOUtils.cleanup(LOG, writeBatch);
-      IOUtils.cleanup(LOG, iterator);
     }
     return startTimesCount;
   }
@@ -1598,11 +1581,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
   // TODO: make data retention work with the domain data as well
   @Override
   public void put(TimelineDomain domain) throws IOException {
-    WriteBatch domainWriteBatch = null;
-    WriteBatch ownerWriteBatch = null;
-    try {
-      domainWriteBatch = domaindb.createWriteBatch();
-      ownerWriteBatch = ownerdb.createWriteBatch();
+    try (WriteBatch domainWriteBatch = domaindb.createWriteBatch();
+         WriteBatch ownerWriteBatch = ownerdb.createWriteBatch();) {
+
       if (domain.getId() == null || domain.getId().length() == 0) {
         throw new IllegalArgumentException("Domain doesn't have an ID");
       }
@@ -1682,9 +1663,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
       ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
       domaindb.write(domainWriteBatch);
       ownerdb.write(ownerWriteBatch);
-    } finally {
-      IOUtils.cleanup(LOG, domainWriteBatch);
-      IOUtils.cleanup(LOG, ownerWriteBatch);
     }
   }
 
@@ -1709,26 +1687,21 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
 
   @Override
   public TimelineDomain getDomain(String domainId) throws IOException {
-    DBIterator iterator = null;
-    try {
+    try (DBIterator iterator = domaindb.iterator()) {
       byte[] prefix = KeyBuilder.newInstance().add(domainId)
           .getBytesForLookup();
-      iterator = domaindb.iterator();
       iterator.seek(prefix);
       return getTimelineDomain(iterator, domainId, prefix);
-    } finally {
-      IOUtils.cleanup(LOG, iterator);
     }
   }
 
   @Override
   public TimelineDomains getDomains(String owner) throws IOException {
-    DBIterator iterator = null;
-    try {
+    try (DBIterator iterator = ownerdb.iterator()) {
       byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
+      iterator.seek(prefix);
       List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
-      for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator
-          .hasNext();) {
+      while (iterator.hasNext()) {
         byte[] key = iterator.peekNext().getKey();
         if (!prefixMatches(prefix, prefix.length, key)) {
           break;
@@ -1761,8 +1734,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
       TimelineDomains domainsToReturn = new TimelineDomains();
       domainsToReturn.addDomains(domains);
       return domainsToReturn;
-    } finally {
-      IOUtils.cleanup(LOG, iterator);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to