Repository: nifi Updated Branches: refs/heads/master 3db14f58f -> 04c41c065
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java new file mode 100644 index 0000000..0db6828 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.revision; + +import java.util.Collection; +import java.util.Set; +import java.util.function.Supplier; + +import org.apache.nifi.web.InvalidRevisionException; +import org.apache.nifi.web.Revision; + + +/** + * <p> + * A Revision Manager provides the ability to prevent clients of the Web API from + * stepping on one another. This is done by providing claims and locking mechanisms + * for components individually. + * </p> + * + * <p> + * Clients that will modify a resource must do so using a two-phase commit. First, + * the client will issue a request that includes an HTTP Header of "X-NcmExpects". + * This indicates that the request will not actually be performed but rather that the + * node should validate that the request could in fact be performed. If all nodes respond + * with a 150-Continue response, then the second phase will commence. The second phase + * will consist of replicating the same request but without the "X-NcmExpects" header. + * </p> + * + * <p> + * When the first phase of the two-phase commit is processed, the Revision Manager should + * be used to obtain a Revision Claim by calling the {@link #requestClaim(Collection)} + * method. If a Claim is granted, then the request validation may continue. If the + * Claim is not granted, the request should fail and the second phase should not + * be performed. + * </p> + * + * <p> + * If the first phase of the above two-phase commit completes and all nodes indicate that the + * request may continue, this means that all nodes have granted a Claim on the Revisions + * that are relevant. This Claim will automatically expire after some time. This expiration + * means that if the node that issues the first phase never initiates the second phase (if the node + * dies or loses network connectivity, for instance), then the Revision Claim will expire and + * the Revision will remain unchanged. 
+ * </p> + * + * <p> + * When the second phase begins, changes to the resource(s) must be made with the Revisions + * locked. This is accomplished by wrapping the logic in an {@link UpdateRevisionTask} and passing the task, + * along with the {@link RevisionClaim}, to the {@link #updateRevision(RevisionClaim, String, UpdateRevisionTask)} method. + * </p> + */ +public interface RevisionManager { + + /** + * <p> + * Attempts to obtain a Revision Claim for the Revisions supplied. If a Revision Claim + * is granted, no other thread will be allowed to modify any of the components for + * which a Revision is claimed until either the Revision Claim is relinquished by + * calling the {@link #updateRevision(RevisionClaim, String, UpdateRevisionTask)} method or the + * {@link #releaseClaim(RevisionClaim)} method, or the Revision Claim expires. + * </p> + * + * <p> + * This method is atomic. If a Revision Claim is unable to be obtained for any of the + * provided Revisions, then no Revision Claim will be obtained. + * </p> + * + * @param revisions a Collection of Revisions, each of which corresponds to a different + * component for which a Claim is to be acquired. + * + * @return the Revision Claim that was granted, if one was granted. + * + * @throws InvalidRevisionException if any of the Revisions provided is out-of-date. + */ + RevisionClaim requestClaim(Collection<Revision> revisions) throws InvalidRevisionException; + + /** + * <p> + * A convenience method that will call {@link #requestClaim(Collection)} by wrapping the given + * Revision in a Collection. + * </p> + * + * @param revision the revision to request a claim for + * + * @return the Revision Claim that was granted, if one was granted. + * + * @throws InvalidRevisionException if any of the Revisions provided is out-of-date. + */ + RevisionClaim requestClaim(Revision revision) throws InvalidRevisionException; + + /** + * Returns the current Revision for the component with the given ID. 
If no Revision yet exists for the + * component with the given ID, one will be created with a Version of 0 and no Client ID. + * + * @param componentId the ID of the component whose current Revision is to be returned + * @return the current Revision for the component with the given ID + */ + Revision getRevision(String componentId); + + /** + * Performs the given task without allowing the given Revision Claim to expire. Once this method + * returns or an Exception is thrown (with the exception of ExpiredRevisionClaimException), + * the Revision may have been updated for each component that the RevisionClaim holds a Claim for. + * If an ExpiredRevisionClaimException is thrown, the Revisions claimed by RevisionClaim + * will not be updated. + * + * @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is + * to be updated + * @param modifier the name of the entity that is modifying the resource + * @param task the task that is responsible for updating the components whose Revisions are claimed by the given + * RevisionClaim. The returned Revision set should include a Revision for each Revision that is in the + * supplied Revision Claim. If there exists any Revision in the provided RevisionClaim that is not part + * of the RevisionUpdate returned by the task, then the Revision is assumed to have not been modified. + * + * @return a RevisionUpdate object that represents the new version of the component that was updated + * + * @throws ExpiredRevisionClaimException if the Revision Claim has expired + */ + <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, String modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException; + + /** + * Performs the given task that is expected to remove a component from the flow. As a result, + * the Revision for the component referenced by the RevisionClaim will be removed. 
+ * + * @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is + * to be removed + * @param task the task that is responsible for deleting the components whose Revisions are claimed by the given RevisionClaim + * @return the value returned from the DeleteRevisionTask + * + * @throws ExpiredRevisionClaimException if the Revision Claim has expired + */ + <T> T deleteRevision(RevisionClaim claim, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException; + + /** + * Performs some operation to obtain an object of type T whose identifier is provided via + * the componentId argument, and return that object of type T while holding a Read Lock on + * the Revision for that object. Note that the callback provided must never modify the object + * with the given ID. + * + * @param componentId the identifier of the object that the callback obtains + * @param callback the callback that is to be performed with the Read Lock held + * @return the value returned from the callback + */ + <T> T get(String componentId, ReadOnlyRevisionCallback<T> callback); + + /** + * Performs some operation to obtain an object of type T whose identifier is provided via + * the componentId argument, and return that object of type T while holding a Read Lock on + * the Revision for that object. Note that the callback provided must never modify the object + * with the given ID. + * + * @param componentId the identifiers of the components involved. NOTE(review): presumably a Read Lock is held on + * the Revision of each listed component while the callback runs -- confirm against the implementation + * @param callback the callback that is to be performed with the Read Lock held + * @return the value returned from the callback + */ + <T> T get(Set<String> componentId, Supplier<T> callback); + + /** + * Releases the claims on the revisions held by the given Revision Claim, if all of the Revisions + * are up-to-date. 
+ * + * @param claim the claim that holds the revisions + * + * @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not + * up-to-date + */ + boolean releaseClaim(RevisionClaim claim); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java new file mode 100644 index 0000000..e9b76f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.revision; + +import java.util.Set; + +import org.apache.nifi.web.FlowModification; +import org.apache.nifi.web.Revision; + +/** + * A packaging of a Component's DTO, the last modification that was made to the flow for that + * component, and the Set of Revisions that were updated + * + * @param <T> the type of the DTO/configuration of the component that was updated + */ +public interface RevisionUpdate<T> { + /** + * @return the DTO/configuration of the component that was updated + */ + T getComponent(); + + /** + * @return the last modification that was made to the flow for this component + */ + FlowModification getLastModification(); + + /** + * @return a Set of all Revisions that were updated + */ + Set<Revision> getUpdatedRevisions(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java new file mode 100644 index 0000000..b1223c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.web.Revision; + +/** + * A {@link RevisionClaim} that is backed by a {@link HashSet} holding the Revisions + * that were supplied when the claim was constructed. + */ +public class StandardRevisionClaim implements RevisionClaim { + private final Set<Revision> revisions; + + /** + * Creates a claim over the given Revisions. Each Revision is added to a newly created Set, + * so the given array is not retained. + * + * @param revisions the Revisions that this claim covers + */ + public StandardRevisionClaim(final Revision... revisions) { + this.revisions = new HashSet<>(revisions.length); + for (final Revision revision : revisions) { + this.revisions.add(revision); + } + } + + /** + * Creates a claim over the given Revisions. The elements are copied into a newly created Set, + * so the given Collection is not retained. + * + * @param revisions the Revisions that this claim covers + */ + public StandardRevisionClaim(final Collection<Revision> revisions) { + this.revisions = new HashSet<>(revisions); + } + + /** + * Returns the Set that backs this claim. The Set is returned directly rather than as a copy + * or an unmodifiable view, so any change that a caller makes to it changes this claim as well. 
+ * + * @return the Revisions held by this claim + */ + @Override + public Set<Revision> getRevisions() { + return revisions; + } + + /** + * @return the String form of the Set of Revisions held by this claim + */ + @Override + public String toString() { + return revisions.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java new file mode 100644 index 0000000..bf33b3a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.web.FlowModification; +import org.apache.nifi.web.Revision; + +/** + * A {@link RevisionUpdate} that simply holds the component, the last modification and the + * Set of updated Revisions that it was constructed with. + * + * @param <T> the type of the component held by this update + */ +public class StandardRevisionUpdate<T> implements RevisionUpdate<T> { + private final T component; + private final FlowModification lastModification; + private final Set<Revision> updatedRevisions; + + /** + * Creates an update that has no additional updated Revisions. This delegates to the + * three-argument constructor, passing <code>null</code> as the Set of updated Revisions. 
+ * + * @param component the component that was updated + * @param lastModification the last modification that was made, may be <code>null</code> + */ + public StandardRevisionUpdate(final T component, final FlowModification lastModification) { + this(component, lastModification, null); + } + + /** + * Creates an update for the given component. The given Set is copied rather than retained, and + * if the given modification is not <code>null</code> then its Revision is also added to the + * Set of updated Revisions. + * + * @param component the component that was updated + * @param lastModification the last modification that was made, may be <code>null</code> + * @param updatedRevisions the Revisions that were updated, may be <code>null</code>, in which case + * an empty Set is used as the starting point + */ + public StandardRevisionUpdate(final T component, final FlowModification lastModification, final Set<Revision> updatedRevisions) { + this.component = component; + this.lastModification = lastModification; + this.updatedRevisions = updatedRevisions == null ? 
new HashSet<>() : new HashSet<>(updatedRevisions); + if (lastModification != null) { + this.updatedRevisions.add(lastModification.getRevision()); + } + } + + + /** + * @return the component that was supplied at construction + */ + @Override + public T getComponent() { + return component; + } + + /** + * @return the modification that was supplied at construction, possibly <code>null</code> + */ + @Override + public FlowModification getLastModification() { + return lastModification; + } + + /** + * @return an unmodifiable view of the Set of updated Revisions + */ + @Override + public Set<Revision> getUpdatedRevisions() { + return Collections.unmodifiableSet(updatedRevisions); + } + + /** + * @return a String made up of the component and the last modification. The Set of updated + * Revisions is not included. + */ + @Override + public String toString() { + return "[Component=" + component + ", Last Modification=" + lastModification + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java new file mode 100644 index 0000000..f2f7914 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.nifi.web.Revision; + +/** + * <p> + * A task that is responsible for updating some component(s). + * </p> + * + * @param <T> the type parameter of the {@link RevisionUpdate} that is returned by {@link #update()} + */ +public interface UpdateRevisionTask<T> { + /** + * Updates one or more components and returns a {@link RevisionUpdate} for those components + * + * @return the RevisionUpdate that holds the updated revisions for the components + */ + RevisionUpdate<T> update(); + + /** + * Returns a new Revision that has the same Client ID and Component ID as the given one + * but with the version incremented by one + * + * @param revision the revision to update + * @return the updated Revision + */ + default Revision incrementRevision(Revision revision) { + return new Revision(revision.getVersion() + 1, revision.getClientId(), revision.getComponentId()); + } + + /** + * Returns a Collection of Revisions that contains an updated version of all Revisions passed in + * + * @param revisions the Revisions to update + * @return a new Collection that contains, in the order given, the result of calling + * {@link #incrementRevision(Revision)} on each of the Revisions that are passed in + */ + default Collection<Revision> incrementRevisions(Revision... 
revisions) { + final List<Revision> updated = new ArrayList<>(revisions.length); + for (final Revision revision : revisions) { + updated.add(incrementRevision(revision)); + } + + return updated; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java new file mode 100644 index 0000000..8b9c129 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.revision; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.web.FlowModification; +import org.apache.nifi.web.InvalidRevisionException; +import org.apache.nifi.web.Revision; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + + +@Ignore +public class TestNaiveRevisionManager { + private static final String CLIENT_1 = "client-1"; + private static final String COMPONENT_1 = "component-1"; + + private RevisionUpdate<Object> components(final Revision revision) { + return new StandardRevisionUpdate<Object>(null, new FlowModification(revision, null)); + } + + private RevisionUpdate<Object> components(final Revision revision, final Revision... 
additionalRevisions) { + final Set<Revision> revisionSet = new HashSet<>(); + for (final Revision rev : additionalRevisions) { + revisionSet.add(rev); + } + return components(revision, revisionSet); + } + + private RevisionUpdate<Object> components(final Revision revision, final Set<Revision> additionalRevisions) { + final Set<RevisionUpdate<Object>> components = new HashSet<>(); + for (final Revision rev : additionalRevisions) { + components.add(new StandardRevisionUpdate<Object>(null, new FlowModification(rev, null))); + } + return new StandardRevisionUpdate<Object>(null, new FlowModification(revision, null), additionalRevisions); + } + + @Test + public void testTypicalFlow() throws ExpiredRevisionClaimException { + final RevisionManager revisionManager = new NaiveRevisionManager(); + final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + assertNotNull(claim); + + revisionManager.updateRevision(claim, "unit test", () -> components(new Revision(1L, CLIENT_1, COMPONENT_1))); + + final Revision updatedRevision = revisionManager.getRevision(originalRevision.getComponentId()); + assertNotNull(updatedRevision); + assertEquals(originalRevision.getClientId(), updatedRevision.getClientId()); + assertEquals(originalRevision.getComponentId(), updatedRevision.getComponentId()); + assertEquals(1L, updatedRevision.getVersion().longValue()); + } + + @Test + public void testExpiration() throws InterruptedException { + final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MILLISECONDS); + final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + assertNotNull(claim); + + Thread.sleep(100); + + try { + revisionManager.updateRevision(claim, "unit test", () -> components(originalRevision, claim.getRevisions())); + Assert.fail("Expected Revision Claim to have expired 
but it did not"); + } catch (final ExpiredRevisionClaimException erce) { + // expected + } + } + + @Test(timeout = 15000) + public void testConflictingClaimsFromDifferentClients() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS); + final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + assertNotNull(claim); + + final Revision differentClientRevision = new Revision(0L, "client-2", COMPONENT_1); + final long start = System.nanoTime(); + final RevisionClaim differentClientClaim = revisionManager.requestClaim(differentClientRevision); + final long nanos = System.nanoTime() - start; + + // we should block for 2 seconds. But the timing won't necessarily be exact, + // so we ensure that it takes at least 1.5 seconds to provide a little wiggle room. + final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500); + assertTrue(nanos > minExpectedNanos); + + // We should not get a Revision Claim because the revision is already claimed by a different client id + assertNotNull(differentClientClaim); + final Set<Revision> newRevisions = differentClientClaim.getRevisions(); + assertEquals(1, newRevisions.size()); + assertEquals(differentClientRevision, newRevisions.iterator().next()); + } + + @Test + public void testGetWithReadLockNoContention() { + final RevisionManager revisionManager = new NaiveRevisionManager(3, TimeUnit.SECONDS); + final Object returnedValue = revisionManager.get(COMPONENT_1, revision -> revision); + assertTrue(returnedValue instanceof Revision); + + final Revision revision = (Revision) returnedValue; + assertEquals(0L, revision.getVersion().longValue()); + assertNull(revision.getClientId()); + assertEquals(COMPONENT_1, revision.getComponentId()); + } + + @Test(timeout = 10000) + public void testGetWithReadLockAndContentionWithTimeout() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, 
TimeUnit.SECONDS); + final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + assertNotNull(claim); + + final long start = System.nanoTime(); + final Object returnValue = new Object(); + final Object valueReturned = revisionManager.get(COMPONENT_1, revision -> returnValue); + final long nanos = System.nanoTime() - start; + + final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500L); + assertTrue(nanos > minExpectedNanos); + assertEquals(returnValue, valueReturned); + } + + @Test(timeout = 10000) + public void testGetWithReadLockAndContentionWithEventualLockResolution() { + final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); + final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + assertNotNull(claim); + + final Revision updatedRevision = new Revision(100L, CLIENT_1, COMPONENT_1); + + // Create a thread that will hold the lock for 2 seconds and then will return an updated revision + final Thread updateRevisionThread = new Thread(new Runnable() { + @Override + public void run() { + try { + revisionManager.updateRevision(claim, "unit test", () -> { + // Wait 2 seconds and then return + try { + Thread.sleep(2000L); + } catch (Exception e) { + } + + return components(updatedRevision); + }); + } catch (ExpiredRevisionClaimException e) { + Assert.fail("Revision expired unexpectedly"); + } + } + }); + updateRevisionThread.start(); + + final long start = System.nanoTime(); + final Object returnValue = new Object(); + final Object valueReturned = revisionManager.get(COMPONENT_1, revision -> { + Assert.assertEquals(updatedRevision, revision); + return returnValue; + }); + final long nanos = System.nanoTime() - start; + + final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500L); + assertTrue(nanos > minExpectedNanos); + 
assertEquals(returnValue, valueReturned); + } + + @Test(timeout = 10000) + public void testDeleteRevision() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + assertNotNull(firstClaim); + + final Revision secondRevision = new Revision(2L, CLIENT_1, COMPONENT_1); + final FlowModification mod = new FlowModification(secondRevision, "unit test"); + revisionManager.updateRevision(firstClaim, "unit test", () -> new StandardRevisionUpdate<Void>(null, mod, null)); + + final Revision updatedRevision = revisionManager.getRevision(COMPONENT_1); + assertEquals(secondRevision, updatedRevision); + + final RevisionClaim secondClaim = revisionManager.requestClaim(updatedRevision); + assertNotNull(secondClaim); + + final Object obj = new Object(); + final Object ret = revisionManager.deleteRevision(secondClaim, () -> obj); + assertEquals(obj, ret); + + final Revision curRevision = revisionManager.getRevision(COMPONENT_1); + assertNotNull(curRevision); + assertEquals(0L, curRevision.getVersion().longValue()); + assertNull(curRevision.getClientId()); + assertEquals(COMPONENT_1, curRevision.getComponentId()); + } + + + @Test(timeout = 10000) + public void testSameClientDifferentRevisionsDoNotBlockEachOther() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + assertNotNull(firstClaim); + + final Revision secondRevision = new Revision(1L, CLIENT_1, "component-2"); + final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision); + assertNotNull(secondClaim); + } + + @Test(timeout = 10000) + public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() { + final RevisionManager 
revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + assertNotNull(firstClaim); + + final Revision secondRevision = new Revision(1L, "client-2", "component-2"); + final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision); + assertNotNull(secondClaim); + } + + @Test(timeout = 10000) + public void testDifferentOrderedRevisionsDoNotCauseDeadlock() throws ExpiredRevisionClaimException, InterruptedException { + // Because we block before obtaining a claim on a revision if another client has the revision claimed, + // we should not have an issue if Client 1 requests a claim on revisions 'a' and 'b' while Client 2 + // requests a claim on revisions 'b' and 'c' and Client 3 requests a claim on revisions 'c' and 'a'. + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision revision1a = new Revision(1L, "client-1", "a"); + final Revision revision1b = new Revision(1L, "client-1", "b"); + + final Revision revision2b = new Revision(2L, "client-2", "b"); + final Revision revision2c = new Revision(2L, "client-2", "c"); + + final Revision revision3c = new Revision(3L, "client-3", "c"); + final Revision revision3a = new Revision(3L, "client-3", "a"); + + final RevisionClaim claim1 = revisionManager.requestClaim(Arrays.asList(revision1a, revision1b)); + assertNotNull(claim1); + + final AtomicBoolean claim2Obtained = new AtomicBoolean(false); + final AtomicBoolean claim3Obtained = new AtomicBoolean(false); + + final AtomicReference<RevisionClaim> claim2Ref = new AtomicReference<>(); + final AtomicReference<RevisionClaim> claim3Ref = new AtomicReference<>(); + new Thread(new Runnable() { + @Override + public void run() { + final RevisionClaim claim2 = revisionManager.requestClaim(Arrays.asList(revision2b, revision2c)); + assertNotNull(claim2); + 
claim2Obtained.set(true); + claim2Ref.set(claim2); + + try { + revisionManager.updateRevision(claim2, "unit test", () -> components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", "c"))); + } catch (ExpiredRevisionClaimException e) { + Assert.fail("Revision unexpected expired"); + } + } + }).start(); + + new Thread(new Runnable() { + @Override + public void run() { + final RevisionClaim claim3 = revisionManager.requestClaim(Arrays.asList(revision3c, revision3a)); + assertNotNull(claim3); + claim3Obtained.set(true); + claim3Ref.set(claim3); + + try { + revisionManager.updateRevision(claim3Ref.get(), "unit test", () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, "client-3", "a"))); + } catch (ExpiredRevisionClaimException e) { + Assert.fail("Revision unexpected expired"); + } + } + }).start(); + + Thread.sleep(250L); + + assertFalse(claim2Obtained.get()); + assertFalse(claim3Obtained.get()); + revisionManager.updateRevision(claim1, "unit test", () -> components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", "b"))); + + Thread.sleep(250L); + assertTrue(claim2Obtained.get() && claim3Obtained.get()); + + assertEquals(2L, revisionManager.getRevision("a").getVersion().longValue()); + + // The version for 'c' could be either 2 or 3, depending on which request completed first. 
+ final long versionC = revisionManager.getRevision("c").getVersion().longValue(); + assertTrue(versionC == 2 || versionC == 3); + + assertEquals(3L, revisionManager.getRevision("b").getVersion().longValue()); + } + + @Test(timeout = 10000) + public void testReleaseClaim() { + final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim claim = revisionManager.requestClaim(firstRevision); + assertNotNull(claim); + + final RevisionClaim invalidClaim = new StandardRevisionClaim(new Revision(2L, "client-2", COMPONENT_1)); + assertFalse(revisionManager.releaseClaim(invalidClaim)); + + assertTrue(revisionManager.releaseClaim(claim)); + } + + @Test(timeout = 10000) + public void testUpdateWithSomeWrongRevision() { + final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); + final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1); + final Revision component2V1 = new Revision(1L, CLIENT_1, "component-2"); + final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1)); + assertNotNull(claim); + + // Perform update but only update the revision for COMPONENT_1; the revision for component-2 is left unchanged + final Revision component1V2 = new Revision(2L, "client-2", COMPONENT_1); + revisionManager.updateRevision(claim, "unit test", new UpdateRevisionTask<Void>() { + @Override + public RevisionUpdate<Void> update() { + return new StandardRevisionUpdate<>(null, new FlowModification(component1V2, "unit test")); + } + }); + + // Obtain a claim with correct revisions + final Revision component2V2 = new Revision(2L, "client-2", "component-2"); + revisionManager.requestClaim(Arrays.asList(component1V2, component2V1)); + + // Attempt to update with incorrect revision for second component + final RevisionClaim wrongClaim = new StandardRevisionClaim(component1V2, component2V2); + + final Revision component1V3 = new Revision(3L, 
CLIENT_1, COMPONENT_1); + try { + revisionManager.updateRevision(wrongClaim, "unit test", + () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"), Collections.emptySet())); + Assert.fail("Expected an Invalid Revision Exception"); + } catch (final InvalidRevisionException ire) { + // expected + } + + // release claim should fail because we are passing the wrong revision for component 2 + assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2))); + + // release claim should succeed because we are now using the proper revisions + assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1))); + + // verify that we can update again. + final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1)); + assertNotNull(thirdClaim); + revisionManager.updateRevision(thirdClaim, "unit test", () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"))); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js index 9463e3e..e014445 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js @@ -317,7 +317,7 @@ nf.Settings = (function () { // add the new controller service var addService = $.ajax({ type: 'POST', - url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId) + '/controller-services/' + encodeURIComponent(availability), + url: config.urls.api + '/process-groups/' + encodeURIComponent(nf.Canvas.getGroupId()) + '/controller-services/' + encodeURIComponent(availability), data: JSON.stringify(controllerServiceEntity), dataType: 'json', contentType: 'application/json' @@ -861,7 +861,7 @@ nf.Settings = (function () { // get the controller services that are running on the nodes var nodeControllerServices = $.ajax({ type: 'GET', - url: config.urls.api + '/process-groups/' + encodeURIComponent(nf.Canvas.getGroupId) + '/controller-services/' + encodeURIComponent(config.node), + url: config.urls.api + '/process-groups/' + encodeURIComponent(nf.Canvas.getGroupId()) + '/controller-services/' + encodeURIComponent(config.node), dataType: 'json' }).done(function (response) { var nodeServices = response.controllerServices; @@ -879,7 +879,7 @@ nf.Settings = (function () { if (nf.Canvas.isClustered()) { $.ajax({ type: 'GET', - url: config.urls.api + '/process-groups/' + encodeURIComponent(nf.Canvas.getGroupId) + '/controller-services/' + encodeURIComponent(config.ncm), + url: config.urls.api + '/process-groups/' + encodeURIComponent(nf.Canvas.getGroupId()) + '/controller-services/' + encodeURIComponent(config.ncm), dataType: 'json' }).done(function (response) { var ncmServices = response.controllerServices;
