Repository: falcon Updated Branches: refs/heads/master 3ec8d9534 -> bd0028458
FALCON-1410 Entity submit fails when multiple threads try submitting same definition. Contributed by Sandeep Samudrala. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd002845 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd002845 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd002845 Branch: refs/heads/master Commit: bd002845867f886c75b773daeea5952d06fdef76 Parents: 3ec8d95 Author: Ajay Yadava <[email protected]> Authored: Mon Sep 7 13:32:17 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Sep 7 13:32:17 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/resource/AbstractEntityManager.java | 44 +++++--- .../falcon/resource/EntityManagerJerseyIT.java | 102 +++++++++++++++++-- .../org/apache/falcon/resource/TestContext.java | 7 ++ 4 files changed, 133 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8ac2cd1..88d0f64 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava) + FALCON-1429 Fix Falcon monitoring, alert, audit and monitoring plugins by fixing aspectj handling(Venkat Ranganathan via Ajay Yadava) FALCON-1416 Add ACL (if missing) during touch(Narayan Periwal via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 03efa20..f8f36b2 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -244,6 +244,7 @@ public abstract class AbstractEntityManager { */ public APIResult delete(HttpServletRequest request, String type, String entity, String colo) { checkColo(colo); + List<Entity> tokenList = null; try { EntityType entityType = EntityType.getEnum(type); String removedFromEngine = ""; @@ -251,6 +252,7 @@ public abstract class AbstractEntityManager { Entity entityObj = EntityUtil.getEntity(type, entity); canRemove(entityObj); + tokenList = obtainEntityLocks(entityObj, "delete"); if (entityType.isSchedulable() && !DeploymentUtil.isPrism()) { getWorkflowEngine().delete(entityObj); removedFromEngine = "(KILLED in ENGINE)"; @@ -267,6 +269,8 @@ public abstract class AbstractEntityManager { } catch (Throwable e) { LOG.error("Unable to reach workflow engine for deletion or deletion failed", e); throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } finally { + releaseEntityLocks(entity, tokenList); } } @@ -285,7 +289,7 @@ public abstract class AbstractEntityManager { validateUpdate(oldEntity, newEntity); configStore.initiateUpdate(newEntity); - tokenList = obtainUpdateEntityLocks(oldEntity); + tokenList = obtainEntityLocks(oldEntity, "update"); StringBuilder result = new StringBuilder("Updated successfully"); //Update in workflow engine @@ -311,11 +315,11 @@ public abstract class AbstractEntityManager { throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); } finally { ConfigurationStore.get().cleanupUpdateInit(); - releaseUpdateEntityLocks(entityName, tokenList); + releaseEntityLocks(entityName, tokenList); } } - private List<Entity> obtainUpdateEntityLocks(Entity entity) + private List<Entity> obtainEntityLocks(Entity entity, String command) throws FalconException { List<Entity> tokenList = new ArrayList<Entity>(); @@ -323,25 +327,28 @@ public abstract class AbstractEntityManager { if (memoryLocks.acquireLock(entity)) { tokenList.add(entity); } else { - throw new FalconException("Looks like an update command is already issued for " + entity.toShortString()); + throw new FalconException(command + " command is already issued for " + entity.toShortString()); } - //now obtain locks for all dependent entities. + //now obtain locks for all dependent entities if any. Set<Entity> affectedEntities = EntityGraph.get().getDependents(entity); - for (Entity e : affectedEntities) { - if (memoryLocks.acquireLock(e)) { - tokenList.add(e); - } else { - LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks", - e.toShortString()); - throw new FalconException("There are multiple update commands running for dependent entity " - + e.toShortString()); + if (affectedEntities != null) { + for (Entity e : affectedEntities) { + if (memoryLocks.acquireLock(e)) { + tokenList.add(e); + LOG.debug("{} on entity {} has acquired lock on {}", command, entity, e); + } else { + LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks", + e.toShortString()); + throw new FalconException("There are multiple update commands running for dependent entity " + + e.toShortString()); + } } } return tokenList; } - private void releaseUpdateEntityLocks(String entityName, List<Entity> tokenList) { + private void releaseEntityLocks(String entityName, List<Entity> tokenList) { if (tokenList != null && !tokenList.isEmpty()) { for (Entity entity : tokenList) { memoryLocks.releaseLock(entity); @@ -391,14 +398,21 @@ public abstract class AbstractEntityManager { } } - protected synchronized Entity submitInternal(HttpServletRequest request, String type) + protected Entity submitInternal(HttpServletRequest request, String type) throws IOException, FalconException { EntityType entityType = EntityType.getEnum(type); Entity entity = deserializeEntity(request, entityType); + List<Entity> tokenList = null; // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass decorateEntityWithACL(entity); + try { + tokenList = obtainEntityLocks(entity, "submit"); + }finally { + ConfigurationStore.get().cleanupUpdateInit(); + releaseEntityLocks(entity.getName(), tokenList); + } Entity existingEntity = configStore.get(entityType, entity.getName()); if (existingEntity != null) { if (EntityUtil.equals(existingEntity, entity)) { http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index f0cee61..bcd3bd5 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -574,6 +574,55 @@ public class EntityManagerJerseyIT { context.assertSuccessful(response); } + @Test + public void testDuplicateSubmitCommands() throws Exception { + TestContext context = newContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + + ExecutorService service = Executors.newSingleThreadExecutor(); + ExecutorService duplicateService = Executors.newSingleThreadExecutor(); + + Future<ClientResponse> future = service.submit(new SubmitCommand(context, overlay)); + Future<ClientResponse> duplicateFuture = duplicateService.submit(new SubmitCommand(context, overlay)); + + ClientResponse response = future.get(); + ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get(); + + // since there are duplicate threads for submits, there is no guarantee which request will succeed. + testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse); + } + + @Test + public void testDuplicateDeleteCommands() throws Exception { + TestContext context = newContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); + + ExecutorService service = Executors.newSingleThreadExecutor(); + ExecutorService duplicateService = Executors.newSingleThreadExecutor(); + + Future<ClientResponse> future = service.submit(new DeleteCommand(context, overlay.get("cluster"), "cluster")); + Future<ClientResponse> duplicateFuture = duplicateService.submit(new DeleteCommand(context, + overlay.get("cluster"), "cluster")); + + ClientResponse response = future.get(); + ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get(); + + // since there are duplicate threads for deletion, there is no guarantee which request will succeed. + testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse); + } + + private void testDuplicateCommandsResponse(TestContext context, ClientResponse response, + ClientResponse duplicateSubmitThreadResponse) { + if (response.getStatus() == Response.Status.OK.getStatusCode()) { + context.assertSuccessful(response); + context.assertFailure(duplicateSubmitThreadResponse); + } else { + context.assertFailure(response); + context.assertSuccessful(duplicateSubmitThreadResponse); + } + } + public void testProcesssScheduleAndDelete() throws Exception { TestContext context = newContext(); ClientResponse clientResponse; @@ -886,13 +935,7 @@ public class EntityManagerJerseyIT { ClientResponse duplicateUpdateThreadResponse = future.get(); // since there are duplicate threads for updates, there is no guarantee which request will succeed - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - context.assertSuccessful(response); - context.assertFailure(duplicateUpdateThreadResponse); - } else { - context.assertFailure(response); - context.assertSuccessful(duplicateUpdateThreadResponse); - } + testDuplicateCommandsResponse(context, response, duplicateUpdateThreadResponse); } @@ -932,4 +975,49 @@ public class EntityManagerJerseyIT { return update(context, process, endTime, false); } } + + class SubmitCommand implements Callable<ClientResponse> { + private Map<String, String> overlay; + private TestContext context; + + public TestContext getContext() { + return context; + } + + public Map<String, String> getOverlay() { + return overlay; + } + + public SubmitCommand(TestContext context, Map<String, String> overlay) { + this.context = context; + this.overlay = overlay; + } + + @Override + public ClientResponse call() throws Exception { + return context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); + } + } + + class DeleteCommand implements Callable<ClientResponse> { + private TestContext context; + private String entityName; + private String entityType; + + public TestContext getContext() { + return context; + } + + public DeleteCommand(TestContext context, String entityName, String entityType) { + this.context = context; + this.entityName = entityName; + this.entityType = entityType; + } + + @Override + public ClientResponse call() throws Exception { + return context.deleteFromFalcon(entityName, entityType); + } + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index 4a25b88..54671fb 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -384,6 +384,13 @@ public class TestContext { .post(ClientResponse.class, rawlogStream); } + public ClientResponse deleteFromFalcon(String entityName, String entityType) throws IOException{ + return this.service.path("api/entities/delete/" + entityType + "/" + entityName.toLowerCase()) + .header("Cookie", getAuthenticationToken()) + .accept(MediaType.TEXT_XML) + .delete(ClientResponse.class); + } + public void assertStatus(ClientResponse clientResponse, APIResult.Status status) { String response = clientResponse.getEntity(String.class); try {
