Repository: falcon
Updated Branches:
  refs/heads/master 037e6821b -> d3ebf0b3d


FALCON-2034 Make numThreads and timeOut configurable In ConfigurationStore init

Author: sandeep <[email protected]>

Reviewers: @pallavi-rao, @peeyushb

Closes #192 from sandeepSamudrala/FALCON-2034 and squashes the following 
commits:

78b98d5 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In 
ConfigurationStore init
9d00722 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In 
ConfigurationStore init


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d3ebf0b3
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d3ebf0b3
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d3ebf0b3

Branch: refs/heads/master
Commit: d3ebf0b3d826c32e068c0c786d51dd4d39a7f08f
Parents: 037e682
Author: sandeep <[email protected]>
Authored: Tue Jun 21 10:21:14 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Tue Jun 21 10:21:14 2016 +0530

----------------------------------------------------------------------
 .../falcon/entity/store/ConfigurationStore.java | 29 +++++++++++++++++---
 src/conf/startup.properties                     |  6 ++++
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d3ebf0b3/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java 
b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index debf106..19e10bd 100644
--- 
a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ 
b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -70,6 +70,10 @@ public final class ConfigurationStore implements 
FalconService {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConfigurationStore.class);
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
     private static final String UTF_8 = CharEncoding.UTF_8;
+    private static final String LOAD_ENTITIES_THREADS = 
"config.store.num.threads.load.entities";
+    private static final String TIMEOUT_MINS_LOAD_ENTITIES = 
"config.store.start.timeout.minutes";
+    private int numThreads;
+    private int restoreTimeOutInMins;
     private final boolean shouldPersist;
 
     private static final FsPermission STORE_PERMISSION =
@@ -150,6 +154,21 @@ public final class ConfigurationStore implements 
FalconService {
 
     @Override
     public void init() throws FalconException {
+        try {
+            numThreads = 
Integer.parseInt(StartupProperties.get().getProperty(LOAD_ENTITIES_THREADS, 
"100"));
+            LOG.info("Number of threads used to restore entities: {}", 
restoreTimeOutInMins);
+        } catch (NumberFormatException nfe) {
+            throw new FalconException("Invalid value specified for start up 
property \""
+                    + LOAD_ENTITIES_THREADS + "\".Please provide an integer 
value");
+        }
+        try {
+            restoreTimeOutInMins = Integer.parseInt(StartupProperties.get().
+                    getProperty(TIMEOUT_MINS_LOAD_ENTITIES, "30"));
+            LOG.info("TimeOut to load Entities is taken as {} mins", 
restoreTimeOutInMins);
+        } catch (NumberFormatException nfe) {
+            throw new FalconException("Invalid value specified for start up 
property \""
+                    + TIMEOUT_MINS_LOAD_ENTITIES + "\".Please provide an 
integer value");
+        }
         String listenerClassNames = StartupProperties.get().
                 getProperty("configstore.listeners", 
"org.apache.falcon.entity.v0.EntityGraph");
         for (String listenerClassName : listenerClassNames.split(",")) {
@@ -173,7 +192,8 @@ public final class ConfigurationStore implements 
FalconService {
             final ConcurrentHashMap<String, Entity> entityMap = 
dictionary.get(type);
             FileStatus[] files = fs.globStatus(new Path(storePath, type.name() 
+ Path.SEPARATOR + "*"));
             if (files != null) {
-                final ExecutorService service = 
Executors.newFixedThreadPool(100);
+
+                final ExecutorService service = 
Executors.newFixedThreadPool(numThreads);
                 for (final FileStatus file : files) {
                     service.execute(new Runnable() {
                         @Override
@@ -184,6 +204,7 @@ public final class ConfigurationStore implements 
FalconService {
                                 // ".xml"
                                 String entityName = 
URLDecoder.decode(encodedEntityName, UTF_8);
                                 Entity entity = restore(type, entityName);
+                                LOG.info("Restored configuration {}/{}", type, 
entityName);
                                 entityMap.put(entityName, entity);
                             } catch (IOException | FalconException e) {
                                 LOG.error("Unable to restore entity of", file);
@@ -192,10 +213,10 @@ public final class ConfigurationStore implements 
FalconService {
                     });
                 }
                 service.shutdown();
-                if (service.awaitTermination(10, TimeUnit.MINUTES)) {
+                if (service.awaitTermination(restoreTimeOutInMins, 
TimeUnit.MINUTES)) {
                     LOG.info("Restored Configurations for entity type: {} ", 
type.name());
                 } else {
-                    LOG.warn("Time out happened while waiting for all threads 
to finish while restoring entities "
+                    LOG.warn("Timed out while waiting for all threads to 
finish while restoring entities "
                             + "for type: {}", type.name());
                 }
                 // Checking if all entities were loaded
@@ -341,6 +362,7 @@ public final class ConfigurationStore implements 
FalconService {
                 } catch (IOException e) {
                     throw new StoreAccessException(e);
                 }
+                LOG.info("Restored configuration {}/{}", type, name);
                 entityMap.put(name, entity);
                 return entity;
             } else {
@@ -450,7 +472,6 @@ public final class ConfigurationStore implements 
FalconService {
             throw new StoreAccessException("Unable to un-marshall xml 
definition for " + type + "/" + name, e);
         } finally {
             in.close();
-            LOG.info("Restored configuration {}/{}", type, name);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d3ebf0b3/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ae50d51..5ac3d5c 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -136,6 +136,12 @@ 
prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 *.falcon.cleanup.service.frequency=days(1)
 
+# Number of threads used to restore entities when the configuration store is loaded. Must be an integer; defaults to 100 if unset.
+*.config.store.num.threads.load.entities=100
+
+# Timeout, in minutes, to wait for entity loading to finish. Must be an integer; defaults to 30 if unset.
+*.config.store.start.timeout.minutes=30
+
 ######### Properties for Feed SLA Monitoring #########
 # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
 *.feed.sla.serialization.frequency.millis=3600000

Reply via email to