Repository: falcon
Updated Branches:
  refs/heads/master 8b6f7c526 -> 27a69b185


FALCON-1472 Improvements in SLA service. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/27a69b18
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/27a69b18
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/27a69b18

Branch: refs/heads/master
Commit: 27a69b1850c948a94ad361cbdee21865381852b2
Parents: 8b6f7c5
Author: Ajay Yadava <[email protected]>
Authored: Fri Sep 25 22:36:23 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Fri Sep 25 22:36:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/util/DeploymentUtil.java  |  9 +++++
 .../service/FeedSLAMonitoringService.java       | 39 +++++++++++++-------
 src/conf/log4j.xml                              | 14 +++++++
 4 files changed, 50 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 96eae36..a4edebc 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1472 Improvements in SLA service(Ajay Yadava)
+
     FALCON-438 Auto generate documentation for REST API(Narayan Periwal via Ajay Yadava)
 
     FALCON-1483 Add Utils to common to support native scheduler(Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
index 5d65073..561520c 100644
--- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
@@ -19,9 +19,12 @@
 package org.apache.falcon.util;
 
 import org.apache.falcon.entity.ColoClusterRelation;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -69,6 +72,12 @@ public final class DeploymentUtil {
     }
 
     public static Set<String> getCurrentClusters() {
+        // return all clusters in embedded mode
+        if (EMBEDDED_MODE) {
+            Collection<String> allClusters = ConfigurationStore.get().getEntities(EntityType.CLUSTER);
+            Set<String> result = new HashSet<>(allClusters);
+            return result;
+        }
         String colo = getCurrentColo();
         return ColoClusterRelation.get().getClusters(colo);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 37aa9e6..8bf43b8 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -54,7 +54,7 @@ import java.util.concurrent.TimeUnit;
  * Service to monitor Feed SLAs.
  */
 public class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
-    private static final Logger LOG = LoggerFactory.getLogger(FeedSLAMonitoringService.class);
+    private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
 
     private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
 
@@ -187,7 +187,16 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
                     + "feed.sla.serialization.frequency.millis Should be an integer", freq);
             throw new FalconException("Invalid integer value for property ", e);
         }
-        deserialize(filePath);
+        try {
+            if (fileSystem.exists(filePath)) {
+                deserialize(filePath);
+            } else {
+                LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
+                initializeService();
+            }
+        } catch (IOException e) {
+            throw new FalconException("Couldn't check the existence of " + filePath, e);
+        }
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
         executor.scheduleWithFixedDelay(new Monitor(), 0, STATUS_CHECK_FREQUENCY_SECS, TimeUnit.SECONDS);
     }
@@ -316,7 +325,6 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
             Map<String, Object> state = new HashMap<>();
             state.put("lastSerializedAt", lastSerializedAt.getTime());
             state.put("lastCheckedAt", lastCheckedAt.getTime());
-            state.put("monitoredFeeds", monitoredFeeds);
             state.put("pendingInstances", pendingInstances);
             oos.writeObject(state);
             fileSystem.rename(tmp, filePath);
@@ -328,23 +336,26 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
     }
 
     @SuppressWarnings("unchecked")
-    private void deserialize(Path path) {
+    private void deserialize(Path path) throws FalconException {
         try {
             Map<String, Object> state = deserializeInternal(path);
             pendingInstances = (Map<Pair<String, String>, Set<Date>>) state.get("pendingInstances");
-            lastCheckedAt = (Date) state.get("lastCheckedAt");
-            lastSerializedAt = (Date) state.get("lastSerializedAt");
-            monitoredFeeds = (Set<String>) state.get("monitoredFeeds");
-
-        } catch (Throwable throwable) {
-            LOG.error("Couldn't restore the state of feed sla monitoring service. Resetting it", throwable);
-            pendingInstances = new HashMap<>();
-            lastCheckedAt = new Date();
-            lastSerializedAt = new Date();
-            monitoredFeeds = new HashSet<>();
+            lastCheckedAt = new Date((Long) state.get("lastCheckedAt"));
+            lastSerializedAt = new Date((Long) state.get("lastSerializedAt"));
+            monitoredFeeds = new HashSet<>(); // will be populated on the onLoad of entities.
+            LOG.debug("Restored the service from old state.");
+        } catch (IOException | ClassNotFoundException e) {
+            throw new FalconException("Couldn't deserialize the old state", e);
         }
     }
 
+    private void initializeService() {
+        pendingInstances = new HashMap<>();
+        lastCheckedAt = new Date();
+        lastSerializedAt = new Date();
+        monitoredFeeds = new HashSet<>();
+    }
+
     @SuppressWarnings("unchecked")
     private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException {
         Map<String, Object> state;

http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 1341c6e..892f4e2 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -51,6 +51,15 @@
         </layout>
     </appender>
 
+    <appender name="FeedSLA" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.feed.sla.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
         <param name="Append" value="true"/>
@@ -84,6 +93,11 @@
         <appender-ref ref="METRIC"/>
     </logger>
 
+    <logger name="FeedSLA">
+        <level value="debug"/>
+        <appender-ref ref="FeedSLA"/>
+    </logger>
+
     <logger name="org.apache.hadoop.security" additivity="false">
         <level value="info"/>
         <appender-ref ref="SECURITY"/>

Reply via email to