Repository: falcon
Updated Branches:
  refs/heads/0.8 75f7a93a1 -> 9d201294f


FALCON-1563 Old feed instances get deleted from SLA monitoring on feed update. 
Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9d201294
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9d201294
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9d201294

Branch: refs/heads/0.8
Commit: 9d201294fcd6d7bec8fc6cb285abd3f082c84989
Parents: 75f7a93
Author: Ajay Yadava <[email protected]>
Authored: Tue Nov 3 20:41:56 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Tue Nov 3 20:41:56 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/FeedHelper.java    |  6 ++-
 .../service/FeedSLAMonitoringService.java       | 42 ++++++++++++++++++--
 3 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9d201294/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 619270c..9eec6ea 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -81,6 +81,8 @@ Release version: 0.8
     FALCON-1403 Revisit IT cleanup and teardown(Narayan Periwal via Pallavi Rao)
 
   BUG FIXES
+    FALCON-1563 Old feed instances get deleted from SLA monitoring on feed update (Ajay Yadava)
+
     FALCON-1579 post-processing action fails with javax.servlet.jsp.el.ELException (Ajay Yadava via Pallavi Rao)
 
     FALCON-1560 Lifecycle does not allow feed with frequency greater than days(1) (Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/9d201294/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index f1a22e9..a00b3ad 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -270,11 +270,15 @@ public final class FeedHelper {
         if (clusterSla != null) {
             return clusterSla;
         }
-
         final Sla feedSla = feed.getSla();
         return feedSla == null ? null : feedSla;
     }
 
+    public static Sla getSLA(String clusterName, Feed feed) {
+        Cluster cluster = FeedHelper.getCluster(feed, clusterName);
+        return cluster != null ? getSLA(cluster, feed) : null;
+    }
+
     protected static CatalogTable getTable(Cluster cluster, Feed feed) {
         // check if table is overridden in cluster
         if (cluster.getTable() != null) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9d201294/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 0eae0c6..d4383ec 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -47,9 +47,11 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -166,10 +168,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     public void onRemove(Entity entity) throws FalconException {
         if (entity.getEntityType() == EntityType.FEED) {
             Feed feed = (Feed) entity;
-            if (feed.getSla() != null) {
+            // currently sla service is enabled only for fileSystemStorage
+            if (feed.getLocations() != null) {
                 Set<String> currentClusters = 
DeploymentUtil.getCurrentClusters();
                 for (Cluster cluster : feed.getClusters().getClusters()) {
-                    if (currentClusters.contains(cluster.getName())) {
+                    if (currentClusters.contains(cluster.getName()) && 
FeedHelper.getSLA(cluster, feed) != null) {
                         monitoredFeeds.remove(feed.getName());
                         pendingInstances.remove(new Pair<>(feed.getName(), 
cluster.getName()));
                     }
@@ -178,10 +181,41 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         }
     }
 
+    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
+        if (feed.getLocations() != null) {
+            Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+            for (Cluster cluster : feed.getClusters().getClusters()) {
+                if (currentClusters.contains(cluster.getName()) && 
FeedHelper.getSLA(cluster, feed) != null) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     @Override
     public void onChange(Entity oldEntity, Entity newEntity) throws 
FalconException {
-        onRemove(oldEntity);
-        onAdd(newEntity);
+        if (newEntity.getEntityType() == EntityType.FEED) {
+            Feed oldFeed = (Feed) oldEntity;
+            Feed newFeed = (Feed) newEntity;
+            if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
+                onRemove(oldFeed);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
+                onAdd(newFeed);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : 
EntityUtil.getClustersDefinedInColos(oldFeed)) {
+                    if (FeedHelper.getSLA(oldCluster, oldFeed) != null
+                        && FeedHelper.getSLA(oldCluster, newFeed) == null) {
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+
+                for (String clusterName : slaRemovedClusters) {
+                    pendingInstances.remove(new Pair<>(newFeed.getName(), 
clusterName));
+                }
+            }
+        }
     }
 
     @Override

Reply via email to