Repository: falcon Updated Branches: refs/heads/master 4f42dc117 -> 42d379463
FALCON-2199 Delete API support for extension job (user extension) This pull request is dependent on https://github.com/apache/falcon/pull/331 Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #333 from sandeepSamudrala/FALCON-2199 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/42d37946 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/42d37946 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/42d37946 Branch: refs/heads/master Commit: 42d379463ddb7210f05153c318b1be66dbd1d5be Parents: 4f42dc1 Author: sandeep <[email protected]> Authored: Mon Jan 2 14:19:14 2017 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Jan 2 14:19:14 2017 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/ExtensionHandler.java | 4 +- .../falcon/client/AbstractFalconClient.java | 9 ++-- .../resource/AbstractExtensionManager.java | 30 +++++++++++++ .../falcon/resource/proxy/EntityProxyUtil.java | 31 +++++++++++++ .../resource/proxy/ExtensionManagerProxy.java | 46 ++++++++++++-------- .../proxy/SchedulableEntityManagerProxy.java | 26 +---------- .../service/BacklogMetricEmitterService.java | 13 +++--- .../apache/falcon/unit/FalconUnitClient.java | 13 +++++- .../falcon/unit/LocalExtensionManager.java | 15 +++++++ .../apache/falcon/unit/FalconUnitTestBase.java | 3 ++ .../org/apache/falcon/unit/TestFalconUnit.java | 37 ++++++++++++---- 11 files changed, 165 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/client/src/main/java/org/apache/falcon/ExtensionHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java index 5e2f26d..f6f3346 100644 --- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -54,7 +54,7 @@ public final class ExtensionHandler { private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir")); private static final String LOCATION = "location"; private static final String TYPE = "type"; - private static final String NAME = "name"; + private static final String NAME = "extensionName"; private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE = "META-INF/services/org.apache.falcon.extensions.ExtensionBuilder"; @@ -185,8 +185,6 @@ public final class ExtensionHandler { for (File innerFile : files) { if (innerFile.isFile()) { urls.add(innerFile.toURI().toURL()); - } else { - urls.addAll(getFilesInPath(file.toURI().toURL())); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 8cdbf30..e4ce993 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -203,7 +203,6 @@ public abstract class AbstractFalconClient { * loadAndPrepare. * @param configPath path to extension parameters. * @return - * @throws FalconCLIException */ public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser); @@ -216,7 +215,6 @@ public abstract class AbstractFalconClient { * loadAndPrepare. * @param configPath path to extension parameters. * @return - * @throws FalconCLIException */ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath, String doAsUser); @@ -227,11 +225,16 @@ public abstract class AbstractFalconClient { * loadAndPrepare. * @param configPath path to extension parameters. * @return - * @throws FalconCLIException */ public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser); /** + * Deletes the entities that are part of the extension job and then deleted the job from the DB. + * @param jobName name of the extension job that needs to be deleted. + * @return APIResult status of the deletion query. + */ + public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser); + /** * Prepares set of entities the extension has implemented to validate the extension job. * @param jobName job name of the extension job. * @return http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index 9fb0dd4..f1ed6f5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -20,7 +20,11 @@ package org.apache.falcon.resource; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.EntityNotRegisteredException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionBean; @@ -33,7 +37,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; /** * A base class for managing Extension Operations. @@ -106,6 +114,28 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { } } + protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean) + throws FalconException, IOException { + TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>(); + List<String> processes = extensionJobsBean.getProcesses(); + List<String> feeds = extensionJobsBean.getFeeds(); + entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS)); + entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED)); + return entityMap; + } + + private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException { + List<Entity> entities = new ArrayList<>(); + for (String entityName : entityNames) { + try { + entities.add(EntityUtil.getEntity(entityType, entityName)); + } catch (EntityNotRegisteredException e) { + LOG.error("Entity {} not found during deletion nothing to do", entityName); + } + } + return entities; + } + private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName); http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java index a07a6d4..ae0a61a 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java @@ -20,6 +20,9 @@ package org.apache.falcon.resource.proxy; import org.apache.falcon.FalconException; import org.apache.falcon.FalconRuntimException; +import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.EntityNotRegisteredException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.channel.Channel; @@ -36,6 +39,9 @@ import static org.apache.falcon.resource.AbstractEntityManager.getAllColos; import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos; import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG; +/** + * Proxy Util class to proxy entity management apis from prism to servers. + */ class EntityProxyUtil { private final Map<String, Channel> entityManagerChannels = new HashMap<>(); private final Map<String, Channel> configSyncChannels = new HashMap<>(); @@ -89,6 +95,31 @@ class EntityProxyUtil { return results; } + Map<String, APIResult> proxyDelete(final String type, final String entityName, + final HttpServletRequest bufferedRequest) { + Map<String, APIResult> results = new HashMap<>(); + results.put(FALCON_TAG, new EntityProxy(type, entityName) { + @Override + public APIResult execute() { + try { + EntityUtil.getEntity(type, entityName); + return super.execute(); + } catch (EntityNotRegisteredException e) { + return new APIResult(APIResult.Status.SUCCEEDED, + entityName + "(" + type + ") doesn't exist. Nothing to do"); + } catch (FalconException e) { + throw FalconWebException.newAPIException(e); + } + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo); + } + }.execute()); + return results; + } + Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun, final HttpServletRequest bufferedRequest, Entity newEntity) { final Set<String> oldColos = getApplicableColos(type, entityName); http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 0e79f12..551dbbf 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -96,7 +96,6 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { private String currentColo = DeploymentUtil.getCurrentColo(); private EntityProxyUtil entityProxyUtil = new EntityProxyUtil(); - private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json"; //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @GET @@ -271,28 +270,26 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult delete(@PathParam("job-name") String jobName, + @Context HttpServletRequest request, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); - try { - List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); - if (entities.isEmpty()) { - // return failure if the extension job doesn't exist - return new APIResult(APIResult.Status.SUCCEEDED, - "Extension job " + jobName + " doesn't exist. Nothing to delete."); - } + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean == null) { + // return failure if the extension job doesn't exist + return new APIResult(APIResult.Status.SUCCEEDED, + "Extension job " + jobName + " doesn't exist. Nothing to delete."); + } - for (Entity entity : entities) { - // TODO(yzheng): need to remember the entity dependency graph for clean ordered removal - canRemove(entity); - if (entity.getEntityType().isSchedulable() && !DeploymentUtil.isPrism()) { - getWorkflowEngine(entity).delete(entity); - } - configStore.remove(entity.getEntityType(), entity.getName()); - } - } catch (FalconException | IOException e) { + SortedMap<EntityType, List<Entity>> entityMap; + try { + entityMap = getJobEntities(extensionJobsBean); + deleteEntities(entityMap, request); + } catch (FalconException | IOException | JAXBException e) { LOG.error("Error when deleting extension job: " + jobName + ": ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } + metaStore.deleteExtensionJob(jobName); return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully"); } @@ -423,6 +420,21 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return new BufferedRequest(request); } + private void deleteEntities(SortedMap<EntityType, List<Entity>> entityMap, HttpServletRequest request) + throws IOException, JAXBException { + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (final Entity entity : entry.getValue()) { + final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); + final String entityType = entity.getEntityType().toString(); + final String entityName = entity.getName(); + entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest); + if (!embeddedMode) { + super.delete(bufferedRequest, entityType, entityName, currentColo); + } + } + } + } + private void submitEntities(String extensionName, String jobName, SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream, HttpServletRequest request) throws FalconException, IOException, JAXBException { http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 74a1acc..8f41c48 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -21,7 +21,6 @@ package org.apache.falcon.resource.proxy; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; -import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -222,29 +221,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana throw FalconWebException.newAPIException(e); } final HttpServletRequest bufferedRequest = new BufferedRequest(request); - Map<String, APIResult> results = new HashMap<String, APIResult>(); - - results.put(FALCON_TAG, new EntityProxy(type, entityName) { - @Override - public APIResult execute() { - try { - EntityUtil.getEntity(type, entityName); - return super.execute(); - } catch (EntityNotRegisteredException e) { - return new APIResult(APIResult.Status.SUCCEEDED, - entityName + "(" + type + ") doesn't exist. Nothing to do"); - } catch (FalconException e) { - throw FalconWebException.newAPIException(e); - } - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return entityProxyUtil.getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, - colo); - } - }.execute()); - + Map<String, APIResult> results = new HashMap<>(); + results.putAll(entityProxyUtil.proxyDelete(type, entityName, bufferedRequest)); // delete only if deleted from everywhere if (!embeddedMode && results.get(FALCON_TAG).getStatus() == APIResult.Status.SUCCEEDED) { results.put(PRISM_TAG, super.delete(bufferedRequest, type, entityName, currentColo)); http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index 23f4cf1..16830f9 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -116,11 +116,14 @@ public final class BacklogMetricEmitterService implements FalconService, if (entity.getEntityType() != EntityType.PROCESS){ return; } - backlogMetricStore.deleteEntityInstance(entity.getName()); - entityBacklogs.remove(entity); - Process process = EntityUtil.getEntity(entity.getEntityType(), entity.getName()); - for(Cluster cluster : process.getClusters().getClusters()){ - dropMetric(cluster.getName(), process); + Process process = (Process) entity; + if (process.getSla() != null) { + backlogMetricStore.deleteEntityInstance(entity.getName()); + entityBacklogs.remove(entity); + process = EntityUtil.getEntity(entity.getEntityType(), entity.getName()); + for (Cluster cluster : process.getClusters().getClusters()) { + dropMetric(cluster.getName(), process); + } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 66b8e9b..9ed2a0d 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -336,6 +336,7 @@ public class FalconUnitClient extends AbstractFalconClient { } } + @Override public APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) { InputStream configStream = getServletInputStream(configPath); try { @@ -344,10 +345,20 @@ public class FalconUnitClient extends AbstractFalconClient { return localExtensionManager.updateExtensionJob(extensionName, jobName, configStream, entityMap); } catch (FalconException | IOException e) { - throw new FalconCLIException("Failed in updating the extension job " + jobName); + throw new FalconCLIException("Failed in updating the extension job:" + jobName); + } + } + + @Override + public APIResult deleteExtensionJob(String jobName, String doAsUser) { + try { + return localExtensionManager.deleteExtensionJob(jobName); + } catch (FalconException | IOException e) { + throw new FalconCLIException("Failed to delete the extension job:" + jobName); } } + @Override public APIResult getExtensionJobDetails(final String jobName) { return localExtensionManager.getExtensionJobDetails(jobName); http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 7002dc8..0412ef2 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -22,7 +22,9 @@ import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractExtensionManager; import org.apache.falcon.security.CurrentUser; @@ -86,6 +88,19 @@ public class LocalExtensionManager extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } + public APIResult deleteExtensionJob(String jobName) throws FalconException, IOException{ + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean); + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (Entity entity : entry.getValue()) { + delete(entity.getEntityType().name(), entity.getName(), null); + } + } + ExtensionStore.getMetaStore().deleteExtensionJob(jobName); + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully"); + } + public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream, SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException { http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 690fdd5..9e836e7 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -245,6 +245,9 @@ public class FalconUnitTestBase { return falconUnitClient.updateExtensionJob(jobName, configPath, doAsUser); } + APIResult deleteExtensionJob(String jobName, String doAsUser) { + return falconUnitClient.deleteExtensionJob(jobName, doAsUser); + } public static String overlayParametersOverTemplate(String template, Map<String, String> overlay) throws IOException { http://git-wip-us.apache.org/repos/asf/falcon/blob/42d37946/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 07d8195..a41743d 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -19,6 +19,7 @@ package org.apache.falcon.unit; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Property; @@ -72,6 +73,8 @@ public class TestFalconUnit extends FalconUnitTestBase { private static final String EXTENSION_PATH = "/projects/falcon/extension/testExtension"; private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; private static final String EXTENSION_PROPERTIES = "extension.properties"; + private static final String TEST_JOB = "testJob"; + private static final String TEST_EXTENSION = "testExtension"; private FileSystem fileSystem; private static final String STORAGE_URL = "jail://global:00"; @@ -431,39 +434,55 @@ public class TestFalconUnit extends FalconUnitTestBase { } @Test - public void testSubmitAndScheduleExtensionJob() throws Exception { + public void testExtensionJobOperations() throws Exception { clearDB(); submitCluster(); createExtensionPackage(); String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString(); - String result = registerExtension("testExtension", STORAGE_URL + EXTENSION_PATH, "testExtension"); + String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); Assert.assertEquals(result, "Extension :testExtension registered successfully."); createDir(PROCESS_APP_PATH); copyExtensionJar(packageBuildLib); - APIResult apiResult = submitAndScheduleExtensionJob("testExtension", "testJob", null, null); + APIResult apiResult = submitAndScheduleExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); assertStatus(apiResult); - result = getExtensionJobDetails("testJob"); + result = getExtensionJobDetails(TEST_JOB); JSONObject resultJson = new JSONObject(result); - Assert.assertEquals(resultJson.get("extensionName"), "testExtension"); - Process process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); + Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION); + Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testPipeline"); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); - apiResult = updateExtensionJob("testJob", getAbsolutePath(EXTENSION_PROPERTIES), null); + apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null); assertStatus(apiResult); - String processes = new JSONObject(getExtensionJobDetails("testJob")).get("processes").toString(); + String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString(); Assert.assertEquals(processes, "sample"); - process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); + process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testSample"); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); + + apiResult = deleteExtensionJob(TEST_JOB, null); + assertStatus(apiResult); + try { + getEntity(EntityType.PROCESS, "sample"); + Assert.fail("Should have thrown a validation exception"); + } catch (EntityNotRegisteredException e) { + //Do nothing. Exception Expected + } + try { + getClient().getExtensionJobDetails(TEST_JOB); + Assert.fail("Should have thrown a FalconWebException"); + } catch (FalconWebException e) { + Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob"); + //Do nothing. Exception Expected. + } }
