Repository: falcon Updated Branches: refs/heads/master c75f0f446 -> 932965b9e
FALCON-1563 Old feed instances get deleted from SLA monitoring on feed update. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/932965b9 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/932965b9 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/932965b9 Branch: refs/heads/master Commit: 932965b9e22beca008365851d76f4936e10ee24b Parents: c75f0f4 Author: Ajay Yadava <[email protected]> Authored: Tue Nov 3 21:41:43 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Nov 3 21:41:43 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/FeedHelper.java | 6 ++- .../service/FeedSLAMonitoringService.java | 42 ++++++++++++++++++-- 3 files changed, 45 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/932965b9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 69b01b6..ea8d226 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -105,6 +105,8 @@ Branch : 0.8 (Proposed Release version: 0.8) FALCON-1403 Revisit IT cleanup and teardown(Narayan Periwal via Pallavi Rao) BUG FIXES + FALCON-1563 Old feed instances get deleted from SLA monitoring on feed update (Ajay Yadava). + FALCON-1560 Lifecycle does not allow feed with frequency greater than days(1) (Ajay Yadava). FALCON-1556 Falcon build fails when building with hivedr profile(Sowmya Ramesh) http://git-wip-us.apache.org/repos/asf/falcon/blob/932965b9/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 0b2e94f..8c55e41 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -274,11 +274,15 @@ public final class FeedHelper { if (clusterSla != null) { return clusterSla; } - final Sla feedSla = feed.getSla(); return feedSla == null ? null : feedSla; } + public static Sla getSLA(String clusterName, Feed feed) { + Cluster cluster = FeedHelper.getCluster(feed, clusterName); + return cluster != null ? getSLA(cluster, feed) : null; + } + protected static CatalogTable getTable(Cluster cluster, Feed feed) { // check if table is overridden in cluster if (cluster.getTable() != null) { http://git-wip-us.apache.org/repos/asf/falcon/blob/932965b9/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 0eae0c6..d4383ec 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -47,9 +47,11 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -166,10 +168,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen public void onRemove(Entity entity) throws FalconException { if (entity.getEntityType() == EntityType.FEED) { Feed feed = (Feed) entity; - if (feed.getSla() != null) { + // currently sla service is enabled only for fileSystemStorage + if (feed.getLocations() != null) { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); for (Cluster cluster : feed.getClusters().getClusters()) { - if (currentClusters.contains(cluster.getName())) { + if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { monitoredFeeds.remove(feed.getName()); pendingInstances.remove(new Pair<>(feed.getName(), cluster.getName())); } @@ -178,10 +181,41 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } } + private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) { + if (feed.getLocations() != null) { + Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + for (Cluster cluster : feed.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { + return true; + } + } + } + return false; + } + @Override public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { - onRemove(oldEntity); - onAdd(newEntity); + if (newEntity.getEntityType() == EntityType.FEED) { + Feed oldFeed = (Feed) oldEntity; + Feed newFeed = (Feed) newEntity; + if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) { + onRemove(oldFeed); + } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) { + onAdd(newFeed); + } else { + List<String> slaRemovedClusters = new ArrayList<>(); + for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) { + if (FeedHelper.getSLA(oldCluster, oldFeed) != null + && FeedHelper.getSLA(oldCluster, newFeed) == null) { + slaRemovedClusters.add(oldCluster); + } + } + + for (String clusterName : slaRemovedClusters) { + pendingInstances.remove(new Pair<>(newFeed.getName(), clusterName)); + } + } + } } @Override
