Repository: nifi Updated Branches: refs/heads/master 347b281b2 -> 270944ec6
http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardTemplateDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardTemplateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardTemplateDAO.java index 3e6d674..42b5da3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardTemplateDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardTemplateDAO.java @@ -16,12 +16,13 @@ */ package org.apache.nifi.web.dao.impl; -import java.io.IOException; import java.util.HashSet; import java.util.Set; -import javax.ws.rs.WebApplicationException; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.Template; +import org.apache.nifi.controller.TemplateUtils; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.NiFiCoreException; @@ -30,7 +31,6 @@ import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.dao.TemplateDAO; import org.apache.nifi.web.util.SnippetUtils; -import org.apache.commons.lang3.StringUtils; /** * @@ -42,7 +42,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO { private Template locateTemplate(String templateId) { // get the template - Template template = flowController.getTemplate(templateId); + Template template = flowController.getGroup(flowController.getRootGroupId()).findTemplate(templateId); // ensure the template exists if (template == null) { @@ -53,21 +53,22 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO { } @Override - public Template createTemplate(TemplateDTO templateDTO) { - try { - return flowController.addTemplate(templateDTO); - } catch (IOException ioe) { - throw new WebApplicationException(new IOException("Unable to save specified template: " + ioe.getMessage())); + public Template createTemplate(TemplateDTO templateDTO, String groupId) { + final ProcessGroup processGroup = flowController.getGroup(groupId); + if (processGroup == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId); } + + TemplateUtils.scrubTemplate(templateDTO); + final Template template = new Template(templateDTO); + processGroup.addTemplate(template); + + return template; } @Override - public Template importTemplate(TemplateDTO templateDTO) { - try { - return flowController.importTemplate(templateDTO); - } catch (IOException ioe) { - throw new WebApplicationException(new IOException("Unable to import specified template: " + ioe.getMessage())); - } + public Template importTemplate(TemplateDTO templateDTO, String groupId) { + return createTemplate(templateDTO, groupId); } @Override @@ -75,7 +76,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO { ProcessGroup group = locateProcessGroup(flowController, groupId); // get the template id and find the template - Template template = flowController.getTemplate(templateId); + Template template = getTemplate(templateId); // ensure the template could be found if (template == null) { @@ -103,14 +104,10 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO { @Override public void deleteTemplate(String templateId) { // ensure the template exists - locateTemplate(templateId); + final Template template = locateTemplate(templateId); - try { - // remove the specified template - flowController.removeTemplate(templateId); - } catch (final IOException ioe) { - throw new WebApplicationException(new IOException("Unable to remove specified template: " + ioe.getMessage())); - } + // remove the specified template + template.getProcessGroup().removeTemplate(template); } @Override @@ -121,7 +118,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO { @Override public Set<Template> getTemplates() { final Set<Template> templates = new HashSet<>(); - for (final Template template : flowController.getTemplates()) { + for (final Template template : flowController.getGroup(flowController.getRootGroupId()).findAllTemplates()) { templates.add(template); } return templates; http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java index 768b5f2..77048d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java @@ -62,7 +62,7 @@ public class NaiveRevisionManager implements RevisionManager { } /** - * Constructs a new NaiveRevisionManager that uses the given number of Nanoseconds as the expiration time + * Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time * for a Revision Claims * * @param claimExpiration how long a Revision Claim should last @@ -92,7 +92,7 @@ public class NaiveRevisionManager implements RevisionManager { final RevisionLock revisionLock = getRevisionLock(revision); final ClaimResult claimResult = revisionLock.requestClaim(revision); - logger.debug("Obtained Revision Claim for {}", revision); + logger.trace("Obtained Revision Claim for {}", revision); if (claimResult.isSuccessful()) { locksObtained.add(revisionLock); @@ -107,7 +107,7 @@ public class NaiveRevisionManager implements RevisionManager { // if we got a Revision Claim on each Revision, return a successful result if (locksObtained.size() == revisionList.size()) { - logger.debug("Obtained Revision Claim for all components"); + logger.trace("Obtained Revision Claim for all components"); // it's possible that obtaining the locks took a while if we are obtaining // many. Renew the timestamp to ensure that the first locks obtained don't @@ -123,6 +123,7 @@ public class NaiveRevisionManager implements RevisionManager { // We failed to obtain all of the Revision Claims necessary. Since // we need this call to atomically obtain all or nothing, we have to now // release the locks that we did obtain. + logger.debug("Failed to obtain all necessary Revisions; releasing claims for {}", locksObtained); for (final RevisionLock revisionLock : locksObtained) { revisionLock.releaseClaim(); } @@ -158,7 +159,7 @@ public class NaiveRevisionManager implements RevisionManager { final boolean verified = revisionLock.requestWriteLock(revision); if (verified) { - logger.debug("Verified Revision Claim for {}", revision); + logger.trace("Verified Revision Claim for {}", revision); successCount++; } else { logger.debug("Failed to verify Revision Claim for {}", revision); @@ -168,7 +169,7 @@ public class NaiveRevisionManager implements RevisionManager { } if (successCount == revisionList.size()) { - logger.debug("Successfully verified Revision Claim for all revisions"); + logger.debug("Successfully verified Revision Claim for all revisions {}", claim); final T taskValue = task.performTask(); for (final Revision revision : revisionList) { @@ -206,7 +207,7 @@ public class NaiveRevisionManager implements RevisionManager { final boolean verified = revisionLock.requestWriteLock(revision); if (verified) { - logger.debug("Verified Revision Claim for {}", revision); + logger.trace("Verified Revision Claim for {}", revision); successCount++; } else { logger.debug("Failed to verify Revision Claim for {}", revision); @@ -245,7 +246,14 @@ public class NaiveRevisionManager implements RevisionManager { } for (final Revision revision : revisionList) { - getRevisionLock(revision).unlock(revision, updatedRevisions.get(revision), modifier); + final Revision updatedRevision = updatedRevisions.get(revision); + getRevisionLock(revision).unlock(revision, updatedRevision, modifier); + + if (updatedRevision.getVersion() != revision.getVersion()) { + logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion()); + } else { + logger.debug("Unlocked Revision {} without updating Version", revision); + } } } @@ -281,6 +289,20 @@ public class NaiveRevisionManager implements RevisionManager { } @Override + public boolean cancelClaim(String componentId) { + logger.debug("Attempting to cancel claim for component {}", componentId); + final Revision revision = new Revision(0L, null, componentId); + + final RevisionLock revisionLock = getRevisionLock(revision); + if (revisionLock == null) { + logger.debug("No Revision Lock exists for Component {} - there is no claim to cancel", componentId); + return false; + } + + return revisionLock.releaseClaimIfCurrentThread(); + } + + @Override public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) { final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos)); logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision()); @@ -301,15 +323,16 @@ public class NaiveRevisionManager implements RevisionManager { sortedIds.sort(Collator.getInstance()); final Stack<RevisionLock> revisionLocks = new Stack<>(); + logger.debug("Will attempt to obtain read locks for components {}", componentIds); for (final String componentId : sortedIds) { final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos)); - logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision()); + logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision()); revisionLock.acquireReadLock(); revisionLocks.push(revisionLock); - logger.debug("Obtained read lock for {}", revisionLock.getRevision()); + logger.trace("Obtained read lock for {}", revisionLock.getRevision()); } - logger.debug("Obtained read lock for all necessary components; calling call-back"); + logger.debug("Obtained read lock for all necessary components {}; calling call-back", componentIds); try { return callback.get(); } finally { @@ -327,7 +350,6 @@ public class NaiveRevisionManager implements RevisionManager { return; } - revisionLock.releaseClaim(); } @@ -390,38 +412,44 @@ public class NaiveRevisionManager implements RevisionManager { Objects.requireNonNull(proposedRevision); threadLock.writeLock().lock(); - if (getRevision().equals(proposedRevision)) { - final LockStamp stamp = lockStamp.get(); - - if (stamp == null) { - logger.debug("Attempted to obtain write lock for {} but no Claim was obtained", proposedRevision); - throw new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification"); - } + boolean releaseLock = true; + try { + if (getRevision().equals(proposedRevision)) { + final LockStamp stamp = lockStamp.get(); - if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) { - // TODO - Must make sure that we don't have an expired stamp if it is the result of another - // operation taking a long time. I.e., Client A fires off two requests for Component X. If the - // first one takes 2 minutes to complete, it should not result in the second request getting - // rejected. I.e., we want to ensure that if the request is received before the Claim expired, - // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended - // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does - // not fulfill the second phase of the two-phase commit. - // We may need a Queue of updates (queue would need to be bounded, with a request getting - // rejected if queue is full). - if (stamp.isExpired()) { - threadLock.writeLock().unlock(); - throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired"); + if (stamp == null) { + final IllegalStateException ise = new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification"); + logger.debug("Attempted to obtain write lock for {} but no Claim was obtained; throwing IllegalStateException", proposedRevision, ise); + throw ise; } - // Intentionally leave the thread lock in a locked state! - return true; - } else { - logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp); + if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) { + // TODO - Must make sure that we don't have an expired stamp if it is the result of another + // operation taking a long time. I.e., Client A fires off two requests for Component X. If the + // first one takes 2 minutes to complete, it should not result in the second request getting + // rejected. I.e., we want to ensure that if the request is received before the Claim expired, + // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended + // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does + // not fulfill the second phase of the two-phase commit. + // We may need a Queue of updates (queue would need to be bounded, with a request getting + // rejected if queue is full). + if (stamp.isExpired()) { + throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired"); + } + + // Intentionally leave the thread lock in a locked state! + releaseLock = false; + return true; + } else { + logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp); + } + } + } finally { + if (releaseLock) { + threadLock.writeLock().unlock(); } } - // revision is wrong. Unlock thread lock and return false - threadLock.writeLock().unlock(); return false; } @@ -471,6 +499,29 @@ public class NaiveRevisionManager implements RevisionManager { lockStamp.set(null); } + public boolean releaseClaimIfCurrentThread() { + threadLock.writeLock().lock(); + try { + final LockStamp stamp = lockStamp.get(); + if (stamp == null) { + logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision()); + return false; + } + + if (stamp.isObtainedByCurrentThread()) { + releaseClaim(); + logger.debug("Successfully canceled claim for {}", getRevision()); + return true; + } + + logger.debug("Cannot cancel claim for {} because it is held by Thread {} and current Thread is {}", + getRevision(), stamp.obtainingThread, Thread.currentThread().getName()); + return false; + } finally { + threadLock.writeLock().unlock(); + } + } + /** * Releases the Revision Claim if and only if the current revision matches the proposed revision * @@ -544,10 +595,12 @@ public class NaiveRevisionManager implements RevisionManager { private static class LockStamp { private final String clientId; private final long expirationTimestamp; + private final Thread obtainingThread; public LockStamp(final String clientId, final long expirationTimestamp) { this.clientId = clientId; this.expirationTimestamp = expirationTimestamp; + this.obtainingThread = Thread.currentThread(); } public String getClientId() { @@ -558,6 +611,10 @@ public class NaiveRevisionManager implements RevisionManager { return System.nanoTime() > expirationTimestamp; } + public boolean isObtainedByCurrentThread() { + return obtainingThread == Thread.currentThread(); + } + @Override public String toString() { return clientId; http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java index a8e2f39..c9b750f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java @@ -178,4 +178,12 @@ public interface RevisionManager { * up-to-date */ boolean releaseClaim(RevisionClaim claim); + + /** + * Releases the claim on the revision for the given component if the claim was obtained by the calling thread + * + * @param componentId the ID of the component + * @return <code>true</code> if the claim was released, false otherwise + */ + boolean cancelClaim(String componentId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java index 8b9c129..1b43131 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java @@ -27,7 +27,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -35,11 +40,8 @@ import org.apache.nifi.web.FlowModification; import org.apache.nifi.web.InvalidRevisionException; import org.apache.nifi.web.Revision; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; - -@Ignore public class TestNaiveRevisionManager { private static final String CLIENT_1 = "client-1"; private static final String COMPONENT_1 = "component-1"; @@ -235,6 +237,31 @@ public class TestNaiveRevisionManager { } @Test(timeout = 10000) + public void testSameClientSameRevisionBlocks() throws InterruptedException, ExecutionException { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + assertNotNull(firstClaim); + + final Revision secondRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final Runnable runnable = new Runnable() { + @Override + public void run() { + revisionManager.requestClaim(secondRevision); + } + }; + final ExecutorService exec = Executors.newFixedThreadPool(1); + final Future<?> future = exec.submit(runnable); + + try { + future.get(2, TimeUnit.SECONDS); + Assert.fail("Call to obtain claim on revision did not block when claim was already held"); + } catch (TimeoutException e) { + // Expected + } + } + + @Test(timeout = 10000) public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); @@ -333,6 +360,38 @@ public class TestNaiveRevisionManager { } @Test(timeout = 10000) + public void testCancelClaimSameThread() { + final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(firstRevision); + assertNotNull(claim); + + assertFalse(revisionManager.cancelClaim("component-2")); + assertTrue(revisionManager.cancelClaim(COMPONENT_1)); + } + + @Test(timeout = 10000) + public void testCancelClaimDifferentThread() throws InterruptedException { + final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(firstRevision); + assertNotNull(claim); + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + assertFalse(revisionManager.cancelClaim("component-2")); + assertFalse(revisionManager.cancelClaim(COMPONENT_1)); + } + }); + t.setDaemon(true); + t.start(); + + Thread.sleep(1000L); + assertTrue(revisionManager.cancelClaim(COMPONENT_1)); + } + + @Test(timeout = 10000) public void testUpdateWithSomeWrongRevision() { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
