Repository: falcon Updated Branches: refs/heads/master 8f32de0f3 -> 1685541f7
FALCON-944 Parallel update APIs create 2 coords. Contributed by Suhas Vasu Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1685541f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1685541f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1685541f Branch: refs/heads/master Commit: 1685541f705b85cf348582f17a088db9d6c91ce0 Parents: 8f32de0 Author: Suhas Vasu <[email protected]> Authored: Tue Feb 17 17:05:34 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Tue Feb 17 17:05:34 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/falcon/entity/lock/MemoryLocks.java | 79 ++++++++++++++++++++ .../falcon/entity/lock/MemoryLocksTest.java | 65 ++++++++++++++++ .../falcon/resource/AbstractEntityManager.java | 48 +++++++++++- .../AbstractSchedulableEntityManager.java | 23 +++++- .../java/org/apache/falcon/cli/FalconCLIIT.java | 1 + .../falcon/resource/EntityManagerJerseyIT.java | 74 ++++++++++++++++++ 7 files changed, 287 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 57061d3..0123fae 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -67,6 +67,8 @@ Trunk (Unreleased) Seetharam) OPTIMIZATIONS + FALCON-944 Parallel update APIs create 2 coords (Suhas Vasu) + FALCON-943 process update copying user lib is very slow. (Shwetha G S) FALCON-419 Update deprecated HCatalog API to use Hive Metastore API. http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java b/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java new file mode 100644 index 0000000..a91b5d2 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.entity.lock; + +import org.apache.falcon.entity.v0.Entity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * In memory resource locking that provides lock capabilities. + */ +public final class MemoryLocks { + private static final Logger LOG = LoggerFactory.getLogger(MemoryLocks.class); + private static ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<String, Boolean>(); + + private static MemoryLocks instance = new MemoryLocks(); + + private MemoryLocks() { + } + + public static MemoryLocks getInstance() { + return instance; + } + + /** + * Obtain a lock for an entity. + * + * @param entity entity object. + * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. + */ + public boolean acquireLock(Entity entity) { + boolean lockObtained = false; + String entityName = getLockKey(entity); + + Boolean putResponse = locks.putIfAbsent(entityName, true); + if (putResponse == null || !putResponse) { + LOG.info("Lock obtained for schedule/update of {} by {}", + entity.toShortString(), Thread.currentThread().getName()); + lockObtained = true; + } + return lockObtained; + } + + /** + * Release the lock for an entity. + * + * @param entity entity object. + */ + public void releaseLock(Entity entity) { + String entityName = getLockKey(entity); + + locks.remove(entityName); + LOG.info("Successfully released lock for {} by {}", + entity.toShortString(), Thread.currentThread().getName()); + } + + private String getLockKey(Entity entity) { + return entity.getEntityType().toString() + "." + entity.getName(); + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java b/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java new file mode 100644 index 0000000..5050b73 --- /dev/null +++ b/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.entity.lock; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.*; +import org.apache.falcon.entity.v0.process.Process; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Test for Memory Locking mechanism used for schedule/update of entities. + */ + +public class MemoryLocksTest { + private static final String FEED_XML = "/config/feed/feed-0.1.xml"; + private static final String PROCESS_XML = "/config/process/process-0.1.xml"; + + @Test + public void testSuccessfulMemoryLockAcquisition() throws Exception { + MemoryLocks memoryLocks = MemoryLocks.getInstance(); + Entity feed = (Entity) EntityType.FEED.getUnmarshaller().unmarshal(this.getClass().getResource(FEED_XML)); + Assert.assertEquals(memoryLocks.acquireLock(feed), true); + memoryLocks.releaseLock(feed); + } + + @Test + public void testUnsuccessfulMemoryLockAcquisition() throws Exception { + MemoryLocks memoryLocks = MemoryLocks.getInstance(); + Entity feed = (Entity) EntityType.FEED.getUnmarshaller().unmarshal(this.getClass().getResource(FEED_XML)); + Assert.assertEquals(memoryLocks.acquireLock(feed), true); + Assert.assertEquals(memoryLocks.acquireLock(feed), false); + memoryLocks.releaseLock(feed); + } + + @Test + public void testDuplicateEntityNameLockAcquisition() throws Exception { + MemoryLocks memoryLocks = MemoryLocks.getInstance(); + //In case both feed & process have identical names, they shouldn't clash during updates + Entity feed = (Entity) EntityType.FEED.getUnmarshaller().unmarshal(this.getClass().getResource(FEED_XML)); + org.apache.falcon.entity.v0.process.Process process = (Process) EntityType.PROCESS.getUnmarshaller(). + unmarshal(this.getClass().getResource(PROCESS_XML)); + process.setName(feed.getName()); + Assert.assertEquals(memoryLocks.acquireLock(feed), true); + Assert.assertEquals(memoryLocks.acquireLock(process), true); + memoryLocks.releaseLock(feed); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index caa9a74..424bada 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -27,6 +27,7 @@ import org.apache.falcon.FalconWebException; import org.apache.falcon.Pair; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.lock.MemoryLocks; import org.apache.falcon.entity.parser.EntityParser; import org.apache.falcon.entity.parser.EntityParserFactory; import org.apache.falcon.entity.parser.ValidationException; @@ -72,6 +73,7 @@ import java.util.Set; */ public abstract class AbstractEntityManager { private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class); + private static MemoryLocks memoryLocks = MemoryLocks.getInstance(); protected static final int XML_DEBUG_LEN = 10 * 1024; protected static final String DEFAULT_NUM_RESULTS = "10"; @@ -265,10 +267,9 @@ public abstract class AbstractEntityManager { } } - // Parallel update can get very clumsy if two feeds are updated which - // are referred by a single process. Sequencing them. - public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo) { + public APIResult update(HttpServletRequest request, String type, String entityName, String colo) { checkColo(colo); + List<Entity> tokenList = null; try { EntityType entityType = EntityType.getEnum(type); Entity oldEntity = EntityUtil.getEntity(type, entityName); @@ -280,6 +281,8 @@ public abstract class AbstractEntityManager { validateUpdate(oldEntity, newEntity); configStore.initiateUpdate(newEntity); + tokenList = obtainUpdateEntityLocks(oldEntity); + StringBuilder result = new StringBuilder("Updated successfully"); //Update in workflow engine if (!DeploymentUtil.isPrism()) { @@ -304,9 +307,48 @@ public abstract class AbstractEntityManager { throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); } finally { ConfigurationStore.get().cleanupUpdateInit(); + releaseUpdateEntityLocks(entityName, tokenList); } } + private List<Entity> obtainUpdateEntityLocks(Entity entity) + throws FalconException { + List<Entity> tokenList = new ArrayList<Entity>(); + + //first obtain lock for the entity for which update is issued. + if (memoryLocks.acquireLock(entity)) { + tokenList.add(entity); + } else { + throw new FalconException("Looks like an update command is already issued for " + entity.toShortString()); + } + + //now obtain locks for all dependent entities. + Set<Entity> affectedEntities = EntityGraph.get().getDependents(entity); + for (Entity e : affectedEntities) { + if (memoryLocks.acquireLock(e)) { + tokenList.add(e); + } else { + LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks", + e.toShortString()); + throw new FalconException("There are multiple update commands running for dependent entity " + + e.toShortString()); + } + } + return tokenList; + } + + private void releaseUpdateEntityLocks(String entityName, List<Entity> tokenList) { + if (tokenList != null && !tokenList.isEmpty()) { + for (Entity entity : tokenList) { + memoryLocks.releaseLock(entity); + } + LOG.info("All update locks released for {}", entityName); + } else { + LOG.info("No locks to release for " + entityName); + } + + } + private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException { if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) { throw new FalconException( http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index adfef35..f7c2f61 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -23,6 +23,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; import org.apache.falcon.Pair; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.lock.MemoryLocks; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; @@ -47,6 +48,7 @@ import java.util.*; public abstract class AbstractSchedulableEntityManager extends AbstractInstanceManager { private static final Logger LOG = LoggerFactory.getLogger(AbstractSchedulableEntityManager.class); + private static MemoryLocks memoryLocks = MemoryLocks.getInstance(); /** * Schedules an submitted entity immediately. @@ -73,8 +75,25 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM throws FalconException, AuthorizationException { checkSchedulableEntity(type); - Entity entityObj = EntityUtil.getEntity(type, entity); - getWorkflowEngine().schedule(entityObj); + Entity entityObj = null; + try { + entityObj = EntityUtil.getEntity(type, entity); + //first acquire lock on entity before scheduling + if (!memoryLocks.acquireLock(entityObj)) { + throw new FalconException("Looks like an schedule/update command is already running for " + + entityObj.toShortString()); + } + LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName()); + getWorkflowEngine().schedule(entityObj); + } catch (Exception e) { + throw new FalconException("Entity schedule failed for " + type + ": " + entity, e); + } finally { + if (entityObj != null) { + memoryLocks.releaseLock(entityObj); + LOG.info("Memory lock released for {}", entityObj.toShortString()); + } + } + } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index 7512302..d46f112 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -116,6 +116,7 @@ public class FalconCLIIT { filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type process -file " + filePath), 0); + OozieTestUtils.waitForProcessWFtoStart(context); Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName") + " -type process -file " + filePath), 0); http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index c6fd420..280253d 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -66,6 +66,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.regex.Pattern; /** @@ -812,6 +816,49 @@ public class EntityManagerJerseyIT { } } + @Test + public void testDuplicateUpdateCommands() throws Exception { + TestContext context = newContext(); + context.scheduleProcess(); + OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); + List<BundleJob> bundles = OozieTestUtils.getBundles(context); + Assert.assertEquals(bundles.size(), 1); + + Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); + + String feed3 = "f3" + System.currentTimeMillis(); + Map<String, String> overlay = new HashMap<String, String>(); + overlay.put("inputFeedName", feed3); + overlay.put("cluster", context.clusterName); + overlay.put("user", System.getProperty("user.name")); + ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); + context.assertSuccessful(response); + + Input input = new Input(); + input.setFeed(feed3); + input.setName("inputData2"); + input.setStart("today(20,0)"); + input.setEnd("today(20,20)"); + process.getInputs().getInputs().add(input); + + updateEndtime(process); + Date endTime = getEndTime(); + ExecutorService service = Executors.newSingleThreadExecutor(); + Future<ClientResponse> future = service.submit(new UpdateCommand(context, process, endTime)); + response = update(context, process, endTime); + ClientResponse duplicateUpdateThreadResponse = future.get(); + + // since there are duplicate threads for updates, there is no guarantee which request will succeed + if (response.getStatus() == Response.Status.OK.getStatusCode()) { + context.assertSuccessful(response); + context.assertFailure(duplicateUpdateThreadResponse); + } else { + context.assertFailure(response); + context.assertSuccessful(duplicateUpdateThreadResponse); + } + + } + public Date getEndTime() { Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); cal.add(Calendar.DAY_OF_MONTH, 1); @@ -821,4 +868,31 @@ public class EntityManagerJerseyIT { cal.set(Calendar.MILLISECOND, 0); return cal.getTime(); } + + class UpdateCommand implements Callable<ClientResponse> { + private TestContext context; + private Process process; + private Date endTime; + + public TestContext getContext() { + return context; + } + public Process getProcess() { + return process; + } + public Date getEndTime() { + return endTime; + } + + public UpdateCommand(TestContext context, Process process, Date endTime) { + this.context = context; + this.process = process; + this.endTime = endTime; + } + + @Override + public ClientResponse call() throws Exception { + return update(context, process, endTime); + } + } }
