Repository: falcon Updated Branches: refs/heads/master 8b6f7c526 -> 27a69b185
FALCON-1472 Improvements in SLA service. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/27a69b18 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/27a69b18 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/27a69b18 Branch: refs/heads/master Commit: 27a69b1850c948a94ad361cbdee21865381852b2 Parents: 8b6f7c5 Author: Ajay Yadava <[email protected]> Authored: Fri Sep 25 22:36:23 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Fri Sep 25 22:36:23 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/util/DeploymentUtil.java | 9 +++++ .../service/FeedSLAMonitoringService.java | 39 +++++++++++++------- src/conf/log4j.xml | 14 +++++++ 4 files changed, 50 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 96eae36..a4edebc 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1472 Improvements in SLA service(Ajay Yadava) + FALCON-438 Auto generate documentation for REST API(Narayan Periwal via Ajay Yadava) FALCON-1483 Add Utils to common to support native scheduler(Pallavi Rao via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java index 5d65073..561520c 100644 --- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java @@ -19,9 +19,12 @@ package org.apache.falcon.util; import org.apache.falcon.entity.ColoClusterRelation; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -69,6 +72,12 @@ public final class DeploymentUtil { } public static Set<String> getCurrentClusters() { + // return all clusters in embedded mode + if (EMBEDDED_MODE) { + Collection<String> allClusters = ConfigurationStore.get().getEntities(EntityType.CLUSTER); + Set<String> result = new HashSet<>(allClusters); + return result; + } String colo = getCurrentColo(); return ColoClusterRelation.get().getClusters(colo); } http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 37aa9e6..8bf43b8 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -54,7 +54,7 @@ import java.util.concurrent.TimeUnit; * Service to monitor Feed SLAs. */ public class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { - private static final Logger LOG = LoggerFactory.getLogger(FeedSLAMonitoringService.class); + private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000); @@ -187,7 +187,16 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa + "feed.sla.serialization.frequency.millis Should be an integer", freq); throw new FalconException("Invalid integer value for property ", e); } - deserialize(filePath); + try { + if (fileSystem.exists(filePath)) { + deserialize(filePath); + } else { + LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); + initializeService(); + } + } catch (IOException e) { + throw new FalconException("Couldn't check the existence of " + filePath, e); + } ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.scheduleWithFixedDelay(new Monitor(), 0, STATUS_CHECK_FREQUENCY_SECS, TimeUnit.SECONDS); } @@ -316,7 +325,6 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa Map<String, Object> state = new HashMap<>(); state.put("lastSerializedAt", lastSerializedAt.getTime()); state.put("lastCheckedAt", lastCheckedAt.getTime()); - state.put("monitoredFeeds", monitoredFeeds); state.put("pendingInstances", pendingInstances); oos.writeObject(state); fileSystem.rename(tmp, filePath); @@ -328,23 +336,26 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa } @SuppressWarnings("unchecked") - private void deserialize(Path path) { + private void deserialize(Path path) throws FalconException { try { Map<String, Object> state = deserializeInternal(path); pendingInstances = (Map<Pair<String, String>, Set<Date>>) state.get("pendingInstances"); - lastCheckedAt = (Date) state.get("lastCheckedAt"); - lastSerializedAt = (Date) state.get("lastSerializedAt"); - monitoredFeeds = (Set<String>) state.get("monitoredFeeds"); - - } catch (Throwable throwable) { - LOG.error("Couldn't restore the state of feed sla monitoring service. Resetting it", throwable); - pendingInstances = new HashMap<>(); - lastCheckedAt = new Date(); - lastSerializedAt = new Date(); - monitoredFeeds = new HashSet<>(); + lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); + lastSerializedAt = new Date((Long) state.get("lastSerializedAt")); + monitoredFeeds = new HashSet<>(); // will be populated on the onLoad of entities. + LOG.debug("Restored the service from old state."); + } catch (IOException | ClassNotFoundException e) { + throw new FalconException("Couldn't deserialize the old state", e); } } + private void initializeService() { + pendingInstances = new HashMap<>(); + lastCheckedAt = new Date(); + lastSerializedAt = new Date(); + monitoredFeeds = new HashSet<>(); + } + @SuppressWarnings("unchecked") private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException { Map<String, Object> state; http://git-wip-us.apache.org/repos/asf/falcon/blob/27a69b18/src/conf/log4j.xml ---------------------------------------------------------------------- diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml index 1341c6e..892f4e2 100644 --- a/src/conf/log4j.xml +++ b/src/conf/log4j.xml @@ -51,6 +51,15 @@ </layout> </appender> + <appender name="FeedSLA" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${falcon.log.dir}/${falcon.app.type}.feed.sla.log"/> + <param name="Append" value="true"/> + <param name="Threshold" value="debug"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %m%n"/> + </layout> + </appender> + <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender"> <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/> <param name="Append" value="true"/> @@ -84,6 +93,11 @@ <appender-ref ref="METRIC"/> </logger> + <logger name="FeedSLA"> + <level value="debug"/> + <appender-ref ref="FeedSLA"/> + </logger> + <logger name="org.apache.hadoop.security" additivity="false"> <level value="info"/> <appender-ref ref="SECURITY"/>
