Repository: falcon Updated Branches: refs/heads/master d1cc9d229 -> 7a74bb0c6
FALCON-1640 Cascading Delete for instances in Native Scheduler. Contributed by Pavan Kumar Kolamuri. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7a74bb0c Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7a74bb0c Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7a74bb0c Branch: refs/heads/master Commit: 7a74bb0c6ee25d1d0bc5944b3af0438527343956 Parents: d1cc9d2 Author: Ajay Yadava <[email protected]> Authored: Thu Dec 24 17:40:59 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Dec 24 17:40:59 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/falcon/entity/AbstractTestBase.java | 6 +++ .../falcon/state/store/jdbc/EntityBean.java | 13 ++++++ .../falcon/state/store/jdbc/InstanceBean.java | 16 ++++++++ .../falcon/state/store/jdbc/JDBCStateStore.java | 12 +++++- .../state/service/store/TestJDBCStateStore.java | 42 +++++++++++++++++++- 6 files changed, 88 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index aa8d48f..6f1e490 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -41,6 +41,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1640 Cascading Delete for instances in Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava) + FALCON-1683 Inconsistent behavior when user tries to switch schedulers(Pallavi Rao via Ajay Yadava) FALCON-1669 Falcon should show more helpful message when it is unable to initialize Falcon Client object(Baishuo via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java index a6d607b..fd963e5 100644 --- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java +++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java @@ -139,6 +139,12 @@ public class AbstractTestBase { } } + protected void deleteEntity(EntityType type, String name) throws FalconException { + store.remove(type, name); + } + + + private void decorateACL(String proxyUser, String defaultGroupName, Cluster cluster) { if (cluster.getACL() != null) { return; http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java index 03ada39..37fb0cb 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java @@ -20,13 +20,16 @@ package org.apache.falcon.state.store.jdbc; import org.apache.openjpa.persistence.jdbc.Index; import javax.persistence.Basic; +import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; +import javax.persistence.OneToMany; import javax.persistence.Table; import javax.validation.constraints.NotNull; +import java.util.List; //SUSPEND CHECKSTYLE CHECK LineLengthCheck /** * Entity object which will be stored in Data Base. @@ -65,6 +68,9 @@ public class EntityBean { @Column(name = "current_state") private String state; + @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean") + private List<InstanceBean> instanceBeans; + public EntityBean() { } @@ -100,5 +106,12 @@ public class EntityBean { this.state = state; } + public List<InstanceBean> getInstanceBeans() { + return instanceBeans; + } + + public void setInstanceBeans(List<InstanceBean> instanceBeans) { + this.instanceBeans = instanceBeans; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java index 305b398..5ed3ccd 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java @@ -17,13 +17,17 @@ */ package org.apache.falcon.state.store.jdbc; +import org.apache.openjpa.persistence.jdbc.ForeignKey; +import org.apache.openjpa.persistence.jdbc.ForeignKeyAction; import org.apache.openjpa.persistence.jdbc.Index; import javax.persistence.Basic; +import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Lob; +import javax.persistence.ManyToOne; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.Table; @@ -104,6 +108,10 @@ public class InstanceBean { @Column(name = "instance_sequence") private Integer instanceSequence; + @ForeignKey(deleteAction= ForeignKeyAction.CASCADE) + @ManyToOne(cascade= CascadeType.REMOVE) + private EntityBean entityBean; + @Column(name = "awaited_predicates", columnDefinition = "BLOB") @Lob @@ -209,4 +217,12 @@ public class InstanceBean { public void setProperties(byte[] properties) { this.properties = properties; } + + public EntityBean getEntityBean() { + return entityBean; + } + + public void setEntityBean(EntityBean entityBean) { + this.entityBean = entityBean; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 151c2c2..abd4119 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -89,6 +89,14 @@ public final class JDBCStateStore extends AbstractStateStore { } private EntityState getEntityByKey(EntityID id) throws StateStoreException { + EntityBean entityBean = getEntityBean(id); + if (entityBean == null) { + return null; + } + return BeanMapperUtil.convertToEntityState(entityBean); + } + + private EntityBean getEntityBean(EntityID id) { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery("GET_ENTITY"); q.setParameter("id", id.getKey()); @@ -97,7 +105,7 @@ public final class JDBCStateStore extends AbstractStateStore { return null; } entityManager.close(); - return BeanMapperUtil.convertToEntityState((EntityBean) result.get(0)); + return ((EntityBean)result.get(0)); } @Override @@ -182,6 +190,8 @@ public final class JDBCStateStore extends AbstractStateStore { } try { InstanceBean instanceBean = BeanMapperUtil.convertToInstanceBean(instanceState); + EntityBean entityBean = getEntityBean(new InstanceID(instanceState.getInstance()).getEntityID()); + instanceBean.setEntityBean(entityBean); EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); entityManager.persist(instanceBean); http://git-wip-us.apache.org/repos/asf/falcon/blob/7a74bb0c/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java index 34156c0..bb8ff61 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -398,6 +398,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase { } + @Test + public void testCascadingDelete() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + EntityState entityState = getEntityState(EntityType.PROCESS, "process1"); + stateStore.putEntity(entityState); + ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + System.currentTimeMillis() - 60000, "cluster1", System.currentTimeMillis() - 60000); + InstanceState instanceState1 = new InstanceState(processExecutionInstance1); + instanceState1.setCurrentState(InstanceState.STATE.READY); + + ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + System.currentTimeMillis(), "cluster1", System.currentTimeMillis()); + InstanceState instanceState2 = new InstanceState(processExecutionInstance2); + instanceState2.setCurrentState(InstanceState.STATE.RUNNING); + + stateStore.putExecutionInstance(instanceState1); + stateStore.putExecutionInstance(instanceState2); + + Collection<InstanceState> instances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1"); + Assert.assertEquals(instances.size(), 2); + + + stateStore.deleteEntity(new EntityID(entityState.getEntity())); + deleteEntity(EntityType.PROCESS, "process1"); + + + instances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1"); + Assert.assertEquals(instances.size(), 0); + } + + private void initInstanceState(InstanceState instanceState) { instanceState.setCurrentState(InstanceState.STATE.READY); @@ -420,8 +455,11 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase { @AfterTest public void cleanUpTables() throws StateStoreException { - stateStore.deleteEntities(); - stateStore.deleteExecutionInstances(); + try { + stateStore.deleteEntities(); + } catch (Exception e) { + // ignore + } } @AfterClass
