Repository: falcon Updated Branches: refs/heads/master 5ecae18ec -> 49fbc8c96
FALCON-1517 Instance Management Api in Falcon Unit (Narayan Periwal) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49fbc8c9 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49fbc8c9 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49fbc8c9 Branch: refs/heads/master Commit: 49fbc8c961f4f467f7f7ae12efd1c80963c7f8a9 Parents: 5ecae18 Author: Pallavi Rao <[email protected]> Authored: Fri Oct 30 14:27:53 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Oct 30 14:27:53 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/client/AbstractFalconClient.java | 200 ++++++++++++++++++- .../org/apache/falcon/client/FalconClient.java | 23 --- .../client/LocalOozieClientCoordProxy.java | 62 ++++++ .../oozie/client/LocalProxyOozieClient.java | 32 +-- .../resource/AbstractInstanceManager.java | 65 +++--- .../apache/falcon/unit/FalconUnitClient.java | 81 +++++++- .../falcon/unit/LocalInstanceManager.java | 51 +++++ .../apache/falcon/unit/FalconUnitTestBase.java | 7 +- .../org/apache/falcon/unit/TestFalconUnit.java | 113 ++++++++++- .../falcon/unit/examples/JavaSleepExample.java | 33 +++ unit/src/test/resources/sleepWorkflow.xml | 41 ++++ 12 files changed, 635 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b1f9269..869b182 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1517 Instance Management Api in Falcon Unit (Narayan Periwal via Pallavi Rao) + FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (Pavan Kolamuri via Pallavi Rao) 
OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 91d5324..27b93c0 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -21,9 +21,15 @@ import org.apache.falcon.LifeCycle; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.InstancesSummaryResult; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.util.List; /** @@ -34,6 +40,9 @@ public abstract class AbstractFalconClient { //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + protected static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; + protected static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters"; + /** * Submit a new entity. Entities can be of type feed, process or data end * points. 
Entity definitions are validated structurally against schema and @@ -119,7 +128,7 @@ public abstract class AbstractFalconClient { * @param sortOrder sort order can be asc or desc * @param offset offset while displaying results * @param numResults num of Results to output - * @param doAsUser + * @param doAsUser proxy user * @return * @throws FalconCLIException */ @@ -129,7 +138,6 @@ public abstract class AbstractFalconClient { String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException; - //RESUME CHECKSTYLE CHECK ParameterNumberCheck /** * Suspend an entity. @@ -166,4 +174,192 @@ public abstract class AbstractFalconClient { */ public abstract APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws FalconCLIException; + + /** + * Kill currently running instance(s) of an entity. + * @param type Valid options are feed or process. + * @param entity name of the entity. + * @param start start time of the instance(s) that you want to refer to + * @param end end time of the instance(s) that you want to refer to + * @param colo Colo on which the query should be run. + * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for + * process. + * @param doAsUser proxy user + * @return Result of the kill operation. + */ + public abstract InstancesResult killInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, + UnsupportedEncodingException; + + /** + * Suspend instances of an entity. + * @param type Valid options are feed or process. + * @param entity name of the entity. + * @param start the start time of the instance(s) that you want to refer to + * @param end the end time of the instance(s) that you want to refer to + * @param colo Colo on which the query should be run. 
+ * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for + * process. + * @param doAsUser proxy user + * @return Results of the suspend command. + */ + public abstract InstancesResult suspendInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException; + + /** + * Resume suspended instances of an entity. + * @param type Valid options are feed or process. + * @param entity name of the entity. + * @param start start time of the instance(s) that you want to refer to + * @param end the end time of the instance(s) that you want to refer to + * @param colo Colo on which the query should be run. + * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for + * process. + * @param doAsUser proxy user + * @return Results of the resume command. + */ + public abstract InstancesResult resumeInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException; + + /** + * Rerun instances of an entity. On issuing a rerun, by default the execution resumes from the last failed node in + * the workflow. + * @param type Valid options are feed or process. + * @param entity name of the entity. + * @param start start is the start time of the instance that you want to refer to + * @param end end is the end time of the instance that you want to refer to + * @param colo Colo on which the query should be run. + * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for + * process. + * @param isForced <optional param> can be used to forcefully rerun the entire instance. 
+ * @param doAsUser proxy user + * @return Results of the rerun command. + */ + public abstract InstancesResult rerunInstances(String type, String entity, String start, String end, + String filePath, String colo, String clusters, + String sourceClusters, List<LifeCycle> lifeCycles, Boolean isForced, + String doAsUser) throws FalconCLIException, IOException; + + /** + * Get summary of instance/instances of an entity. + * @param type Valid options are cluster, feed or process. + * @param entity Name of the entity. + * @param start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * By default, it is set to (end - (10 * entityFrequency)). + * @param end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * Default is set to now. + * @param colo <optional param> Colo on which the query should be run. + * @param lifeCycles <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process + * is Execution(default). + * @param filterBy <optional param> Filter results by list of field:value pairs. + * Example1: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster + * Example2: filterBy=Status:RUNNING,Status:KILLED + * Supported filter fields are STATUS, CLUSTER. + * Query will do an AND among filterBy fields. + * @param orderBy <optional param> Field by which results should be ordered. + * Supports ordering by "cluster". Example: orderBy=cluster + * @param sortOrder <optional param> Valid options are "asc" and "desc". Example: sortOrder=asc + * @param doAsUser proxy user + * @return Summary of the instances over the specified time range + */ + public abstract InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start, String end, + String colo, List<LifeCycle> lifeCycles, + String filterBy, String orderBy, String sortOrder, + String doAsUser) throws FalconCLIException; + + /** + * Get falcon feed instance availability. 
+ * @param type Valid option is feed. + * @param entity Name of the entity. + * @param start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * By default, it is set to (end - (10 * entityFrequency)). + * @param end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * Default is set to now. + * @param colo Colo on which the query should be run. + * @param doAsUser proxy user + * @return Feed instance availability status + */ + public abstract FeedInstanceResult getFeedListing(String type, String entity, String start, String end, String colo, + String doAsUser) throws FalconCLIException; + + /** + * Get log of a specific instance of an entity. + * @param type Valid options are cluster, feed or process. + * @param entity Name of the entity. + * @param start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * By default, it is set to (end - (10 * entityFrequency)). + * @param end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * Default is set to now. + * @param colo <optional param> Colo on which the query should be run. + * @param runId <optional param> Run Id. + * @param lifeCycles <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process is + * Execution(default). + * @param filterBy <optional param> Filter results by list of field:value pairs. + * Example: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster + * Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER. + * Query will do an AND among filterBy fields. + * @param orderBy <optional param> Field by which results should be ordered. + * Supports ordering by "status","startTime","endTime","cluster". + * @param sortOrder <optional param> Valid options are "asc" and "desc" + * @param offset <optional param> Show results from the offset, used for pagination. Defaults to 0. 
+ * @param numResults <optional param> Number of results to show per request, used for pagination. Only integers > 0 + * are valid, Default is 10. + * @param doAsUser proxy user + * @return Log of specified instance. + */ + public abstract InstancesResult getLogsOfInstances(String type, String entity, String start, String end, + String colo, String runId, List<LifeCycle> lifeCycles, + String filterBy, String orderBy, String sortOrder, + Integer offset, Integer numResults, String doAsUser) throws + FalconCLIException; + + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + /** + * Get the params passed to the workflow for an instance of feed/process. + * @param type Valid options are cluster, feed or process. + * @param entity Name of the entity. + * @param start should be the nominal time of the instance for which you want the params to be returned + * @param colo <optional param> Colo on which the query should be run. + * @param lifeCycles <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process is + * Execution(default). + * @param doAsUser proxy user + * @return Params passed to the workflow for the specified instance. + */ + public abstract InstancesResult getParamsOfInstance(String type, String entity, String start, String colo, + List<LifeCycle> lifeCycles, String doAsUser) throws + FalconCLIException, UnsupportedEncodingException; + + /** + * Get dependent instances for a particular instance. + * @param entityType Valid options are feed or process. + * @param entityName Name of the entity + * @param instanceTime <mandatory param> time of the given instance + * @param colo Colo on which the query should be run. 
+ * @return Dependent instances for the specified instance + */ + public abstract InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, + String instanceTime, String colo) throws + FalconCLIException; + + protected InputStream getServletInputStream(String clusters, String sourceClusters, String properties) throws + FalconCLIException, UnsupportedEncodingException { + + InputStream stream; + StringBuilder buffer = new StringBuilder(); + if (clusters != null) { + buffer.append(FALCON_INSTANCE_ACTION_CLUSTERS).append('=').append(clusters).append('\n'); + } + if (sourceClusters != null) { + buffer.append(FALCON_INSTANCE_SOURCE_CLUSTERS).append('=').append(sourceClusters).append('\n'); + } + if (properties != null) { + buffer.append(properties); + } + stream = new ByteArrayInputStream(buffer.toString().getBytes()); + return (buffer.length() == 0) ? null : stream; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 27510f6..c49dd08 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -58,7 +58,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -86,9 +85,6 @@ public class FalconClient extends AbstractFalconClient { public static final String USER = System.getProperty("user.name"); public static final String AUTH_URL = "api/options?" 
+ PseudoAuthenticator.USER_NAME + "=" + USER; - private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; - private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters"; - /** * Name of the HTTP cookie used for the authentication token between the client and the server. */ @@ -670,25 +666,6 @@ public class FalconClient extends AbstractFalconClient { return stream; } - private InputStream getServletInputStream(String clusters, - String sourceClusters, String properties) - throws FalconCLIException, UnsupportedEncodingException { - - InputStream stream; - StringBuilder buffer = new StringBuilder(); - if (clusters != null) { - buffer.append(FALCON_INSTANCE_ACTION_CLUSTERS).append('=').append(clusters).append('\n'); - } - if (sourceClusters != null) { - buffer.append(FALCON_INSTANCE_SOURCE_CLUSTERS).append('=').append(sourceClusters).append('\n'); - } - if (properties != null) { - buffer.append(properties); - } - stream = new ByteArrayInputStream(buffer.toString().getBytes()); - return (buffer.length() == 0) ? null : stream; - } - private APIResult sendEntityRequest(Entities entities, EntityType entityType, String entityName, String colo, Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java new file mode 100644 index 0000000..093d6ff --- /dev/null +++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.client; + +import org.apache.oozie.BaseEngineException; +import org.apache.oozie.CoordinatorEngine; +import org.apache.oozie.LocalOozieClientCoord; + +/** + * Client API to submit and manage Oozie Coord jobs against an Oozie + * instance. + */ +public class LocalOozieClientCoordProxy extends LocalOozieClientCoord { + + private final CoordinatorEngine coordEngine; + + /** + * Create a coordinator client for Oozie local use. + * <p/> + * + * @param coordEngine the engine instance to use. + */ + public LocalOozieClientCoordProxy(CoordinatorEngine coordEngine) { + super(coordEngine); + this.coordEngine = coordEngine; + } + + /** + * Get the info of a coordinator job and subset actions. + * + * @param jobId job Id. + * @param filter the status filter + * @param start starting index in the list of actions belonging to the job + * @param len number of actions to be returned + * @return the job info. + * @throws OozieClientException thrown if the job info could not be retrieved. 
+ */ + @Override + public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException { + try { + return coordEngine.getCoordJob(jobId, filter, start, len, false); + } catch (BaseEngineException bex) { + throw new OozieClientException(bex.getErrorCode().toString(), bex); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index c2100d1..6ae92de 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -18,10 +18,11 @@ package org.apache.oozie.client; import org.apache.oozie.BundleEngine; +import org.apache.oozie.CoordinatorEngine; import org.apache.oozie.LocalOozieClient; -import org.apache.oozie.LocalOozieClientCoord; import org.apache.oozie.local.LocalOozie; import org.apache.oozie.service.BundleEngineService; +import org.apache.oozie.service.CoordinatorEngineService; import org.apache.oozie.service.Services; import java.io.PrintStream; @@ -34,11 +35,12 @@ import java.util.Properties; public class LocalProxyOozieClient extends OozieClient { private static LocalOozieClientBundle localOozieClientBundle; - private static LocalOozieClientCoord localOozieClientCoord; + private static LocalOozieClientCoordProxy localOozieClientCoordProxy; private static LocalOozieClient localOozieClient; private static final BundleEngine BUNDLE_ENGINE = Services.get(). get(BundleEngineService.class).getBundleEngine(System.getProperty("user.name")); - + private static final CoordinatorEngine COORDINATOR_ENGINE = Services.get().get(CoordinatorEngineService.class). 
+ getCoordinatorEngine(System.getProperty("user.name")); private LocalOozieClientBundle getLocalOozieClientBundle() { if (localOozieClientBundle == null) { @@ -54,11 +56,11 @@ public class LocalProxyOozieClient extends OozieClient { return localOozieClient; } - private LocalOozieClientCoord getLocalOozieClientCoord() { - if (localOozieClientCoord == null) { - localOozieClientCoord = (LocalOozieClientCoord) LocalOozie.getCoordClient(); + private LocalOozieClientCoordProxy getLocalOozieClientCoordProxy() { + if (localOozieClientCoordProxy == null) { + localOozieClientCoordProxy = new LocalOozieClientCoordProxy(COORDINATOR_ENGINE); } - return localOozieClientCoord; + return localOozieClientCoordProxy; } private OozieClient getClient(String jobId) { @@ -66,7 +68,7 @@ public class LocalProxyOozieClient extends OozieClient { if (jobId.toUpperCase().endsWith("B")) { //checking if it's a bundle job return getLocalOozieClientBundle(); } else if (jobId.toUpperCase().endsWith("C")) { //checking if it's a coordinator job - return getLocalOozieClientCoord(); + return getLocalOozieClientCoordProxy(); } else if (jobId.toUpperCase().endsWith("W")) { //checking if it's a workflow job return getLocalOozieClient(); } else { @@ -104,43 +106,43 @@ public class LocalProxyOozieClient extends OozieClient { @Override public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException { - return getLocalOozieClientCoord().getCoordActionInfo(actionId); + return getLocalOozieClientCoordProxy().getCoordActionInfo(actionId); } @Override public CoordinatorJob getCoordJobInfo(final String jobId) throws OozieClientException { - return getLocalOozieClientCoord().getCoordJobInfo(jobId); + return getLocalOozieClientCoordProxy().getCoordJobInfo(jobId); } @Override public List<CoordinatorJob> getCoordJobsInfo(final String filter, final int start, final int len) throws OozieClientException { - return getLocalOozieClientCoord().getCoordJobsInfo(filter, start, len); + return 
getLocalOozieClientCoordProxy().getCoordJobsInfo(filter, start, len); } @Override public CoordinatorJob getCoordJobInfo(final String jobId, final String filter, final int start, final int len) throws OozieClientException { - return getLocalOozieClientCoord().getCoordJobInfo(jobId, filter, start, len); + return getLocalOozieClientCoordProxy().getCoordJobInfo(jobId, filter, start, len); } @Override public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType, final String scope, final boolean refresh, final boolean noCleanup) throws OozieClientException { - return getLocalOozieClientCoord().reRunCoord(jobId, rerunType, scope, refresh, noCleanup); + return getLocalOozieClientCoordProxy().reRunCoord(jobId, rerunType, scope, refresh, noCleanup); } @Override public List<WorkflowJob> getJobsInfo(final String filter) throws OozieClientException { - return getLocalOozieClientCoord().getJobsInfo(filter); + return getLocalOozieClientCoordProxy().getJobsInfo(filter); } @Override public List<WorkflowJob> getJobsInfo(final String filter, final int start, final int len) throws OozieClientException { - return getLocalOozieClientCoord().getJobsInfo(filter, start, len); + return getLocalOozieClientCoordProxy().getJobsInfo(filter, start, len); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 606f741..fea2989 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -555,10 +555,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } - public InstancesResult 
killInstance(HttpServletRequest request, - String type, String entity, String startStr, - String endStr, String colo, - List<LifeCycle> lifeCycles) { + public InstancesResult killInstance(HttpServletRequest request, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { + Properties props = getProperties(request); + return killInstance(props, type, entity, startStr, endStr, colo, lifeCycles); + } + + public InstancesResult killInstance(Properties props, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { checkColo(colo); checkType(type); try { @@ -568,7 +572,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - Properties props = getProperties(request); AbstractWorkflowEngine wfEngine = getWorkflowEngine(); return wfEngine.killInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); @@ -578,10 +581,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } - public InstancesResult suspendInstance(HttpServletRequest request, - String type, String entity, String startStr, - String endStr, String colo, - List<LifeCycle> lifeCycles) { + public InstancesResult suspendInstance(HttpServletRequest request, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { + Properties props = getProperties(request); + return suspendInstance(props, type, entity, startStr, endStr, colo, lifeCycles); + } + + public InstancesResult suspendInstance(Properties props, String type, String entity, String startStr, String endStr, + String colo, List<LifeCycle> lifeCycles) { checkColo(colo); checkType(type); try { @@ -591,7 +598,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair<Date, Date> startAndEndDate = 
getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - Properties props = getProperties(request); AbstractWorkflowEngine wfEngine = getWorkflowEngine(); return wfEngine.suspendInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); @@ -601,10 +607,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } - public InstancesResult resumeInstance(HttpServletRequest request, - String type, String entity, String startStr, - String endStr, String colo, - List<LifeCycle> lifeCycles) { + public InstancesResult resumeInstance(HttpServletRequest request, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { + Properties props = getProperties(request); + return resumeInstance(props, type, entity, startStr, endStr, colo, lifeCycles); + } + + public InstancesResult resumeInstance(Properties props, String type, String entity, String startStr, String endStr, + String colo, List<LifeCycle> lifeCycles) { checkColo(colo); checkType(type); try { @@ -614,7 +624,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - Properties props = getProperties(request); AbstractWorkflowEngine wfEngine = getWorkflowEngine(); return wfEngine.resumeInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); @@ -790,9 +799,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { return null; } + public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, + HttpServletRequest request, String colo, List<LifeCycle> lifeCycles, + Boolean isForced) { + Properties props = getProperties(request); + return reRunInstance(type, entity, startStr, endStr, props, colo, lifeCycles, isForced); + } - public InstancesResult reRunInstance(String type, 
String entity, String startStr, - String endStr, HttpServletRequest request, + public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, Properties props, String colo, List<LifeCycle> lifeCycles, Boolean isForced) { checkColo(colo); checkType(type); @@ -803,7 +817,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - Properties props = getProperties(request); AbstractWorkflowEngine wfEngine = getWorkflowEngine(); return wfEngine.reRunInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced); @@ -814,14 +827,18 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } //RESUME CHECKSTYLE CHECK ParameterNumberCheck - private Properties getProperties(HttpServletRequest request) throws IOException { + private Properties getProperties(HttpServletRequest request) { Properties props = new Properties(); - ServletInputStream xmlStream = request == null ? null : request.getInputStream(); - if (xmlStream != null) { - if (xmlStream.markSupported()) { - xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len + try { + ServletInputStream xmlStream = request == null ? 
null : request.getInputStream(); + if (xmlStream != null) { + if (xmlStream.markSupported()) { + xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len + } + props.load(xmlStream); } - props.load(xmlStream); + } catch (IOException e) { + LOG.error("Failed to get properties from request", e); } return props; } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index b5afae3..9f2b714 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -32,7 +32,10 @@ import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Validity; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.util.DateUtil; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; @@ -40,9 +43,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Properties; import java.util.TimeZone; /** @@ -51,6 +56,7 @@ import java.util.TimeZone; public class FalconUnitClient extends AbstractFalconClient { private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class); + protected static final int XML_DEBUG_LEN = 10 * 1024; private static final String 
DEFAULT_ORDERBY = "status"; private static final String DEFAULT_SORTED_ORDER = "asc"; @@ -162,7 +168,6 @@ public class FalconUnitClient extends AbstractFalconClient { sortOrder, offset, numResults); } - //RESUME CHECKSTYLE CHECK ParameterNumberCheck /** @@ -237,6 +242,68 @@ public class FalconUnitClient extends AbstractFalconClient { return localSchedulableEntityManager.getStatus(entityType.name(), entityName, colo); } + public InstancesResult killInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException { + Properties props = getProperties(clusters, sourceClusters); + return localInstanceManager.killInstance(props, type, entity, start, end, colo, lifeCycles); + } + + public InstancesResult suspendInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException { + Properties props = getProperties(clusters, sourceClusters); + return localInstanceManager.suspendInstance(props, type, entity, start, end, colo, lifeCycles); + } + + public InstancesResult resumeInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException { + Properties props = getProperties(clusters, sourceClusters); + return localInstanceManager.resumeInstance(props, type, entity, start, end, colo, lifeCycles); + } + + public InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath, + String colo, String clusters, String sourceClusters, + List<LifeCycle> lifeCycles, Boolean isForced, String doAsUser) throws + FalconCLIException, IOException { + Properties props = getProperties(clusters, 
sourceClusters); + return localInstanceManager.reRunInstance(type, entity, start, end, props, colo, lifeCycles, isForced); + } + + public InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start, String end, + String colo, List<LifeCycle> lifeCycles, String filterBy, + String orderBy, String sortOrder, String doAsUser) throws + FalconCLIException { + return localInstanceManager.getSummary(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, + sortOrder); + } + + public FeedInstanceResult getFeedListing(String type, String entity, String start, String end, String colo, + String doAsUser) throws FalconCLIException { + return localInstanceManager.getListing(type, entity, start, end, colo); + } + + public InstancesResult getLogsOfInstances(String type, String entity, String start, String end, String colo, + String runId, List<LifeCycle> lifeCycles, String filterBy, + String orderBy, String sortOrder, Integer offset, Integer numResults, + String doAsUser) throws FalconCLIException { + return localInstanceManager.getLogs(type, entity, start, end, colo, runId, lifeCycles, filterBy, orderBy, + sortOrder, offset, numResults); + } + + public InstancesResult getParamsOfInstance(String type, String entity, String start, String colo, + List<LifeCycle> lifeCycles, String doAsUser) throws FalconCLIException, + UnsupportedEncodingException { + return localInstanceManager.getInstanceParams(type, entity, start, colo, lifeCycles); + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, String instanceTime, + String colo) throws FalconCLIException { + return localInstanceManager.getInstanceDependencies(entityType, entityName, instanceTime, colo); + } + private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) { if (entityType == EntityType.FEED) { return checkAndUpdateFeedClusters(entity, cluster); @@ -303,5 +370,15 @@ 
public class FalconUnitClient extends AbstractFalconClient { } } } -} + private Properties getProperties(String clusters, String sourceClusters) { + Properties props = new Properties(); + if (StringUtils.isNotEmpty(clusters)) { + props.setProperty(FALCON_INSTANCE_ACTION_CLUSTERS, clusters); + } + if (StringUtils.isNotEmpty(sourceClusters)) { + props.setProperty(FALCON_INSTANCE_SOURCE_CLUSTERS, sourceClusters); + } + return props; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java index 1503b28..148cbf7 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java @@ -19,9 +19,13 @@ package org.apache.falcon.unit; import org.apache.falcon.LifeCycle; import org.apache.falcon.resource.AbstractInstanceManager; +import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.InstancesSummaryResult; import java.util.List; +import java.util.Properties; /** * A proxy implementation of the entity instance operations. 
@@ -31,6 +35,27 @@ public class LocalInstanceManager extends AbstractInstanceManager { public LocalInstanceManager() {} //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + public InstancesResult killInstance(Properties properties, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { + return super.killInstance(properties, type, entity, startStr, endStr, colo, lifeCycles); + } + + public InstancesResult suspendInstance(Properties properties, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { + return super.suspendInstance(properties, type, entity, startStr, endStr, colo, lifeCycles); + } + + public InstancesResult resumeInstance(Properties properties, String type, String entity, String startStr, + String endStr, String colo, List<LifeCycle> lifeCycles) { + return super.resumeInstance(properties, type, entity, startStr, endStr, colo, lifeCycles); + } + + public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, + Properties properties, String colo, List<LifeCycle> lifeCycles, + Boolean isForced) { + return super.reRunInstance(type, entity, startStr, endStr, properties, colo, lifeCycles, isForced); + } + public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, @@ -38,6 +63,32 @@ public class LocalInstanceManager extends AbstractInstanceManager { return super.getStatus(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); } + + public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr, String colo, + List<LifeCycle> lifeCycles, String filterBy, String orderBy, + String sortOrder) { + return super.getSummary(type, entity, startStr, endStr, colo, lifeCycles, filterBy, orderBy, sortOrder); + } + + public 
FeedInstanceResult getListing(String type, String entity, String startStr, String endStr, String colo) { + return super.getListing(type, entity, startStr, endStr, colo); + } + + public InstancesResult getLogs(String type, String entity, String startStr, String endStr, String colo, + String runId, List<LifeCycle> lifeCycles, String filterBy, String orderBy, + String sortOrder, Integer offset, Integer numResults) { + return super.getLogs(type, entity, startStr, endStr, colo, runId, lifeCycles, filterBy, orderBy, sortOrder, + offset, numResults); + } //RESUME CHECKSTYLE CHECK ParameterNumberCheck + public InstancesResult getInstanceParams(String type, String entity, String startTime, String colo, + List<LifeCycle> lifeCycles) { + return super.getInstanceParams(type, entity, startTime, colo, lifeCycles); + } + + public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, + String instanceTimeString, String colo) { + return super.getInstanceDependencies(entityType, entityName, instanceTimeString, colo); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index ac478f4..bd81798 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -174,7 +174,7 @@ public class FalconUnitTestBase { throw new FalconException("Process not found " + processName); } String workflowPath = processEntity.getWorkflow().getPath(); - fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath)); + fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath, "workflow.xml")); return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, 
numInstances, cluster, skipDryRun, properties); } @@ -332,12 +332,13 @@ public class FalconUnitTestBase { } } - protected long waitForStatus(final String entityType, final String entityName, final String instanceTime) { + protected long waitForStatus(final String entityType, final String entityName, final String instanceTime, + final InstancesResult.WorkflowStatus instanceStatus) { return waitFor(WAIT_TIME, new Predicate() { public boolean evaluate() throws Exception { InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType, entityName, instanceTime); - return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status); + return instanceStatus.equals(status); } }); } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 8cdbd88..2c8642d 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -24,7 +24,10 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.testng.Assert; @@ -54,7 +57,9 @@ public class TestFalconUnit extends FalconUnitTestBase { private static final String OUTPUT_FEED_NAME = "out"; private static final String INPUT_FILE_NAME = "input.txt"; private static final String SCHEDULE_TIME = "2015-06-20T00:00Z"; + 
private static final String END_TIME = "2015-06-20T00:01Z"; private static final String WORKFLOW = "workflow.xml"; + private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml"; @Test public void testProcessInstanceExecution() throws Exception { @@ -66,7 +71,7 @@ public class TestFalconUnit extends FalconUnitTestBase { result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED); @@ -113,7 +118,7 @@ public class TestFalconUnit extends FalconUnitTestBase { result = scheduleProcess(PROCESS_NAME, scheduleTime, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime, InstancesResult.WorkflowStatus.SUCCEEDED); result = getClient().suspend(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); assertStatus(result); result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); @@ -139,7 +144,7 @@ public class TestFalconUnit extends FalconUnitTestBase { result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); result = getClient().delete(EntityType.PROCESS, PROCESS_NAME, null); assertStatus(result); try { @@ -186,10 +191,9 @@ public class TestFalconUnit extends FalconUnitTestBase { 
createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW), - true, ""); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); Process process = getEntity(EntityType.PROCESS, PROCESS_NAME); setDummyProperty(process); @@ -228,4 +232,101 @@ public class TestFalconUnit extends FalconUnitTestBase { process.getProperties().getProperties().add(property); } + + @Test + public void testProcessInstanceManagementAPI1() throws Exception { + submitClusterAndFeeds(); + // submitting and scheduling process + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + assertStatus(result); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 3, CLUSTER_NAME, getAbsolutePath(SLEEP_WORKFLOW), true, + ""); + assertStatus(result); + InstancesResult.WorkflowStatus currentStatus; + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.RUNNING); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.RUNNING); + + getClient().suspendInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, + CLUSTER_NAME, null, null, null); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUSPENDED); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); 
+ Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.SUSPENDED); + + getClient().resumeInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, + CLUSTER_NAME, null, null, null); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.RUNNING); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.RUNNING); + + getClient().killInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, CLUSTER_NAME, + null, null, null); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.KILLED); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.KILLED); + + getClient().rerunInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, null, + CLUSTER_NAME, null, null, true, null); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.RUNNING); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.RUNNING); + } + + @Test + public void testProcessInstanceManagementAPI2() throws Exception { + submitClusterAndFeeds(); + // submitting and scheduling process + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + assertStatus(result); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 3, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); + assertStatus(result); + InstancesResult.WorkflowStatus currentStatus; + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, 
InstancesResult.WorkflowStatus.SUCCEEDED); + currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.SUCCEEDED); + + InstancesSummaryResult summaryResult = getClient().getSummaryOfInstances(EntityType.PROCESS.name(), + PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, null, null, "asc", null, null); + Assert.assertEquals(summaryResult.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertNotNull(summaryResult.getInstancesSummary()); + Assert.assertEquals(summaryResult.getInstancesSummary().length, 1); + + + InstancesResult instancesResult = getClient().getLogsOfInstances(EntityType.PROCESS.name(), PROCESS_NAME, + SCHEDULE_TIME, END_TIME, null, "0", null, null, "asc", null, new Integer(0), new Integer(1), null); + Assert.assertEquals(instancesResult.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertNotNull(instancesResult.getInstances()); + Assert.assertEquals(instancesResult.getInstances().length, 1); + + instancesResult = getClient().getParamsOfInstance(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, null, + null, null); + Assert.assertEquals(instancesResult.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertNotNull(instancesResult.getInstances()); + Assert.assertEquals(instancesResult.getInstances().length, 1); + + InstanceDependencyResult dependencyResult = getClient().getInstanceDependencies(EntityType.PROCESS.name(), + PROCESS_NAME, SCHEDULE_TIME, null); + Assert.assertEquals(dependencyResult.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertNotNull(dependencyResult.getDependencies()); + Assert.assertEquals(dependencyResult.getDependencies().length, 2); //2 for input and output feed + } + + @Test + public void testFeedInstanceManagementAPI() throws Exception { + // submit with default props + submitCluster(); + // submitting feeds + APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED)); + 
assertStatus(result); + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + String inPath = getFeedPathForTS(CLUSTER_NAME, INPUT_FEED_NAME, SCHEDULE_TIME); + Assert.assertTrue(fs.exists(new Path(inPath))); + result = schedule(EntityType.FEED, INPUT_FEED_NAME, CLUSTER_NAME); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + FeedInstanceResult feedInstanceResult = getClient().getFeedListing(EntityType.FEED.name(), INPUT_FEED_NAME, + SCHEDULE_TIME, END_TIME, null, null); + Assert.assertEquals(feedInstanceResult.getStatus(), APIResult.Status.SUCCEEDED); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java b/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java new file mode 100644 index 0000000..4f4f827 --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.unit.examples; + +/** + * Java Sleep Example. + */ +public final class JavaSleepExample { + + private JavaSleepExample() {} + + public static void main(String[] args) throws InterruptedException { + long start = System.currentTimeMillis(); + Thread.sleep(40000); + System.out.println("Sleep time in ms = " + (System.currentTimeMillis()-start)); + + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/resources/sleepWorkflow.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/sleepWorkflow.xml b/unit/src/test/resources/sleepWorkflow.xml new file mode 100644 index 0000000..2f6598f --- /dev/null +++ b/unit/src/test/resources/sleepWorkflow.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf"> + <start to="java-node"/> + <action name="java-node"> + <java> + <job-tracker>local</job-tracker> + <name-node>jail://global:00</name-node> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>default</value> + </property> + </configuration> + <main-class>org.apache.falcon.unit.examples.JavaSleepExample</main-class> + </java> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app> \ No newline at end of file
