Repository: falcon Updated Branches: refs/heads/master c215234e6 -> 52f308aa2
FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name Author: sandeep <[email protected]> Author: sandeep.samudrala <[email protected]> Reviewers: @pallavi-rao Closes #378 from sandeepSamudrala/FALCON-2293 and squashes the following commits: 478700c [sandeep.samudrala] FALCON-2293 Removed unused import d506de6 [sandeep] FALCON-2285 Addressed review comments 873889a [sandeep] FALCON-2285 Modified error message aa2390b [sandeep] FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name d0e39e8 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 85750dd [sandeep] Merge branch 'master' of https://github.com/apache/falcon 432a03a [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0780363 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a3bd0e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon db425c5 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 3f67fed [sandeep] Merge branch 'master' of https://github.com/apache/falcon cb2b00d [sandeep] Merge branch 'master' of https://github.com/apache/falcon 79e8d64 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 7de7798 [sandeep] go -b FALCON-2263Merge branch 'master' of https://github.com/apache/falcon c5da0a2 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 7e16263 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a234d94 [sandeep] FALCON-2231 Incoporated review comments and small fixes for duplicate submission and colo addition to schedule command 26e3350 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon 456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/52f308aa Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/52f308aa Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/52f308aa Branch: refs/heads/master Commit: 52f308aa262c413c76b7d06bb159930d367ea3a3 Parents: c215234 Author: sandeep <[email protected]> Authored: Thu Mar 23 08:44:31 2017 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Mar 23 08:44:31 2017 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/ExtensionHandler.java | 2 +- .../resource/AbstractExtensionManager.java | 20 ++++++++++++++++++++ .../resource/proxy/ExtensionManagerProxy.java | 15 ++++++++------- .../falcon/unit/LocalExtensionManager.java | 6 ++++++ .../org/apache/falcon/unit/TestFalconUnit.java | 7 +++++++ 5 files changed, 42 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/52f308aa/client/src/main/java/org/apache/falcon/ExtensionHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java index 769cd78..3961e46 100644 --- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -148,7 +148,7 @@ public final class ExtensionHandler { private static String createStagePath(String extensionName, String jobName) { String stagePath = TMP_BASE_DIR + File.separator + extensionName + PATH_SEPARATOR + jobName - + PATH_SEPARATOR + System.currentTimeMillis()/1000; + + PATH_SEPARATOR + System.currentTimeMillis(); File tmpPath = new File(stagePath); if (tmpPath.mkdir()) { throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString()); http://git-wip-us.apache.org/repos/asf/falcon/blob/52f308aa/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index 4ffeb95..2131996 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -23,6 +23,7 @@ import org.apache.falcon.FalconWebException; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; @@ -276,6 +277,25 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { } } + protected static void checkIfPartOfAnotherExtension(String entityName, EntityType entityType, String jobName) + throws FalconException { + try { + Entity entity = EntityUtil.getEntity(entityType, entityName); + String extractedJobName = AbstractExtensionManager.getJobNameFromTag(entity.getTags()); + if (StringUtils.isBlank(extractedJobName)) { + LOG.error("Entity:{} is already submitted", entity.getName()); + throw FalconWebException.newAPIException("Entity:" + entity.getName() + " is already submitted", + Response.Status.INTERNAL_SERVER_ERROR); + } else if (!extractedJobName.equals(jobName)) { + LOG.error("Entity: {} is part another extension job:{}", entity.getName(), extractedJobName); + throw FalconWebException.newAPIException("Entity:" + entity.getName() +" is part another extension job:" + + extractedJobName, Response.Status.INTERNAL_SERVER_ERROR); + } + } catch (EntityNotRegisteredException ignored) { + //Valid. Ignore if its not submitted already. + } + } + protected static ExtensionBean getExtensionIfExists(String extensionName) { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionBean extensionBean = metaStore.getDetail(extensionName); http://git-wip-us.apache.org/repos/asf/falcon/blob/52f308aa/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 9808892..e05e2e3 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -420,8 +420,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { throws FalconException, IOException, JAXBException { List<Entity> feeds = entityMap.get(EntityType.FEED); List<Entity> processes = entityMap.get(EntityType.PROCESS); - validateFeeds(feeds); - validateProcesses(processes); + validateFeeds(feeds, jobName); + validateProcesses(processes, jobName); List<String> feedNames = new ArrayList<>(); List<String> processNames = new ArrayList<>(); @@ -458,8 +458,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { HttpServletRequest request) throws FalconException, IOException, JAXBException { List<Entity> feeds = entityMap.get(EntityType.FEED); List<Entity> processes = entityMap.get(EntityType.PROCESS); - validateFeeds(feeds); - validateProcesses(processes); + validateFeeds(feeds, jobName); + validateProcesses(processes, jobName); List<String> feedNames = new ArrayList<>(); List<String> processNames = new ArrayList<>(); @@ -503,16 +503,17 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return getBufferedRequest(new HttpServletRequestInputStreamWrapper(request, servletInputStream)); } - - private void validateFeeds(List<Entity> feeds) throws FalconException { + private void validateFeeds(List<Entity> feeds, String jobName) throws FalconException { for (Entity feed : feeds) { + checkIfPartOfAnotherExtension(feed.getName(), EntityType.FEED, jobName); super.validate(feed); } } - private void validateProcesses(List<Entity> processes) throws FalconException { + private void validateProcesses(List<Entity> processes, String jobName) throws FalconException { ProcessEntityParser processEntityParser = new ProcessEntityParser(); for (Entity process : processes) { + checkIfPartOfAnotherExtension(process.getName(), EntityType.PROCESS, jobName); processEntityParser.validate((Process) process, false); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/52f308aa/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 8936225..f196736 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -20,6 +20,7 @@ package org.apache.falcon.unit; import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; @@ -48,8 +49,13 @@ public class LocalExtensionManager extends AbstractExtensionManager { SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException { checkIfExtensionIsEnabled(extensionName); checkIfExtensionJobNameExists(jobName, extensionName); + EntityUtil.applyTags(extensionName, jobName, entityMap.get(EntityType.FEED)); + EntityUtil.applyTags(extensionName, jobName, entityMap.get(EntityType.PROCESS)); for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { for (Entity entity : entry.getValue()) { + checkIfPartOfAnotherExtension(entity.getName(), entity.getEntityType(), jobName); + } + for (Entity entity : entry.getValue()) { submitInternal(entity, "falconUser"); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/52f308aa/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 861a089..6b63c23 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -74,6 +74,7 @@ public class TestFalconUnit extends FalconUnitTestBase { private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; private static final String EXTENSION_PROPERTIES = "extension.properties"; private static final String TEST_JOB = "testJob"; + private static final String TEST_JOB_DUPLICATE = "testJobDuplicate"; private static final String TEST_EXTENSION = "testExtension"; private FileSystem fileSystem; @@ -459,6 +460,12 @@ public class TestFalconUnit extends FalconUnitTestBase { apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); assertStatus(apiResult); + try { + submitExtensionJob(TEST_EXTENSION, TEST_JOB_DUPLICATE, null, null); + } catch (FalconWebException e) { + Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Entity:sample is part another " + + "extension job:testJob"); + } ExtensionJobList extensionJobList = getExtensionJobs(TEST_EXTENSION, null, null); Assert.assertEquals(extensionJobList.getNumJobs(), 1);
