Repository: falcon
Updated Branches:
  refs/heads/master d1cc9d229 -> 7a74bb0c6


FALCON-1640 Cascading Delete for instances in Native Scheduler. Contributed by 
Pavan Kumar Kolamuri.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7a74bb0c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7a74bb0c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7a74bb0c

Branch: refs/heads/master
Commit: 7a74bb0c6ee25d1d0bc5944b3af0438527343956
Parents: d1cc9d2
Author: Ajay Yadava <[email protected]>
Authored: Thu Dec 24 17:40:59 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Thu Dec 24 17:40:59 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/falcon/entity/AbstractTestBase.java  |  6 +++
 .../falcon/state/store/jdbc/EntityBean.java     | 13 ++++++
 .../falcon/state/store/jdbc/InstanceBean.java   | 16 ++++++++
 .../falcon/state/store/jdbc/JDBCStateStore.java | 12 +++++-
 .../state/service/store/TestJDBCStateStore.java | 42 +++++++++++++++++++-
 6 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aa8d48f..6f1e490 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1640 Cascading Delete for instances in Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava)
+
     FALCON-1683 Inconsistent behavior when user tries to switch schedulers(Pallavi Rao via Ajay Yadava)
 
     FALCON-1669 Falcon should show more helpful message when it is unable to initialize Falcon Client object(Baishuo via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index a6d607b..fd963e5 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -139,6 +139,12 @@ public class AbstractTestBase {
         }
     }
 
+    protected void deleteEntity(EntityType type, String name) throws FalconException {
+        store.remove(type, name);
+    }
+
+
+
     private void decorateACL(String proxyUser, String defaultGroupName, Cluster cluster) {
         if (cluster.getACL() != null) {
             return;

http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
index 03ada39..37fb0cb 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
@@ -20,13 +20,16 @@ package org.apache.falcon.state.store.jdbc;
 import org.apache.openjpa.persistence.jdbc.Index;
 
 import javax.persistence.Basic;
+import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
 import javax.persistence.Table;
 import javax.validation.constraints.NotNull;
+import java.util.List;
 //SUSPEND CHECKSTYLE CHECK  LineLengthCheck
 /**
  * Entity object which will be stored in Data Base.
@@ -65,6 +68,9 @@ public class EntityBean {
     @Column(name = "current_state")
     private String state;
 
+    @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
+    private List<InstanceBean> instanceBeans;
+
     public EntityBean() {
     }
 
@@ -100,5 +106,12 @@ public class EntityBean {
         this.state = state;
     }
 
+    public List<InstanceBean> getInstanceBeans() {
+        return instanceBeans;
+    }
+
+    public void setInstanceBeans(List<InstanceBean> instanceBeans) {
+        this.instanceBeans = instanceBeans;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 305b398..5ed3ccd 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -17,13 +17,17 @@
  */
 package org.apache.falcon.state.store.jdbc;
 
+import org.apache.openjpa.persistence.jdbc.ForeignKey;
+import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
 import org.apache.openjpa.persistence.jdbc.Index;
 
 import javax.persistence.Basic;
+import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
 import javax.persistence.Table;
@@ -104,6 +108,10 @@ public class InstanceBean {
     @Column(name = "instance_sequence")
     private Integer instanceSequence;
 
+    @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
+    @ManyToOne(cascade= CascadeType.REMOVE)
+    private EntityBean entityBean;
+
 
     @Column(name = "awaited_predicates", columnDefinition = "BLOB")
     @Lob
@@ -209,4 +217,12 @@ public class InstanceBean {
     public void setProperties(byte[] properties) {
         this.properties = properties;
     }
+
+    public EntityBean getEntityBean() {
+        return entityBean;
+    }
+
+    public void setEntityBean(EntityBean entityBean) {
+        this.entityBean = entityBean;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 151c2c2..abd4119 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -89,6 +89,14 @@ public final class JDBCStateStore extends AbstractStateStore {
     }
 
     private EntityState getEntityByKey(EntityID id) throws StateStoreException {
+        EntityBean entityBean = getEntityBean(id);
+        if (entityBean == null) {
+            return null;
+        }
+        return BeanMapperUtil.convertToEntityState(entityBean);
+    }
+
+    private EntityBean getEntityBean(EntityID id) {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery("GET_ENTITY");
         q.setParameter("id", id.getKey());
@@ -97,7 +105,7 @@ public final class JDBCStateStore extends AbstractStateStore {
             return null;
         }
         entityManager.close();
-        return BeanMapperUtil.convertToEntityState((EntityBean) result.get(0));
+        return ((EntityBean)result.get(0));
     }
 
     @Override
@@ -182,6 +190,8 @@ public final class JDBCStateStore extends AbstractStateStore {
         }
         try {
             InstanceBean instanceBean = BeanMapperUtil.convertToInstanceBean(instanceState);
+            EntityBean entityBean = getEntityBean(new InstanceID(instanceState.getInstance()).getEntityID());
+            instanceBean.setEntityBean(entityBean);
             EntityManager entityManager = getEntityManager();
             beginTransaction(entityManager);
             entityManager.persist(instanceBean);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index 34156c0..bb8ff61 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -398,6 +398,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
 
     }
 
+    @Test
+    public void testCascadingDelete() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+        stateStore.putEntity(entityState);
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis() - 60000, "cluster1", System.currentTimeMillis() - 60000);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.READY);
+
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster1", System.currentTimeMillis());
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+        Collection<InstanceState> instances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(instances.size(), 2);
+
+
+        stateStore.deleteEntity(new EntityID(entityState.getEntity()));
+        deleteEntity(EntityType.PROCESS, "process1");
+
+
+        instances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(instances.size(), 0);
+    }
+
+
 
     private void initInstanceState(InstanceState instanceState) {
         instanceState.setCurrentState(InstanceState.STATE.READY);
@@ -420,8 +455,11 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
 
     @AfterTest
     public void cleanUpTables() throws StateStoreException {
-        stateStore.deleteEntities();
-        stateStore.deleteExecutionInstances();
+        try {
+            stateStore.deleteEntities();
+        } catch (Exception e) {
+            // ignore
+        }
     }
 
     @AfterClass

Reply via email to