Repository: falcon Updated Branches: refs/heads/master fccfc1c63 -> 5c9fe56b2
FALCON-1368 Improve Falcon server restart time. Contributed by Sandeep Samudrala. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5c9fe56b Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5c9fe56b Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5c9fe56b Branch: refs/heads/master Commit: 5c9fe56b2256b76368add5d8700bcd2b203bad40 Parents: fccfc1c Author: Ajay Yadava <[email protected]> Authored: Fri Aug 7 19:42:35 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Fri Aug 7 20:03:58 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/entity/store/ConfigurationStore.java | 64 +++++++++++++++----- 2 files changed, 50 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/5c9fe56b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index be06b6f..2c16bd8 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,8 @@ Trunk (Unreleased) FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava) IMPROVEMENTS + FALCON-1368 Improve Falcon server restart time(Sandeep Samudrala via Ajay Yadava) + FALCON-1361 Default end date should be now(Pragya Mittal via Ajay Yadava) FALCON-1362 Colo option shouldn't be mandatory(Sandeep Samudrala via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/5c9fe56b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index 7b53ebb..e27187b 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -49,6 +49,9 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Persistent store for falcon entities. @@ -145,25 +148,54 @@ public final class ConfigurationStore implements FalconService { } if (shouldPersist) { - try { - for (EntityType type : ENTITY_LOAD_ORDER) { - ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); - FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); - if (files != null) { - for (FileStatus file : files) { - String fileName = file.getPath().getName(); - String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop - // ".xml" - String entityName = URLDecoder.decode(encodedEntityName, UTF_8); - Entity entity = restore(type, entityName); - entityMap.put(entityName, entity); - onReload(entity); + for (final EntityType type : ENTITY_LOAD_ORDER) { + loadEntity(type); + } + } + } + + private void loadEntity(final EntityType type) throws FalconException { + try { + final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); + FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); + if (files != null) { + final ExecutorService service = Executors.newFixedThreadPool(100); + for (final FileStatus file : files) { + service.execute(new Runnable() { + @Override + public void run() { + try { + String fileName = file.getPath().getName(); + String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop + // ".xml" + String entityName = URLDecoder.decode(encodedEntityName, UTF_8); + Entity entity = restore(type, entityName); + entityMap.put(entityName, entity); + } catch (IOException | FalconException e) { + LOG.error("Unable to restore entity of", file); + } } - } + }); + } + service.shutdown(); + if (service.awaitTermination(10, TimeUnit.MINUTES)) { + LOG.info("Restored Configurations for entity type: {} ", type.name()); + } else { + LOG.warn("Time out happened while waiting for all threads to finish while restoring entities " + + "for type: {}", type.name()); + } + // Checking if all entities were loaded + if (entityMap.size() != files.length) { + throw new FalconException("Unable to restore configurations for entity type " + type.name()); + } + for (Entity entity : entityMap.values()){ + onReload(entity); } - } catch (IOException e) { - throw new FalconException("Unable to restore configurations", e); } + } catch (IOException e) { + throw new FalconException("Unable to restore configurations", e); + } catch (InterruptedException e) { + throw new FalconException("Failed to restore configurations in 10 minutes for entity type " + type.name()); } }
