Repository: falcon
Updated Branches:
  refs/heads/0.10 bdda78ca8 -> b67dd535e
FALCON-2034 Make numThreads and timeOut configurable In ConfigurationStore init

Author: sandeep <[email protected]>

Reviewers: @pallavi-rao, @peeyushb

Closes #192 from sandeepSamudrala/FALCON-2034 and squashes the following commits:

78b98d5 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init
9d00722 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init

(cherry picked from commit d3ebf0b3d826c32e068c0c786d51dd4d39a7f08f)
Signed-off-by: Pallavi Rao <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b67dd535
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b67dd535
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b67dd535

Branch: refs/heads/0.10
Commit: b67dd535ea6c57c4008fd3165a339a6bc1581839
Parents: bdda78c
Author: sandeep <[email protected]>
Authored: Tue Jun 21 10:21:14 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Tue Jun 21 10:21:32 2016 +0530

----------------------------------------------------------------------
 .../falcon/entity/store/ConfigurationStore.java | 29 +++++++++++++++++---
 src/conf/startup.properties | 6 ++++
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/b67dd535/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 7f2b172..52feab7 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -69,6 +69,10 @@ public final class ConfigurationStore implements 
FalconService { private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class); private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); private static final String UTF_8 = CharEncoding.UTF_8; + private static final String LOAD_ENTITIES_THREADS = "config.store.num.threads.load.entities"; + private static final String TIMEOUT_MINS_LOAD_ENTITIES = "config.store.start.timeout.minutes"; + private int numThreads; + private int restoreTimeOutInMins; private final boolean shouldPersist; private static final FsPermission STORE_PERMISSION = @@ -149,6 +153,21 @@ public final class ConfigurationStore implements FalconService { @Override public void init() throws FalconException { + try { + numThreads = Integer.parseInt(StartupProperties.get().getProperty(LOAD_ENTITIES_THREADS, "100")); + LOG.info("Number of threads used to restore entities: {}", restoreTimeOutInMins); + } catch (NumberFormatException nfe) { + throw new FalconException("Invalid value specified for start up property \"" + + LOAD_ENTITIES_THREADS + "\".Please provide an integer value"); + } + try { + restoreTimeOutInMins = Integer.parseInt(StartupProperties.get(). + getProperty(TIMEOUT_MINS_LOAD_ENTITIES, "30")); + LOG.info("TimeOut to load Entities is taken as {} mins", restoreTimeOutInMins); + } catch (NumberFormatException nfe) { + throw new FalconException("Invalid value specified for start up property \"" + + TIMEOUT_MINS_LOAD_ENTITIES + "\".Please provide an integer value"); + } String listenerClassNames = StartupProperties.get(). 
getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph"); for (String listenerClassName : listenerClassNames.split(",")) { @@ -172,7 +191,8 @@ public final class ConfigurationStore implements FalconService { final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); if (files != null) { - final ExecutorService service = Executors.newFixedThreadPool(100); + + final ExecutorService service = Executors.newFixedThreadPool(numThreads); for (final FileStatus file : files) { service.execute(new Runnable() { @Override @@ -183,6 +203,7 @@ public final class ConfigurationStore implements FalconService { // ".xml" String entityName = URLDecoder.decode(encodedEntityName, UTF_8); Entity entity = restore(type, entityName); + LOG.info("Restored configuration {}/{}", type, entityName); entityMap.put(entityName, entity); } catch (IOException | FalconException e) { LOG.error("Unable to restore entity of", file); @@ -191,10 +212,10 @@ public final class ConfigurationStore implements FalconService { }); } service.shutdown(); - if (service.awaitTermination(10, TimeUnit.MINUTES)) { + if (service.awaitTermination(restoreTimeOutInMins, TimeUnit.MINUTES)) { LOG.info("Restored Configurations for entity type: {} ", type.name()); } else { - LOG.warn("Time out happened while waiting for all threads to finish while restoring entities " + LOG.warn("Timed out while waiting for all threads to finish while restoring entities " + "for type: {}", type.name()); } // Checking if all entities were loaded @@ -336,6 +357,7 @@ public final class ConfigurationStore implements FalconService { } catch (IOException e) { throw new StoreAccessException(e); } + LOG.info("Restored configuration {}/{}", type, name); entityMap.put(name, entity); return entity; } else { @@ -445,7 +467,6 @@ public final class ConfigurationStore implements FalconService { throw new 
StoreAccessException("Unable to un-marshall xml definition for " + type + "/" + name, e); } finally { in.close(); - LOG.info("Restored configuration {}/{}", type, name); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/b67dd535/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index ae50d51..5ac3d5c 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -136,6 +136,12 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.cleanup.service.frequency=days(1) +# Default number of threads to be used to restore entities. +*.config.store.num.threads.load.entities=100 + +# Default timeout in minutes to load entities +*.config.store.start.timeout.minutes=30 + ######### Properties for Feed SLA Monitoring ######### # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000
