Repository: falcon Updated Branches: refs/heads/master 3a8659611 -> 947ed13b7
FALCON-1205 SLAService to keep track of missing SLAs for feeds. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/947ed13b Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/947ed13b Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/947ed13b Branch: refs/heads/master Commit: 947ed13b71aa80236396cbe86549feeb1c509a1c Parents: 3a86596 Author: Ajay Yadava <[email protected]> Authored: Tue Sep 15 21:47:10 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Sep 15 21:47:10 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../src/main/java/org/apache/falcon/Pair.java | 6 +- .../apache/falcon/entity/CatalogStorage.java | 6 + .../org/apache/falcon/entity/FeedHelper.java | 8 + .../apache/falcon/entity/FileSystemStorage.java | 9 + .../java/org/apache/falcon/entity/Storage.java | 9 + common/src/main/resources/startup.properties | 3 + .../apache/falcon/entity/AbstractTestBase.java | 5 +- .../entity/store/FeedLocationStoreTest.java | 7 +- .../service/FeedSLAMonitoringService.java | 362 +++++++++++++++++++ src/conf/startup.properties | 8 + 11 files changed, 418 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd24a86..3aa300c 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1205 SLAService to keep track of missing SLAs for feeds(Ajay Yadava) + FALCON-1449 Move getEntityProperties method to EntityUtil.(Ajay Yadava) FALCON-1357 Update CHANGES.txt to change 0.7 branch to release.(Ajay Yadava) 
http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/client/src/main/java/org/apache/falcon/Pair.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/Pair.java b/client/src/main/java/org/apache/falcon/Pair.java index e1f8bc3..d4cea90 100644 --- a/client/src/main/java/org/apache/falcon/Pair.java +++ b/client/src/main/java/org/apache/falcon/Pair.java @@ -18,12 +18,16 @@ package org.apache.falcon; +import java.io.Serializable; + /** * Simple pair class to hold a pair of object of specific class. * @param <A> - First element in pair. * @param <B> - Second element in pair */ -public class Pair<A, B> { +public class Pair<A, B> implements Serializable { + + private static final long serialVersionUID = 1L; //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck public final A first; http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java index 7930fba..143d9b4 100644 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java @@ -387,6 +387,12 @@ public class CatalogStorage extends Configured implements Storage { } @Override + public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName, + LocationType locationType, Date instancetime) throws FalconException { + throw new UnsupportedOperationException("getInstanceAvailabilityStatus"); //TODO to be implemented later + } + + @Override public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException { LOG.info("Applying retention on {}, Limit: {}, timezone: {}", getTable(), retentionLimit, timeZone); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 894c370..572923b 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -740,4 +740,12 @@ public final class FeedHelper { } return result; } + + public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName, + Date instanceTime) + throws FalconException { + Storage storage = createStorage(clusterName, feed); + return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java index 9ff33e7..200f71f 100644 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java @@ -460,6 +460,15 @@ public class FileSystemStorage extends Configured implements Storage { } } + @Override + public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName, + LocationType locationType, + Date instanceTime) throws FalconException { + + List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime); + return result.get(0).getStatus(); + } + public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) { FileStatus fileStatus = null; try { 
http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/Storage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java index 777cde8..3dc8f67 100644 --- a/common/src/main/java/org/apache/falcon/entity/Storage.java +++ b/common/src/main/java/org/apache/falcon/entity/Storage.java @@ -94,6 +94,15 @@ public interface Storage extends Configurable { List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType, Date start, Date end) throws FalconException; + + /** + * Checks the availability status for a given feed instance. + */ + FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName, + LocationType locationType, + Date instanceTime) throws FalconException; + + /** * Delete the instances of the feeds which are older than the retentionLimit specified. 
* http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 0593b96..39a412d 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -46,6 +46,7 @@ org.apache.falcon.entity.ColoClusterRelation,\ org.apache.falcon.group.FeedGroupMap,\ org.apache.falcon.entity.store.FeedLocationStore,\ + org.apache.falcon.service.FeedSLAMonitoringService,\ org.apache.falcon.service.SharedLibraryHostingService ##### JMS MQ Broker Implementation class ##### @@ -69,6 +70,8 @@ #Configurations used in UTs debug.config.store.uri=file://${user.dir}/target/store +#Location to store state of Feed SLA monitoring service +debug.pending.feed.instances.store.uri = file://${user.dir}/data/sla/pendingfeedinstances debug.config.oozie.conf.uri=${user.dir}/target/oozie debug.system.lib.location=${system.lib.location} debug.broker.url=vm://localhost http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java index a36623c..6179855 100644 --- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java +++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java @@ -72,8 +72,9 @@ public class AbstractTestBase { cleanupStore(); String listeners = StartupProperties.get().getProperty("configstore.listeners"); - StartupProperties.get().setProperty("configstore.listeners", - listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "")); + listeners = 
listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""); + listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", ""); + StartupProperties.get().setProperty("configstore.listeners", listeners); store = ConfigurationStore.get(); store.init(); http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java index b94c248..033a55b 100644 --- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java +++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java @@ -60,10 +60,9 @@ public class FeedLocationStoreTest extends AbstractTestBase { cleanupStore(); String listeners = StartupProperties.get().getProperty("configstore.listeners"); - StartupProperties.get().setProperty("configstore.listeners", - listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "")); -// StartupProperties.get().setProperty("configstore.listeners", -// "org.apache.falcon.entity.store.FeedLocationStore"); + listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""); + listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", ""); + StartupProperties.get().setProperty("configstore.listeners", listeners); store = ConfigurationStore.get(); store.init(); http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.service;

import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FeedInstanceStatus;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Service to monitor Feed SLAs.
 *
 * <p>The service listens to configuration store changes to learn which feeds have an SLA, and runs a
 * periodic {@link Monitor} task that (1) re-checks the availability of every pending feed instance,
 * (2) registers the instances that fall due within the look-ahead window and (3) backs up its state
 * to the configured store path.
 *
 * <p>NOTE(review): the state fields are static and are touched both by the listener callbacks and by
 * the monitor thread without synchronization - confirm the calling threads before relying on this
 * class being thread-safe.
 */
public class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
    private static final Logger LOG = LoggerFactory.getLogger(FeedSLAMonitoringService.class);

    /** Default serialization frequency (one hour, in milliseconds) used when the property is absent. */
    private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);

    private static final int ONE_MS = 1;

    /** Startup property holding the location of the service's working directory. */
    private static final String STORE_URI_PROPERTY = "feed.sla.service.store.uri";

    /** Startup property holding the state backup frequency, in milliseconds. */
    private static final String SERIALIZATION_FREQUENCY_PROPERTY = "feed.sla.serialization.frequency.millis";

    /** Keys of the entries written to / read from the serialized state map. */
    private static final String KEY_LAST_SERIALIZED_AT = "lastSerializedAt";
    private static final String KEY_LAST_CHECKED_AT = "lastCheckedAt";
    private static final String KEY_MONITORED_FEEDS = "monitoredFeeds";
    private static final String KEY_PENDING_INSTANCES = "pendingInstances";

    /** Maximum time, in seconds, that destroy() waits for a running monitor cycle to finish. */
    private static final int SHUTDOWN_WAIT_SECS = 30;

    /**
     * Permissions for storePath.
     */
    private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

    /**
     * Feeds to be monitored.
     */
    private static Set<String> monitoredFeeds;

    /**
     * Maps a (feedName, clusterName) pair to the set of instance times of that feed
     * which have not been found available yet.
     */
    private static Map<Pair<String, String>, Set<Date>> pendingInstances;

    /**
     * Used to store the last time when pending instances were checked for SLA.
     */
    private static Date lastCheckedAt;

    /**
     * Used to store last time when the state was serialized to the store.
     */
    private static Date lastSerializedAt;

    /**
     * Frequency in seconds of "status check" for pending feed instances.
     */
    private static final int STATUS_CHECK_FREQUENCY_SECS = 10 * 60; // 10 minutes

    /**
     * Time Duration (in milliseconds) in future for generating pending feed instances.
     *
     * In every cycle pending feed instances are added for monitoring, till this time in future.
     */
    private static final int LOOKAHEAD_WINDOW_MILLIS = 15 * 60 * 1000; // 15 MINUTES

    /**
     * Frequency in milliseconds of serializing(for backup) monitoring service's state.
     */
    private int serializationFrequencyMillis;

    /**
     * Filesystem used for serializing and deserializing.
     */
    private FileSystem fileSystem;

    /**
     * Working directory for the feed sla monitoring service.
     */
    private Path storePath;

    /**
     * Path to store the state of the monitoring service.
     */
    private Path filePath;

    /**
     * Scheduler running the periodic {@link Monitor}; kept so that destroy() can stop it.
     */
    private ScheduledThreadPoolExecutor executor;

    /**
     * Starts monitoring a feed if it has both locations and an SLA and is deployed
     * on at least one of the current clusters.
     */
    @Override
    public void onAdd(Entity entity) throws FalconException {
        if (entity.getEntityType() == EntityType.FEED) {
            Feed feed = (Feed) entity;
            // currently sla service is enabled only for fileSystemStorage
            if (feed.getLocations() != null && feed.getSla() != null) {
                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
                for (Cluster cluster : feed.getClusters().getClusters()) {
                    if (currentClusters.contains(cluster.getName())) {
                        LOG.debug("Adding feed:{} for monitoring", feed.getName());
                        monitoredFeeds.add(feed.getName());
                        break; // one matching cluster is enough; mirrors onRemove
                    }
                }
            }
        }
    }

    /**
     * Stops monitoring a feed that has an SLA and is deployed on at least one of the current clusters.
     */
    @Override
    public void onRemove(Entity entity) throws FalconException {
        if (entity.getEntityType() == EntityType.FEED) {
            Feed feed = (Feed) entity;
            if (feed.getSla() != null) {
                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
                for (Cluster cluster : feed.getClusters().getClusters()) {
                    if (currentClusters.contains(cluster.getName())) {
                        monitoredFeeds.remove(feed.getName());
                        break;
                    }
                }
            }
        }
    }

    @Override
    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
        onRemove(oldEntity);
        onAdd(newEntity);
    }

    @Override
    public void onReload(Entity entity) throws FalconException {
        onAdd(entity);
    }

    @Override
    public String getName() {
        return FeedSLAMonitoringService.class.getSimpleName();
    }

    /**
     * Reads the configuration, prepares the store directory, restores any previously saved state and
     * schedules the periodic monitor.
     *
     * @throws FalconException if the store uri is missing or the serialization frequency is not an integer
     */
    @Override
    public void init() throws FalconException {
        String uri = StartupProperties.get().getProperty(STORE_URI_PROPERTY);
        if (uri == null || uri.trim().isEmpty()) {
            // fail with a readable message instead of the opaque error thrown by new Path(null)
            throw new FalconException("Property " + STORE_URI_PROPERTY + " is not set in startup.properties");
        }
        storePath = new Path(uri.trim());
        filePath = new Path(storePath, "feedSLAMonitoringService");
        fileSystem = initializeFileSystem();

        String freq = StartupProperties.get().getProperty(SERIALIZATION_FREQUENCY_PROPERTY, ONE_HOUR);
        try {
            serializationFrequencyMillis = Integer.parseInt(freq.trim());
        } catch (NumberFormatException e) {
            LOG.error("Invalid value : {} found in startup.properties for the property "
                    + "feed.sla.serialization.frequency.millis Should be an integer", freq);
            throw new FalconException("Invalid integer value for property " + SERIALIZATION_FREQUENCY_PROPERTY, e);
        }
        deserialize(filePath);
        executor = new ScheduledThreadPoolExecutor(1);
        executor.scheduleWithFixedDelay(new Monitor(), 0, STATUS_CHECK_FREQUENCY_SECS, TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic monitor and stores the state of the monitoring service to the store.
     */
    @Override
    public void destroy() throws FalconException {
        if (executor != null) {
            // stop scheduling further cycles and give a running one a chance to finish,
            // so that the state is not serialized while it is still being modified
            executor.shutdown();
            try {
                if (!executor.awaitTermination(SHUTDOWN_WAIT_SECS, TimeUnit.SECONDS)) {
                    LOG.warn("Feed SLA monitor did not stop within {} seconds", SHUTDOWN_WAIT_SECS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        serializeState(); // store the state of monitoring service to the disk.
    }

    /**
     * Creates the file system for storePath and makes sure the directory exists.
     */
    private FileSystem initializeFileSystem() {
        try {
            fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
            if (!fileSystem.exists(storePath)) {
                LOG.info("Creating directory for pending feed instances: {}", storePath);
                // set permissions so config store dir is owned by falcon alone
                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
            }
            return fileSystem;
        } catch (Exception e) {
            throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
        }
    }

    //Periodically update status of pending instances, add new instances and take backup.
    private class Monitor implements Runnable {

        @Override
        public void run() {
            try {
                if (!monitoredFeeds.isEmpty()) {
                    checkPendingInstanceAvailability();

                    // add instances from the last checkpoint up to LOOKAHEAD_WINDOW_MILLIS (15 minutes)
                    // from now; the window is longer than the check period to leave some buffer
                    Date now = new Date();
                    Date newCheckPoint = new Date(now.getTime() + LOOKAHEAD_WINDOW_MILLIS);
                    addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
                    lastCheckedAt = newCheckPoint;

                    //take backup
                    if (now.getTime() - lastSerializedAt.getTime() > serializationFrequencyMillis) {
                        serializeState();
                        lastSerializedAt = new Date();
                    }
                }
            } catch (Throwable e) {
                // never let an exception escape: it would cancel all future scheduled runs
                LOG.error("Feed SLA monitoring failed: ", e);
            }
        }
    }

    /**
     * Registers, for every monitored feed and every current cluster it is deployed on, the instance
     * times returned by {@link EntityUtil#getNextStartTime} while walking from {@code from} towards
     * {@code to}.
     *
     * @param from time to start generating instances from
     * @param to   generation stops once the running instance time is no longer before this time
     */
    void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
        for (String feedName : monitoredFeeds) {

            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
            for (Cluster feedCluster : feed.getClusters().getClusters()) {
                if (currentClusters.contains(feedCluster.getName())) {
                    Date nextInstanceTime = from;
                    Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
                    Set<Date> instances = pendingInstances.get(key);
                    if (instances == null) {
                        instances = new HashSet<>();
                    }

                    org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
                            EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
                    while (nextInstanceTime.before(to)) {
                        nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
                        instances.add(nextInstanceTime);
                        // step just past the found instance so the next lookup moves forward
                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
                    }
                    pendingInstances.put(key, instances);
                }
            }
        }
    }

    /**
     * Checks the availability of all the pendingInstances and removes the ones which have become available.
     */
    private void checkPendingInstanceAvailability() throws FalconException {
        for (Map.Entry<Pair<String, String>, Set<Date>> entry : pendingInstances.entrySet()) {
            Pair<String, String> feedAndCluster = entry.getKey();
            // remove through the iterator: removing from the set directly while iterating over it
            // throws ConcurrentModificationException
            Iterator<Date> instanceTimes = entry.getValue().iterator();
            while (instanceTimes.hasNext()) {
                Date instanceTime = instanceTimes.next();
                if (checkFeedInstanceAvailability(feedAndCluster.first, feedAndCluster.second, instanceTime)) {
                    instanceTimes.remove();
                }
            }
        }
    }

    // checks whether a given feed instance is available or not;
    // a failure while fetching the status is logged and reported as "not available"
    private boolean checkFeedInstanceAvailability(String feedName, String clusterName, Date nominalTime) throws
        FalconException {
        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);

        try {
            LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", feedName,
                    clusterName, nominalTime);
            FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus(feed, clusterName,
                    nominalTime);
            if (status == FeedInstanceStatus.AvailabilityStatus.AVAILABLE) {
                LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", feedName,
                        clusterName, nominalTime);
                return true;
            }
        } catch (Throwable e) {
            LOG.error("Couldn't find status for feed:{}, cluster:{}", feedName, clusterName, e);
        }
        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", feedName,
                clusterName, nominalTime);
        return false;
    }

    /**
     * Writes the service state to a temporary file under storePath and then moves it to filePath.
     *
     * @throws FalconException if the state can not be written or moved into place
     */
    private void serializeState() throws FalconException {
        LOG.info("Saving context to: [{}]", storePath);

        Map<String, Object> state = new HashMap<>();
        state.put(KEY_LAST_SERIALIZED_AT, lastSerializedAt.getTime());
        state.put(KEY_LAST_CHECKED_AT, lastCheckedAt.getTime());
        state.put(KEY_MONITORED_FEEDS, monitoredFeeds);
        state.put(KEY_PENDING_INSTANCES, pendingInstances);

        //create a temporary file and rename it.
        Path tmp = new Path(storePath, "tmp");
        try {
            // the streams must be closed (and thereby flushed) before the file is renamed
            try (OutputStream out = fileSystem.create(tmp);
                 ObjectOutputStream oos = new ObjectOutputStream(out)) {
                oos.writeObject(state);
            }
            // rename() does not replace an existing destination on every file system,
            // so drop the previous backup first and verify both results
            if (fileSystem.exists(filePath) && !fileSystem.delete(filePath, false)) {
                throw new IOException("Unable to delete previous state file: " + filePath);
            }
            if (!fileSystem.rename(tmp, filePath)) {
                throw new IOException("Unable to rename " + tmp + " to " + filePath);
            }
        } catch (IOException e) {
            throw new FalconException("Error serializing context to : " + storePath.toUri(), e);
        }
    }

    /**
     * Restores the state written by serializeState(); starts from an empty state when there is
     * no saved state or when it can not be read.
     */
    @SuppressWarnings("unchecked")
    private void deserialize(Path path) {
        try {
            if (!fileSystem.exists(path)) {
                LOG.info("No saved state found at {}. Starting with an empty state", path);
                resetState();
                return;
            }
            Map<String, Object> state = deserializeInternal(path);
            Map<Pair<String, String>, Set<Date>> restoredInstances =
                    (Map<Pair<String, String>, Set<Date>>) state.get(KEY_PENDING_INSTANCES);
            Set<String> restoredFeeds = (Set<String>) state.get(KEY_MONITORED_FEEDS);
            // the timestamps are written as epoch milliseconds (Long), not as Date
            Date restoredCheckedAt = toDate(state.get(KEY_LAST_CHECKED_AT));
            Date restoredSerializedAt = toDate(state.get(KEY_LAST_SERIALIZED_AT));
            if (restoredInstances == null || restoredFeeds == null
                    || restoredCheckedAt == null || restoredSerializedAt == null) {
                throw new IOException("Incomplete state found in " + path);
            }
            pendingInstances = restoredInstances;
            monitoredFeeds = restoredFeeds;
            lastCheckedAt = restoredCheckedAt;
            lastSerializedAt = restoredSerializedAt;
        } catch (Throwable throwable) {
            LOG.error("Couldn't restore the state of feed sla monitoring service. Resetting it", throwable);
            resetState();
        }
    }

    // converts a deserialized timestamp (epoch millis or Date) to a Date, null if it is neither
    private static Date toDate(Object value) {
        if (value instanceof Number) {
            return new Date(((Number) value).longValue());
        }
        if (value instanceof Date) {
            return (Date) value;
        }
        return null;
    }

    // starts from scratch: nothing monitored, nothing pending, checkpoints set to now
    private static void resetState() {
        pendingInstances = new HashMap<>();
        lastCheckedAt = new Date();
        lastSerializedAt = new Date();
        monitoredFeeds = new HashSet<>();
    }

    /**
     * Reads the serialized state map from the given path.
     */
    @SuppressWarnings("unchecked")
    private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException {
        InputStream in = fileSystem.open(path);
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(in);
            return (Map<String, Object>) ois.readObject();
        } finally {
            IOUtils.closeQuietly(ois);
            // also covers the case where the ObjectInputStream could not be created
            IOUtils.closeQuietly(in);
        }
    }

}
file://${falcon.home}/data/sla/pendingfeedinstances + # Location of libraries that is shipped to Hadoop *.system.lib.location=${falcon.home}/server/webapp/${falcon.app.type}/WEB-INF/lib @@ -84,6 +89,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.cleanup.service.frequency=days(1) +# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour +*.feed.sla.serialization.frequency.millis=3600000 + ######### Properties for configuring JMS provider - activemq ######### # Default Active MQ url
