FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (by Pallavi Rao)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6fb5cc89
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6fb5cc89
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6fb5cc89

Branch: refs/heads/0.9
Commit: 6fb5cc89a6a6f2f2b5a1aff7fd086489d24f889d
Parents: 1613ef6
Author: Pallavi Rao <[email protected]>
Authored: Mon Jan 18 11:40:32 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Mon Jan 18 11:40:32 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../workflow/engine/OozieWorkflowEngine.java    |  7 ++++++-
 .../execution/FalconExecutionService.java       |  5 +++++
 .../falcon/state/store/jdbc/InstanceBean.java   |  2 +-
 .../falcon/state/store/jdbc/JDBCStateStore.java |  3 ++-
 .../workflow/engine/FalconWorkflowEngine.java   |  9 ++++++---
 .../state/service/store/TestJDBCStateStore.java | 21 ++++++++++++++++----
 7 files changed, 39 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b5bf941..78cc96b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,8 @@ Proposed Release Version: 0.9
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (Pallavi Rao)
+
     FALCON-1545 Add documentation for Hive replication job counters(Peeyush Bishnoi via Ajay Yadava)
 
     FALCON-1601 Make Falcon StateStore more secure by not disclosing imp params in startup.props(Pavan Kumar Kolamuri via Ajay Yadava)


http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index cf0e30d..d4b45a6 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -269,12 +269,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private boolean isBundleInState(Map<String, BundleJob> bundles,
                                     BundleStatus status) throws FalconException {
+        // Need a separate list to avoid concurrent modification.
+        List<String> bundlesToRemove = new ArrayList<>();
         // After removing MISSING bundles for clusters, if bundles.size() == 0, entity is not scheduled. Return false.
         for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) {
             if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster
-                bundles.remove(clusterBundle.getKey());
+                bundlesToRemove.add(clusterBundle.getKey());
             }
         }
+        for (String bundleToRemove : bundlesToRemove) {
+            bundles.remove(bundleToRemove);
+        }
         if (bundles.size() == 0) {
             return false;
         }


http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index f45ec98..7bdcd6f 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -221,6 +221,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC
      */
     public void delete(Entity entity) throws FalconException {
         for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityClusterID id = new EntityClusterID(entity, cluster);
+            if (!executors.containsKey(id)) {
+                LOG.info("Entity {} is already deleted on cluster {}.", id, cluster);
+                continue;
+            }
             EntityExecutor executor = getEntityExecutor(entity, cluster);
             executor.killAll();
             executors.remove(executor.getId());


http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 5ed3ccd..7f7b966 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -48,7 +48,7 @@ import java.sql.Timestamp;
 @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
 @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
 @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
-@NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+@NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
 @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
 @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
 })
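Note on the isBundleInState() change above: the patch collects the keys of MISSING bundles into a separate list and removes them in a second pass, because calling remove() on the map while still iterating its entrySet() can throw ConcurrentModificationException. A minimal standalone sketch of the same pattern (plain Java, not Falcon code; the map contents are invented for illustration):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RemoveWhileIterating {
        public static void main(String[] args) {
            Map<String, String> bundles = new HashMap<>();
            bundles.put("cluster1", "bundle-1");
            bundles.put("cluster2", null); // stands in for a MISSING bundle

            // Collect keys first, then remove in a second pass; calling
            // bundles.remove() inside the entrySet() loop can throw
            // ConcurrentModificationException on the next iteration.
            List<String> toRemove = new ArrayList<>();
            for (Map.Entry<String, String> entry : bundles.entrySet()) {
                if (entry.getValue() == null) {
                    toRemove.add(entry.getKey());
                }
            }
            for (String key : toRemove) {
                bundles.remove(key);
            }
            System.out.println(bundles); // prints {cluster1=bundle-1}
        }
    }

Iterating with an explicit Iterator and calling iterator.remove() would also be safe for a single-threaded caller; the two-pass list simply keeps the loop body closest to the original code.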
http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index e898247..2eafbce 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -362,7 +362,7 @@ public final class JDBCStateStore extends AbstractStateStore {
                                                           DateTime end) throws StateStoreException {
         String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey();
         EntityManager entityManager = getEntityManager();
-        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE");
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE");
         q.setParameter("entityId", entityKey);
         List<String> instanceStates = new ArrayList<>();
         for (InstanceState.STATE state : states) {
@@ -371,6 +371,7 @@ public final class JDBCStateStore extends AbstractStateStore {
         q.setParameter("currentState", instanceStates);
         q.setParameter("startTime", new Timestamp(start.getMillis()));
         q.setParameter("endTime", new Timestamp(end.getMillis()));
+        q.setParameter("cluster", cluster);
         List result = q.getResultList();
         entityManager.close();
         try {


http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index ab1a786..b7379d4 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -319,6 +319,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
             for (String name : props.stringPropertyNames()) {
                 keyValuePairs[i++] = new InstancesResult.KeyValuePair(name, props.getProperty(name));
             }
+            instanceInfo.wfParams = keyValuePairs;
             break;
         default:
             throw new IllegalArgumentException("Unhandled action " + action);
@@ -416,9 +417,11 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
         boolean entityUpdated =
                 UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster,
                         EntityUtil.getLatestStagingPath(clusterEntity, oldEntity));
-
+        StringBuilder result = new StringBuilder();
         if (!entityUpdated) {
-            throw new FalconException("No relevant updates detected in the new entity definition!");
+            // Ideally should throw an exception, but, keeping it backward-compatible.
+            LOG.warn("No relevant updates detected in the new entity definition for entity {}!", newEntity.getName());
+            return result.toString();
         }
 
         Date oldEndTime = EntityUtil.getEndTime(oldEntity, cluster);
@@ -435,7 +438,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
         Collection<InstanceState> instances = new ArrayList<>();
         instances.add(STATE_STORE.getLastExecutionInstance(oldEntity, cluster));
         EXECUTION_SERVICE.getEntityExecutor(oldEntity, cluster).update(newEntity);
-        StringBuilder result = new StringBuilder();
+
         result.append(newEntity.toShortString()).append("/Effective Time: ")
                 .append(getEffectiveTime(newEntity, cluster, instances));
         return result.toString();


http://git-wip-us.apache.org/repos/asf/falcon/blob/6fb5cc89/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index bb8ff61..2a383cc 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -346,21 +346,34 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
         InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
         instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
 
+        ExecutionInstance processExecutionInstance3 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster2", instance2Time);
+        InstanceState instanceState3 = new InstanceState(processExecutionInstance3);
+        instanceState3.setCurrentState(InstanceState.STATE.RUNNING);
+
         stateStore.putExecutionInstance(instanceState1);
         stateStore.putExecutionInstance(instanceState2);
+        stateStore.putExecutionInstance(instanceState3);
 
         List<InstanceState.STATE> states = new ArrayList<>();
         states.add(InstanceState.STATE.RUNNING);
 
         Collection<InstanceState> actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
                 "cluster1", states, new DateTime(instance1Time), new DateTime(instance1Time + 60000));
-        Assert.assertEquals(1, actualInstances.size());
-        Assert.assertEquals(instanceState1, actualInstances.toArray()[0]);
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
 
         actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
                 "cluster1", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
-        Assert.assertEquals(1, actualInstances.size());
-        Assert.assertEquals(instanceState2, actualInstances.toArray()[0]);
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState2);
+
+        // Ensure we can get instances for a different cluster
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster2", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState3);
     }
