Repository: falcon Updated Branches: refs/heads/master c30fce751 -> 7854e3d90
FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (by Pavan Kolamuri) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7854e3d9 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7854e3d9 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7854e3d9 Branch: refs/heads/master Commit: 7854e3d90e7a5258b85f45afd00694e3f5157142 Parents: c30fce7 Author: Pallavi Rao <[email protected]> Authored: Mon Oct 26 16:44:30 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Oct 26 16:44:30 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../falcon/client/AbstractFalconClient.java | 61 +++++- .../falcon/client/FalconCLIException.java | 4 + .../falcon/entity/store/ConfigurationStore.java | 2 + .../oozie/client/LocalProxyOozieClient.java | 4 +- .../falcon/resource/AbstractEntityManager.java | 47 +++-- .../AbstractSchedulableEntityManager.java | 4 +- .../proxy/SchedulableEntityManagerProxy.java | 4 +- .../apache/falcon/unit/FalconUnitClient.java | 89 +++++---- .../unit/LocalSchedulableEntityManager.java | 31 +++- .../apache/falcon/unit/FalconUnitTestBase.java | 11 +- .../org/apache/falcon/unit/TestFalconUnit.java | 185 +++++++++++++++---- unit/src/test/resources/process1.xml | 50 ----- 13 files changed, 341 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 95dd69d..c00c265 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (Pavan Kolamuri via Pallavi Rao) OPTIMIZATIONS 
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index b889931..91d5324 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -18,6 +18,7 @@ package org.apache.falcon.client; import org.apache.falcon.LifeCycle; +import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; @@ -31,12 +32,14 @@ import java.util.List; */ public abstract class AbstractFalconClient { + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + /** * Submit a new entity. Entities can be of type feed, process or data end * points. Entity definitions are validated structurally against schema and * subsequently for other rules before they are admitted into the system. - * @param entityType - * @param filePath + * @param entityType Entity type. Valid options are cluster, feed or process. + * @param filePath Path for the entity definition * @return * @throws FalconCLIException */ @@ -45,17 +48,63 @@ public abstract class AbstractFalconClient { /** * Schedules an submitted process entity immediately. - * @param entityType - * @param entityName - * @param colo + * @param entityType Entity type. Valid options are cluster, feed or process. + * @param entityName Name of the entity. + * @param colo Cluster name. * @return * @throws FalconCLIException */ public abstract APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun, String doAsuser, String properties) throws FalconCLIException; + /** + * Delete the specified entity. 
+ * @param entityType Entity type. Valid options are cluster, feed or process. + * @param entityName Name of the entity. + * @param doAsUser Proxy User. + * @return + * @throws FalconCLIException + */ + public abstract APIResult delete(EntityType entityType, String entityName, + String doAsUser) throws FalconCLIException; + + /** + * Validates the submitted entity. + * @param entityType Entity type. Valid options are cluster, feed or process. + * @param filePath Path for the entity definition to validate. + * @param skipDryRun Dry run. + * @param doAsUser Proxy User. + * @return + * @throws FalconCLIException + */ + public abstract APIResult validate(String entityType, String filePath, Boolean skipDryRun, + String doAsUser) throws FalconCLIException; + + /** + * Updates the submitted entity. + * @param entityType Entity type. Valid options are cluster, feed or process. + * @param entityName Name of the entity. + * @param filePath Path for the entity definition to update. + * @param skipDryRun Dry run. + * @param doAsUser Proxy User. + * @return + * @throws FalconCLIException + */ + public abstract APIResult update(String entityType, String entityName, String filePath, + Boolean skipDryRun, String doAsUser) throws FalconCLIException; + + /** + * Get definition of the entity. + * @param entityType Entity type. Valid options are cluster, feed or process. + * @param entityName Name of the entity. + * @param doAsUser Proxy user. 
+ * @return + * @throws FalconCLIException + */ + public abstract Entity getDefinition(String entityType, String entityName, + String doAsUser) throws FalconCLIException; + - //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck /** * http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/client/src/main/java/org/apache/falcon/client/FalconCLIException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java index ec74c27..51ef952 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java +++ b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java @@ -36,6 +36,10 @@ public class FalconCLIException extends Exception { super(msg); } + public FalconCLIException(Throwable e) { + super(e); + } + public FalconCLIException(String msg, Throwable throwable) { super(msg, throwable); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index e27187b..4dd1c68 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -60,6 +60,8 @@ public final class ConfigurationStore implements FalconService { private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] { EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, }; + public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED, + EntityType.CLUSTER, }; private static final Logger LOG = 
LoggerFactory.getLogger(ConfigurationStore.class); private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index 756828f..c2100d1 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -166,7 +166,7 @@ public class LocalProxyOozieClient extends OozieClient { @Override public void reRun(String jobId, Properties conf) throws OozieClientException { - throw new IllegalStateException("Rerun not supported "); + getClient(jobId).reRun(jobId, conf); } @Override @@ -181,7 +181,7 @@ public class LocalProxyOozieClient extends OozieClient { @Override public void kill(String jobId) throws OozieClientException { - throw new IllegalStateException("Kill not supported"); + getClient(jobId).kill(jobId); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 3323dd1..16ef83a 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -75,7 +75,7 @@ import java.util.Set; public abstract class AbstractEntityManager { private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class); private static MemoryLocks memoryLocks = MemoryLocks.getInstance(); - private 
static final String DO_AS_PARAM = "doAs"; + protected static final String DO_AS_PARAM = "doAs"; protected static final int XML_DEBUG_LEN = 10 * 1024; private AbstractWorkflowEngine workflowEngine; @@ -195,7 +195,8 @@ public abstract class AbstractEntityManager { checkColo(colo); try { - Entity entity = submitInternal(request, type); + String doAsUser = request.getParameter(DO_AS_PARAM); + Entity entity = submitInternal(request.getInputStream(), type, doAsUser); return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName()); } catch (Throwable e) { LOG.error("Unable to persist entity object", e); @@ -205,15 +206,24 @@ public abstract class AbstractEntityManager { /** * Post an entity XML with entity type. Validates the XML which can be - * Process, Feed or Dataendpoint + * Process, Feed or Data endpoint * * @param type entity type - * @return APIResule -Succeeded or Failed + * @return APIResult -Succeeded or Failed */ public APIResult validate(HttpServletRequest request, String type, Boolean skipDryRun) { try { + return validate(request.getInputStream(), type, skipDryRun); + } catch (IOException e) { + LOG.error("Unable to get InputStream from Request", request, e); + throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } + } + + protected APIResult validate(InputStream inputStream, String type, Boolean skipDryRun) { + try { EntityType entityType = EntityType.getEnum(type); - Entity entity = deserializeEntity(request, entityType); + Entity entity = deserializeEntity(inputStream, entityType); validate(entity); //Validate that the entity can be scheduled in the cluster @@ -244,6 +254,11 @@ public abstract class AbstractEntityManager { * @return APIResult */ public APIResult delete(HttpServletRequest request, String type, String entity, String colo) { + return delete(type, entity, colo); + + } + + protected APIResult delete(String type, String entity, String colo) { checkColo(colo); List<Entity> tokenList = 
new ArrayList<>(); try { @@ -277,12 +292,23 @@ public abstract class AbstractEntityManager { public APIResult update(HttpServletRequest request, String type, String entityName, String colo, Boolean skipDryRun) { + try { + return update(request.getInputStream(), type, entityName, colo, skipDryRun); + } catch (IOException e) { + LOG.error("Unable to get InputStream from Request", request, e); + throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } + + } + + protected APIResult update(InputStream inputStream, String type, String entityName, + String colo, Boolean skipDryRun) { checkColo(colo); List<Entity> tokenList = new ArrayList<>(); try { EntityType entityType = EntityType.getEnum(type); Entity oldEntity = EntityUtil.getEntity(type, entityName); - Entity newEntity = deserializeEntity(request, entityType); + Entity newEntity = deserializeEntity(inputStream, entityType); // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass decorateEntityWithACL(newEntity); validate(newEntity); @@ -309,7 +335,6 @@ public abstract class AbstractEntityManager { } configStore.update(entityType, newEntity); - return new APIResult(APIResult.Status.SUCCEEDED, result.toString()); } catch (Throwable e) { LOG.error("Update failed", e); @@ -399,11 +424,11 @@ public abstract class AbstractEntityManager { } } - protected Entity submitInternal(HttpServletRequest request, String type) + protected Entity submitInternal(InputStream inputStream, String type, String doAsUser) throws IOException, FalconException { EntityType entityType = EntityType.getEnum(type); - Entity entity = deserializeEntity(request, entityType); + Entity entity = deserializeEntity(inputStream, entityType); List<Entity> tokenList = new ArrayList<>(); // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass decorateEntityWithACL(entity); @@ -425,7 +450,6 @@ public abstract class AbstractEntityManager { + "Can't be submitted again. 
Try removing before submitting."); } - String doAsUser = request.getParameter(DO_AS_PARAM); SecurityUtil.tryProxy(entity, doAsUser); // proxy before validating since FS/Oozie needs to be proxied validate(entity); configStore.publish(entityType, entity); @@ -477,11 +501,10 @@ public abstract class AbstractEntityManager { } } - protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType) + protected Entity deserializeEntity(InputStream xmlStream, EntityType entityType) throws IOException, FalconException { EntityParser<?> entityParser = EntityParserFactory.getParser(entityType); - InputStream xmlStream = request.getInputStream(); if (xmlStream.markSupported()) { xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len } http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index 0db55df..d317aa1 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -82,7 +82,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM } } - private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun, + protected synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException, AuthorizationException { checkSchedulableEntity(type); @@ -187,7 +187,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM checkColo(colo); try { checkSchedulableEntity(type); - Entity entity = submitInternal(request, type); + Entity entity = 
submitInternal(request.getInputStream(), type, request.getParameter(DO_AS_PARAM)); scheduleInternal(type, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties)); return new APIResult(APIResult.Status.SUCCEEDED, entity.getName() + "(" + type + ") scheduled successfully"); http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 9d13d74..d3ba189 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -199,7 +199,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana private Entity getEntity(HttpServletRequest request, String type) { try { request.getInputStream().reset(); - Entity entity = deserializeEntity(request, EntityType.getEnum(type)); + Entity entity = deserializeEntity(request.getInputStream(), EntityType.getEnum(type)); request.getInputStream().reset(); return entity; } catch (Exception e) { @@ -225,7 +225,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana EntityType entityType = EntityType.getEnum(type); final Entity entity; try { - entity = deserializeEntity(bufferedRequest, entityType); + entity = deserializeEntity(bufferedRequest.getInputStream(), entityType); bufferedRequest.getInputStream().reset(); } catch (Exception e) { throw FalconWebException.newException("Unable to parse the request", Response.Status.BAD_REQUEST); http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java 
---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 783af19..b5afae3 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -23,8 +23,6 @@ import org.apache.falcon.LifeCycle; import org.apache.falcon.client.AbstractFalconClient; import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.parser.EntityParser; -import org.apache.falcon.entity.parser.EntityParserFactory; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -42,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -55,6 +52,9 @@ public class FalconUnitClient extends AbstractFalconClient { private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class); + private static final String DEFAULT_ORDERBY = "status"; + private static final String DEFAULT_SORTED_ORDER = "asc"; + protected ConfigurationStore configStore; private AbstractWorkflowEngine workflowEngine; private LocalSchedulableEntityManager localSchedulableEntityManager; @@ -84,29 +84,9 @@ public class FalconUnitClient extends AbstractFalconClient { */ @Override public APIResult submit(String type, String filePath, String doAsUser) throws IOException, FalconCLIException { - try { - EntityType entityType = EntityType.getEnum(type); - InputStream entityStream = FalconUnitHelper.getFileInputStream(filePath); - EntityParser entityParser = EntityParserFactory.getParser(entityType); - Entity entity = entityParser.parse(entityStream); - - Entity existingEntity = 
configStore.get(entityType, entity.getName()); - if (existingEntity != null) { - if (EntityUtil.equals(existingEntity, entity)) { - LOG.warn(entity.toShortString() + " already registered with same definition " + entity.getName()); - return new APIResult(APIResult.Status.SUCCEEDED, "{} already registered with same definition" - + entity.getName()); - } - LOG.warn(entity.toShortString() + " already registered with different definition " - + "Can't be submitted again. Try removing before submitting."); - return new APIResult(APIResult.Status.FAILED, "{} already registered with different definition " - + "Can't be submitted again. Try removing before submitting." + entity.getName()); - } - entityParser.validate(entity); - configStore.publish(entityType, entity); - LOG.info("Submit successful: ({}): {}", entityType.name(), entity.getName()); - return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName()); + try { + return localSchedulableEntityManager.submit(type, filePath, doAsUser); } catch (FalconException e) { throw new FalconCLIException("FAILED", e); } @@ -128,12 +108,56 @@ public class FalconUnitClient extends AbstractFalconClient { return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties); } + @Override + public APIResult delete(EntityType entityType, String entityName, String doAsUser) { + return localSchedulableEntityManager.delete(entityType, entityName, doAsUser); + } + + @Override + public APIResult validate(String entityType, String filePath, Boolean skipDryRun, + String doAsUser) throws FalconCLIException { + try { + return localSchedulableEntityManager.validate(entityType, filePath, skipDryRun, doAsUser); + } catch (FalconException e) { + throw new FalconCLIException(e); + } + } + + @Override + public APIResult update(String entityType, String entityName, String filePath, + Boolean skipDryRun, String doAsUser) throws FalconCLIException { + try { + return 
localSchedulableEntityManager.update(entityType, entityName, filePath, + skipDryRun, "local", doAsUser); + } catch (FalconException e) { + throw new FalconCLIException(e); + } + } + + @Override + public Entity getDefinition(String entityType, String entityName, String doAsUser) throws FalconCLIException { + String entity = localSchedulableEntityManager.getEntityDefinition(entityType, entityName); + return Entity.fromString(EntityType.getEnum(entityType), entity); + } + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @Override public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { + if (orderBy == null) { + orderBy = DEFAULT_ORDERBY; + } + if (sortOrder == null) { + sortOrder = DEFAULT_SORTED_ORDER; + } + if (offset == null) { + offset = 0; + } + if (numResults == null) { + numResults = 1; + } return localInstanceManager.getStatusOfInstances(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); @@ -164,7 +188,7 @@ public class FalconUnitClient extends AbstractFalconClient { if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) { updateStartAndEndTime((Process) entity, startTime, numInstances, cluster); } - workflowEngine.schedule(entity, skipDryRun, EntityUtil.getPropertyMap(properties)); + workflowEngine.schedule(entity, skipDryRun, EntityUtil.getPropertyMap(properties)); LOG.info(entityName + " is scheduled successfully"); return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully"); } catch (FalconException e) { @@ -180,16 +204,13 @@ public class FalconUnitClient extends AbstractFalconClient { * @param nominalTime nominal time of process * @return InstancesResult.WorkflowStatus */ - public InstancesResult.WorkflowStatus getInstanceStatus(EntityType 
entityType, String entityName, + public InstancesResult.WorkflowStatus getInstanceStatus(String entityType, String entityName, String nominalTime) throws Exception { - if (entityType == EntityType.CLUSTER) { - throw new IllegalArgumentException("Instance management functions don't apply to Cluster entities"); - } - Entity entityObject = EntityUtil.getEntity(entityType, entityName); Date startTime = SchemaHelper.parseDateUTC(nominalTime); - Date endTime = DateUtil.getNextMinute(startTime); - List<LifeCycle> lifeCycles = FalconUnitHelper.checkAndUpdateLifeCycle(null, entityType.name()); - InstancesResult instancesResult = workflowEngine.getStatus(entityObject, startTime, endTime, lifeCycles); + Date endTimeDate = DateUtil.getNextMinute(startTime); + String endTime = DateUtil.getDateFormatFromTime(endTimeDate.getTime()); + InstancesResult instancesResult = getStatusOfInstances(entityType, entityName, nominalTime, endTime, null, + null, null, null, null, null, null, null); if (instancesResult.getInstances() != null && instancesResult.getInstances().length > 0 && instancesResult.getInstances()[0] != null) { LOG.info("Instance status is " + instancesResult.getInstances()[0].getStatus()); http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java index 8b1c435..42adc9a 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java @@ -17,14 +17,19 @@ */ package org.apache.falcon.unit; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import 
org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; +import java.io.IOException; +import java.io.InputStream; + /** * A proxy implementation of the schedulable entity operations in local mode. */ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager { - // Created for future purposes to add all entity API's here for falcon unit. public LocalSchedulableEntityManager() {} @@ -40,4 +45,28 @@ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityMana return super.getStatus(type, entity, colo); } + public APIResult delete(EntityType entityType, String entityName, String doAsUser) { + if (entityType == null) { + throw new IllegalStateException("Entity-Type cannot be null"); + } + return super.delete(entityType.name(), entityName, doAsUser); + } + + public APIResult validate(String entityType, String filePath, Boolean skipDryRun, + String doAsUser) throws FalconException { + InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath); + return super.validate(inputStream, entityType, skipDryRun); + } + + public APIResult update(String entityType, String entityName, String filePath, + Boolean skipDryRun, String doAsUser, String colo) throws FalconException { + InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath); + return super.update(inputStream, entityType, entityName, colo, skipDryRun); + } + + public APIResult submit(String entityType, String filePath, String doAsUser) throws FalconException, IOException { + InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath); + Entity entity = super.submitInternal(inputStream, entityType, doAsUser); + return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + entityType + ") " + entity.getName()); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java 
---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index d12efbc..ac478f4 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -77,7 +77,6 @@ public class FalconUnitTestBase { * @throws Exception thrown if the predicate evaluation could not evaluate. */ public interface Predicate { - boolean evaluate() throws Exception; } @@ -122,9 +121,9 @@ public class FalconUnitTestBase { @AfterMethod public void cleanUpActionXml() throws IOException, FalconException { - for (EntityType type : EntityType.values()) { + for (EntityType type : ConfigurationStore.ENTITY_DELETE_ORDER) { for (String name : ConfigurationStore.get().getEntities(type)) { - ConfigurationStore.get().remove(type, name); + getClient().delete(type, name, null); } } //Needed since oozie writes action xml to current directory. 
@@ -275,7 +274,7 @@ public class FalconUnitTestBase { String inputFile) throws FalconException, ParseException, IOException { String feedPath = getFeedPathForTS(cluster, feedName, time); fs.mkdirs(new Path(feedPath)); - fs.copyFromLocalFile(new Path(getAbsolutePath("/" + inputFile)), new Path(feedPath)); + fs.copyFromLocalFile(new Path(getAbsolutePath(inputFile)), new Path(feedPath)); } protected String getFeedPathForTS(String cluster, String feedName, @@ -295,7 +294,7 @@ public class FalconUnitTestBase { public String getAbsolutePath(String fileName) { - return this.getClass().getResource(fileName).getPath(); + /* The leading "/" is added here, so callers pass the bare resource name. NOTE(review): Class.getResource returns null for a missing resource, in which case getPath() fails with a NullPointerException. */ return this.getClass().getResource("/" + fileName).getPath(); } public void createDir(String path) throws IOException { @@ -333,7 +332,7 @@ public class FalconUnitTestBase { } } - protected long waitForStatus(final EntityType entityType, final String entityName, final String instanceTime) { + protected long waitForStatus(final String entityType, final String entityName, final String instanceTime) { return waitFor(WAIT_TIME, new Predicate() { public boolean evaluate() throws Exception { InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType, http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index d504bd2..8cdbd88 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -18,8 +18,11 @@ package org.apache.falcon.unit; import org.apache.falcon.FalconException; +import org.apache.falcon.FalconWebException; import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; +import 
org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.FileStatus; @@ -27,36 +30,47 @@ import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.text.ParseException; +import static org.apache.falcon.entity.EntityUtil.getEntity; + + /** * Test cases of falcon jobs using Local Oozie and LocalJobRunner. */ public class TestFalconUnit extends FalconUnitTestBase { + /* Resource file names and entity names shared by the tests in this class. */ private static final String INPUT_FEED = "infeed.xml"; + private static final String OUTPUT_FEED = "outfeed.xml"; + private static final String PROCESS = "process.xml"; + private static final String PROCESS_APP_PATH = "/app/oozie-mr"; + private static final String CLUSTER_NAME = "local"; + private static final String INPUT_FEED_NAME = "in"; + private static final String PROCESS_NAME = "process"; + private static final String OUTPUT_FEED_NAME = "out"; + private static final String INPUT_FILE_NAME = "input.txt"; + private static final String SCHEDULE_TIME = "2015-06-20T00:00Z"; + private static final String WORKFLOW = "workflow.xml"; + @Test public void testProcessInstanceExecution() throws Exception { - // submit with default props - submitCluster(); - // submitting feeds - APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml")); - assertStatus(result); - result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml")); - assertStatus(result); + submitClusterAndFeeds(); // submitting and scheduling process - String scheduleTime = "2015-06-20T00:00Z"; - createData("in", "local", scheduleTime, "input.txt"); - result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr"); + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + APIResult result = submitProcess(getAbsolutePath(PROCESS), 
PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"), + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS, "process", scheduleTime); - InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS, - "process", scheduleTime); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), + PROCESS_NAME, SCHEDULE_TIME); Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED); - String outPath = getFeedPathForTS("local", "out", scheduleTime); + String outPath = getFeedPathForTS(CLUSTER_NAME, OUTPUT_FEED_NAME, SCHEDULE_TIME); Assert.assertTrue(getFileSystem().exists(new Path(outPath))); FileStatus[] files = getFileSystem().listStatus(new Path(outPath)); Assert.assertTrue(files.length > 0); @@ -69,52 +83,149 @@ public class TestFalconUnit extends FalconUnitTestBase { // submit with default props submitCluster(); // submitting feeds - APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml")); + APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED)); assertStatus(result); - String scheduleTime = "2015-06-20T00:00Z"; - createData("in", "local", scheduleTime, "input.txt"); - String inPath = getFeedPathForTS("local", "in", scheduleTime); + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + String inPath = getFeedPathForTS(CLUSTER_NAME, INPUT_FEED_NAME, SCHEDULE_TIME); Assert.assertTrue(fs.exists(new Path(inPath))); - result = schedule(EntityType.FEED, "in", "local"); + result = schedule(EntityType.FEED, INPUT_FEED_NAME, CLUSTER_NAME); Assert.assertEquals(APIResult.Status.SUCCEEDED, result.getStatus()); waitFor(WAIT_TIME, new Predicate() { public boolean evaluate() 
throws Exception { - InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local"); + InstancesResult.WorkflowStatus status = getRetentionStatus(INPUT_FEED_NAME, CLUSTER_NAME); return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status); } }); - InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local"); + InstancesResult.WorkflowStatus status = getRetentionStatus(INPUT_FEED_NAME, CLUSTER_NAME); Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status); Assert.assertFalse(fs.exists(new Path(inPath))); } @Test public void testSuspendAndResume() throws Exception { - // submit with default props - submitCluster(); - // submitting feeds - APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml")); - assertStatus(result); - result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml")); - assertStatus(result); + submitClusterAndFeeds(); // submitting and scheduling process String scheduleTime = "2015-06-20T00:00Z"; - createData("in", "local", scheduleTime, "input.txt"); - result = submitProcess(getAbsolutePath("/process1.xml"), "/app/oozie-mr"); + // Reuses the shared PROCESS definition; the separate process1 definition is no longer submitted here. NOTE(review): the local scheduleTime above holds the same value as SCHEDULE_TIME. + createData(INPUT_FEED_NAME, CLUSTER_NAME, scheduleTime, INPUT_FILE_NAME); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess("process1", scheduleTime, 2, "local", getAbsolutePath("/workflow.xml"), + result = scheduleProcess(PROCESS_NAME, scheduleTime, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS, "process1", scheduleTime); - result = getClient().suspend(EntityType.PROCESS, "process1", "local", null); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime); + result = getClient().suspend(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); assertStatus(result); - result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null); + result = 
getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); assertStatus(result); Assert.assertEquals(result.getMessage(), "SUSPENDED"); - result = getClient().resume(EntityType.PROCESS, "process1", "local", null); + result = getClient().resume(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); assertStatus(result); - result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null); + result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); assertStatus(result); Assert.assertEquals(result.getMessage(), "RUNNING"); } + + /** Submits and schedules the process, then deletes the process and the input feed through the client, checking after each delete that getEntity throws FalconException. */ @Test + public void testDelete() throws IOException, FalconCLIException, FalconException, + ParseException, InterruptedException { + // submit cluster and feeds + submitClusterAndFeeds(); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + assertStatus(result); + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + /* NOTE(review): the same definition was already submitted above, confirm the repeat is intentional */ result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + assertStatus(result); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW), + true, ""); + assertStatus(result); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + result = getClient().delete(EntityType.PROCESS, PROCESS_NAME, null); + assertStatus(result); + try { + getEntity(EntityType.PROCESS, PROCESS_NAME); + Assert.fail("Exception should be thrown"); + } catch (FalconException e) { + // expected: the lookup of the deleted process must fail + } + + result = getClient().delete(EntityType.FEED, INPUT_FEED_NAME, null); + assertStatus(result); + try { + getEntity(EntityType.FEED, INPUT_FEED_NAME); + Assert.fail("Exception should be thrown"); + } catch (FalconException e) { + // expected: the lookup of the deleted feed must fail + } + } + + /** Validates the process definition, then expects FalconWebException when the input feed file is validated as a PROCESS. */ @Test + public void testValidate() throws IOException, FalconCLIException, FalconException { + submitClusterAndFeeds(); + APIResult result = getClient().validate(EntityType.PROCESS.name(), + getAbsolutePath(PROCESS), true, 
null); + assertStatus(result); + try { + getClient().validate(EntityType.PROCESS.name(), + getAbsolutePath(INPUT_FEED), true, null); + Assert.fail("Exception should be thrown"); + } catch (FalconWebException e) { + // expected: the feed file must be rejected when validated as a process + } + } + + /** Updates the process with its own definition, schedules it, then updates it again from a file holding a copy with one extra property and checks that the stored entity renders to the same string. */ @Test + public void testUpdate() throws IOException, FalconCLIException, FalconException, + ParseException, InterruptedException { + submitClusterAndFeeds(); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + assertStatus(result); + result = getClient().update(EntityType.PROCESS.name(), PROCESS_NAME, + getAbsolutePath(PROCESS), true, null); + assertStatus(result); + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + /* NOTE(review): the same definition was already submitted above, confirm the repeat is intentional */ result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); + assertStatus(result); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW), + true, ""); + assertStatus(result); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); + + Process process = getEntity(EntityType.PROCESS, PROCESS_NAME); + setDummyProperty(process); + String processXml = process.toString(); + + /* The modified definition is written to a scratch file because update takes a file path. NOTE(review): the writer is closed and the file deleted only when nothing in between throws. */ File file = new File("target/newprocess.xml"); + file.createNewFile(); + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + bw.write(processXml); + bw.close(); + + result = falconUnitClient.update(EntityType.PROCESS.name(), PROCESS_NAME, file.getAbsolutePath(), true, null); + assertStatus(result); + + process = getEntity(EntityType.PROCESS, + PROCESS_NAME); + Assert.assertEquals(process.toString(), processXml); + file.delete(); + } + + /** Submits the cluster, then the input and output feeds, asserting the status of each feed submission. */ private void submitClusterAndFeeds() throws IOException, FalconCLIException { + // submit with default props + submitCluster(); + // submitting feeds + APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED)); + assertStatus(result); + result = submit(EntityType.FEED, getAbsolutePath(OUTPUT_FEED)); + assertStatus(result); + 
} + + /** Appends a property with name "dummy" and value "dummy" to the property list of the given process. */ public void setDummyProperty(Process process) { + Property property = new Property(); + property.setName("dummy"); + property.setValue("dummy"); + process.getProperties().getProperties().add(property); + + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/resources/process1.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/process1.xml b/unit/src/test/resources/process1.xml deleted file mode 100644 index 37dbb9c..0000000 --- a/unit/src/test/resources/process1.xml +++ /dev/null @@ -1,50 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- --> - -<process name="process1" xmlns="uri:falcon:process:0.1"> - <clusters> - <cluster name="local"> - <validity start="2013-11-18T00:05Z" end="2013-11-18T01:05Z"/> - </cluster> - </clusters> - - <parallel>5</parallel> - <order>FIFO</order> - <frequency>minutes(1)</frequency> - <timezone>UTC</timezone> - - <inputs> - <!-- In the workflow, the input paths will be available in a variable 'inpaths' --> - <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" /> - </inputs> - - <outputs> - <!-- In the workflow, the output path will be available in a variable 'outpath' --> - <output name="outpath" feed="out" instance="now(0,0)"/> - </outputs> - - <properties> - <!-- In the workflow, these properties will be available with variable - key --> - <property name="queueName" value="default"/> - <!-- The schedule time available as a property in workflow --> - <property name="time" value="${instanceTime()}"/> - </properties> - - <workflow engine="oozie" path="/app/oozie-mr"/> -</process>
