FALCON-1742 Implement instance summary API for native scheduler (By Pallavi Rao)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2590096e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2590096e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2590096e Branch: refs/heads/0.9 Commit: 2590096ec5bb57bef3f4a62422194e888d12dca2 Parents: f0893f7 Author: Pallavi Rao <[email protected]> Authored: Tue Jan 19 21:53:54 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Jan 19 21:53:54 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/resource/EntitySummaryResult.java | 2 +- .../falcon/state/store/InMemoryStateStore.java | 21 ++++++++++++ .../falcon/state/store/InstanceStateStore.java | 13 ++++++++ .../falcon/state/store/jdbc/BeanMapperUtil.java | 20 +++++++++++ .../falcon/state/store/jdbc/InstanceBean.java | 3 +- .../falcon/state/store/jdbc/JDBCStateStore.java | 17 ++++++++++ .../workflow/engine/FalconWorkflowEngine.java | 23 ++++++++++++- .../state/service/store/TestJDBCStateStore.java | 35 ++++++++++++++++++++ .../apache/falcon/unit/FalconUnitClient.java | 6 ++++ .../InstanceSchedulerManagerJerseyIT.java | 24 ++++++++++++++ 11 files changed, 163 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 531c4a5..2747185 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -22,6 +22,8 @@ Proposed Release Version: 0.9 INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1742 Implement instance summary api for native scheduler (Pallavi Rao) + FALCON-1677 Support re-tries for timed-out instances (Narayan Periwal via Pallavi Rao) FALCON-1643 Add CLI option to display captured replication metrics(Peeyush Bishnoi via Ajay Yadava) 
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java index 4a885ec..3ebfe26 100644 --- a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java +++ b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java @@ -35,7 +35,7 @@ public class EntitySummaryResult extends APIResult { * Workflow status as being set in result object. */ public static enum WorkflowStatus { - WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR + WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, READY } @XmlElement http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java index c4ced46..69f1e48 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java @@ -218,6 +218,27 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override + public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster, + DateTime start, DateTime end) throws StateStoreException { + Map<InstanceState.STATE, Long> summary = new HashMap<>(); + for (InstanceState state : getAllExecutionInstances(entity, cluster)) { + ExecutionInstance instance = state.getInstance(); + DateTime instanceTime = instance.getInstanceTime(); + // Start date inclusive and end date exclusive. 
+ // If start date and end date are equal no instances will be added. + if ((instanceTime.isEqual(start) || instanceTime.isAfter(start)) + && instanceTime.isBefore(end)) { + if (summary.containsKey(state.getCurrentState())) { + summary.put(state.getCurrentState(), summary.get(state.getCurrentState()) + 1L); + } else { + summary.put(state.getCurrentState(), 1L); + } + } + } + return summary; + } + + @Override public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException { EntityClusterID id = new EntityClusterID(entity, cluster); if (!entityStates.containsKey(id.getEntityID().getKey())) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java index 8ce8fa0..b7269f8 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.store; +import java.util.Map; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.state.EntityClusterID; @@ -101,6 +102,18 @@ public interface InstanceStateStore { */ Collection<InstanceState> getExecutionInstances(EntityClusterID entityClusterID, Collection<InstanceState.STATE> states) throws StateStoreException; + + /** + * @param entity + * @param cluster + * @param states + * @param start + * @param end + * @return - A map of state and the no. of instances in that state. 
+ * @throws StateStoreException + */ + Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster, + DateTime start, DateTime end) throws StateStoreException; /** * @param entity * @param cluster http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 3def14a..194819e 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -17,6 +17,8 @@ */ package org.apache.falcon.state.store.jdbc; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; @@ -301,4 +303,22 @@ public final class BeanMapperUtil { IOUtils.closeQuietly(out); } } + + /** + * @param summary + * @return A map of state and count given the JQL result. + */ + public static Map<InstanceState.STATE, Long> getInstanceStateSummary(Collection<Object[]> summary) { + Map<InstanceState.STATE, Long> stateSummary = new HashMap<>(); + if (summary != null && !summary.isEmpty()) { + for (Object[] projection : summary) { + // Has to have 2 columns (state and count), else Array will be out of bounds. 
+ if (projection.length == 2) { + stateSummary.put(InstanceState.STATE.valueOf((String)projection[0]), + Long.valueOf(((Number)projection[1]).longValue())); + } + } + } + return stateSummary; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java index 7f7b966..e8385b1 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java @@ -50,7 +50,8 @@ import java.sql.Timestamp; @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"), @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"), @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"), - @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a") + @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"), + @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState") }) //RESUME CHECKSTYLE CHECK LineLengthCheck @Table(name = "INSTANCES") 
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 2eafbce..1c07286 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.store.jdbc; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; @@ -357,6 +358,22 @@ public final class JDBCStateStore extends AbstractStateStore { } @Override + public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster, + DateTime start, DateTime end) throws StateStoreException { + String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey(); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"); + q.setParameter("entityId", entityKey); + q.setParameter("cluster", cluster); + q.setParameter("startTime", new Timestamp(start.getMillis())); + q.setParameter("endTime", new Timestamp(end.getMillis())); + List result = q.getResultList(); + entityManager.close(); + + return BeanMapperUtil.getInstanceStateSummary(result); + } + + @Override public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster, Collection<InstanceState.STATE> states, DateTime start, DateTime end) throws StateStoreException { http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java 
---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index eb39ec0..34bcf01 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -18,6 +18,7 @@ package org.apache.falcon.workflow.engine; +import java.util.HashMap; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; @@ -398,7 +399,27 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesSummaryResult getSummary(Entity entity, Date start, Date end, List<LifeCycle> lifeCycles) throws FalconException { - throw new FalconException("Not yet Implemented"); + Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); + List<InstancesSummaryResult.InstanceSummary> instanceSummaries = new ArrayList<>(); + + // Iterate over entity clusters + for (String cluster : clusters) { + LOG.debug("Retrieving summary of instances for cluster : {}", cluster); + Map<InstanceState.STATE, Long> summaries = STATE_STORE.getExecutionInstanceSummary(entity, cluster, + new DateTime(start), new DateTime(end)); + Map<String, Long> summaryMap = new HashMap<>(); + // Iterate over the map and convert STATE to String + for (Map.Entry<InstanceState.STATE, Long> summary : summaries.entrySet()) { + summaryMap.put(summary.getKey().name(), summary.getValue()); + } + instanceSummaries.add(new InstancesSummaryResult.InstanceSummary(cluster, summaryMap)); + } + + InstancesSummaryResult instancesSummaryResult = + new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name()); + instancesSummaryResult.setInstancesSummary(instanceSummaries. 
+ toArray(new InstancesSummaryResult.InstanceSummary[instanceSummaries.size()])); + return instancesSummaryResult; } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java index 2a383cc..d597e27 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.service.store; +import java.util.Map; import org.apache.commons.lang.RandomStringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; @@ -445,7 +446,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase { Assert.assertEquals(instances.size(), 0); } + @Test + public void testGetExecutionSummaryWithRange() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + + long instance1Time = System.currentTimeMillis() - 180000; + long instance2Time = System.currentTimeMillis(); + EntityState entityState = getEntityState(EntityType.PROCESS, "clicksProcess"); + ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance1Time, "cluster1", instance1Time); + InstanceState instanceState1 = new InstanceState(processExecutionInstance1); + instanceState1.setCurrentState(InstanceState.STATE.RUNNING); + ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), 
entityState.getEntity(), + instance2Time, "cluster1", instance2Time); + InstanceState instanceState2 = new InstanceState(processExecutionInstance2); + instanceState2.setCurrentState(InstanceState.STATE.SUCCEEDED); + + stateStore.putExecutionInstance(instanceState1); + stateStore.putExecutionInstance(instanceState2); + + + Map<InstanceState.STATE, Long> summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(), + "cluster1", new DateTime(instance1Time), new DateTime(instance1Time + 60000)); + Assert.assertEquals(summary.size(), 1); + Assert.assertEquals(summary.get(InstanceState.STATE.RUNNING).longValue(), 1L); + + summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(), + "cluster1", new DateTime(instance2Time), new DateTime(instance2Time + 60000)); + Assert.assertEquals(summary.size(), 1); + Assert.assertEquals(summary.get(InstanceState.STATE.SUCCEEDED).longValue(), 1L); + } private void initInstanceState(InstanceState instanceState) { instanceState.setCurrentState(InstanceState.STATE.READY); http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 13375ef..7371e3a 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -324,6 +324,12 @@ public class FalconUnitClient extends AbstractFalconClient { String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, String doAsUser) throws FalconCLIException { + if (StringUtils.isBlank(orderBy)) { + orderBy = DEFAULT_ORDERBY; + } + if (StringUtils.isBlank(sortOrder)) { + sortOrder = DEFAULT_SORTED_ORDER; + } return localInstanceManager.getSummary(type, entity, start, end, colo, 
lifeCycles, filterBy, orderBy, sortOrder); } http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java index 698580b..aac7e26 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -144,4 +144,28 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe Assert.assertEquals(result.getInstances()[0].getInstance(), "2012-04-22T00:00Z"); Assert.assertEquals(result.getInstances()[2].getInstance(), START_INSTANCE); } + + @Test + public void testInstanceSummary() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + + setupProcessExecution(context, overlay, 3); + + String processName = overlay.get(PROCESS_NAME); + String colo = overlay.get(COLO); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING); + + InstancesSummaryResult result = falconUnitClient.getSummaryOfInstances(EntityType.PROCESS.toString(), + processName, START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, null); + + Assert.assertEquals(result.getInstancesSummary().length, 1); + Assert.assertEquals(result.getInstancesSummary()[0].getCluster(), overlay.get(CLUSTER)); + Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().size(), 2); + // Parallelism is 2 + Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("RUNNING").longValue(), 2L); + 
Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("READY").longValue(), 1L); + } }
