Repository: nifi
Updated Branches:
  refs/heads/master 3db14f58f -> 04c41c065


http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
new file mode 100644
index 0000000..0db6828
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.nifi.web.InvalidRevisionException;
+import org.apache.nifi.web.Revision;
+
+
+/**
+ * <p>
+ * A Revision Manager provides the ability to prevent clients of the Web API from
+ * stepping on one another. This is done by providing claims and locking mechanisms
+ * for components individually.
+ * </p>
+ *
+ * <p>
+ * Clients that will modify a resource must do so using a two-phase commit. First,
+ * the client will issue a request that includes an HTTP Header of "X-NcmExpects".
+ * This indicates that the request will not actually be performed but rather that the
+ * node should validate that the request could in fact be performed. If all nodes respond
+ * with a 150-Continue response, then the second phase will commence. The second phase
+ * will consist of replicating the same request but without the "X-NcmExpects" header.
+ * </p>
+ *
+ * <p>
+ * When the first phase of the two-phase commit is processed, the Revision Manager should
+ * be used to obtain a Revision Claim by calling the {@link #requestClaim(Collection)}
+ * method. If a Claim is granted, then the request validation may continue. If the
+ * Claim is not granted, the request should fail and the second phase should not
+ * be performed.
+ * </p>
+ *
+ * <p>
+ * If the first phase of the above two-phase commit completes and all nodes indicate that the
+ * request may continue, this means that all nodes have granted a Claim on the Revisions
+ * that are relevant. This Claim will automatically expire after some time. This expiration
+ * means that if the node that issues the first phase never initiates the second phase (if the node
+ * dies or loses network connectivity, for instance), then the Revision Claim will expire and
+ * the Revision will remain unchanged.
+ * </p>
+ *
+ * <p>
+ * When the second phase begins, changes to the resource(s) must be made with the Revisions
+ * locked. This is accomplished by wrapping the logic in an {@link UpdateRevisionTask} and passing
+ * the task, along with the {@link RevisionClaim} and the name of the modifier, to the
+ * {@link #updateRevision(RevisionClaim, String, UpdateRevisionTask)} method.
+ * </p>
+ */
+public interface RevisionManager {
+
+    /**
+     * <p>
+     * Attempts to obtain a Revision Claim for Revisions supplied. If a Revision Claim
+     * is granted, no other thread will be allowed to modify any of the components for
+     * which a Revision is claimed until either the Revision Claim is relinquished by
+     * calling the {@link #updateRevision(RevisionClaim, String, UpdateRevisionTask)} method or the
+     * {@link #releaseClaim(RevisionClaim)} method, or the Revision Claim expires.
+     * </p>
+     *
+     * <p>
+     * This method is atomic. If a Revision Claim is unable to be obtained for any of the
+     * provided Revisions, then no Revision Claim will be obtained.
+     * </p>
+     *
+     * @param revisions a Collection of Revisions, each of which corresponds to a different
+     *            component for which a Claim is to be acquired.
+     *
+     * @return the Revision Claim that was granted, if one was granted.
+     *
+     * @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
+     */
+    RevisionClaim requestClaim(Collection<Revision> revisions) throws 
InvalidRevisionException;
+
+    /**
+     * <p>
+     * A convenience method that will call {@link #requestClaim(Collection)} by wrapping the given
+     * Revision in a Collection
+     * </p>
+     *
+     * @param revision the revision to request a claim for
+     *
+     * @return the Revision Claim that was granted, if one was granted.
+     *
+     * @throws InvalidRevisionException if the Revision provided is out-of-date.
+     */
+    RevisionClaim requestClaim(Revision revision) throws 
InvalidRevisionException;
+
+    /**
+     * Returns the current Revision for the component with the given ID. If no Revision yet exists for the
+     * component with the given ID, one will be created with a Version of 0 and no Client ID.
+     *
+     * @param componentId the ID of the component whose Revision is requested
+     * @return the current Revision for the component with the given ID
+     */
+    Revision getRevision(String componentId);
+
+    /**
+     * Performs the given task without allowing the given Revision Claim to expire. Once this method
+     * returns or an Exception is thrown (with the exception of ExpiredRevisionClaimException),
+     * the Revision may have been updated for each component that the RevisionClaim holds a Claim for.
+     * If an ExpiredRevisionClaimException is thrown, the Revisions claimed by RevisionClaim
+     * will not be updated.
+     *
+     * @param <T> the type of the component carried by the returned {@link RevisionUpdate}
+     * @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is
+     *            to be updated
+     * @param modifier the name of the entity that is modifying the resource
+     * @param task the task that is responsible for updating the components whose Revisions are claimed by the given
+     *            RevisionClaim. The returned Revision set should include a Revision for each Revision that is in the
+     *            supplied Revision Claim. If there exists any Revision in the provided RevisionClaim that is not part
+     *            of the RevisionUpdate returned by the task, then the Revision is assumed to have not been modified.
+     *
+     * @return a RevisionUpdate object that represents the new version of the component that was updated
+     *
+     * @throws ExpiredRevisionClaimException if the Revision Claim has expired
+     */
+    <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, String modifier, 
UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException;
+
+    /**
+     * Performs the given task that is expected to remove a component from the flow. As a result,
+     * the Revision for the component referenced by the RevisionClaim will be removed.
+     *
+     * @param <T> the type of the value returned by the task
+     * @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is
+     *            to be removed
+     * @param task the task that is responsible for deleting the components whose Revisions are claimed by the given RevisionClaim
+     * @return the value returned from the DeleteRevisionTask
+     *
+     * @throws ExpiredRevisionClaimException if the Revision Claim has expired
+     */
+    <T> T deleteRevision(RevisionClaim claim, DeleteRevisionTask<T> task) 
throws ExpiredRevisionClaimException;
+
+    /**
+     * Performs some operation to obtain an object of type T whose identifier is provided via
+     * the componentId argument, and return that object of type T while holding a Read Lock on
+     * the Revision for that object. Note that the callback provided must never modify the object
+     * with the given ID.
+     *
+     * @param <T> the type of the value produced by the callback
+     * @param componentId the ID of the component whose Revision is to be read-locked
+     * @param callback the callback that is to be performed with the Read Lock held
+     * @return the value returned from the callback
+     */
+    <T> T get(String componentId, ReadOnlyRevisionCallback<T> callback);
+
+    /**
+     * Multi-component counterpart of {@link #get(String, ReadOnlyRevisionCallback)}: performs some
+     * operation to obtain an object of type T and returns that object while holding a Read Lock on
+     * the Revisions of the components whose identifiers are provided via the componentId argument.
+     * Note that the callback provided must never modify the components with the given IDs.
+     * NOTE(review): that a Read Lock is taken for every ID in the Set is inferred from the
+     * single-component variant; confirm against the implementation.
+     *
+     * @param <T> the type of the value produced by the callback
+     * @param componentId the IDs of the components whose Revisions are to be read-locked
+     * @param callback the callback that is to be performed with the Read Lock held
+     * @return the value returned from the callback
+     */
+    <T> T get(Set<String> componentId, Supplier<T> callback);
+
+    /**
+     * Releases the claims on the revisions held by the given Revision Claim, if all of the Revisions
+     * are up-to-date.
+     *
+     * @param claim the claim that holds the revisions
+     *
+     * @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not
+     *         up-to-date
+     */
+    boolean releaseClaim(RevisionClaim claim);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java
new file mode 100644
index 0000000..e9b76f6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionUpdate.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import java.util.Set;
+
+import org.apache.nifi.web.FlowModification;
+import org.apache.nifi.web.Revision;
+
+/**
+ * A packaging of a Component's DTO and the corresponding Revision for that component
+ *
+ * @param <T> the type of the component's DTO/configuration, as returned by {@link #getComponent()}
+ */
+public interface RevisionUpdate<T> {
+    /**
+     * @return the DTO/configuration of the component that was updated
+     */
+    T getComponent();
+
+    /**
+     * @return the last modification that was made to the flow for this component
+     */
+    FlowModification getLastModification();
+
+    /**
+     * @return a Set of all Revisions that were updated
+     */
+    Set<Revision> getUpdatedRevisions();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java
new file mode 100644
index 0000000..b1223c6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionClaim.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.web.Revision;
+
+/**
+ * A {@link RevisionClaim} backed by a private copy of the Revisions supplied at construction time.
+ * The claimed Revisions cannot be changed through this object once it has been created.
+ */
+public class StandardRevisionClaim implements RevisionClaim {
+    private final Set<Revision> revisions;
+
+    /**
+     * Creates a claim over the given Revisions.
+     *
+     * @param revisions the Revisions being claimed; they are copied into an internal Set
+     */
+    public StandardRevisionClaim(final Revision... revisions) {
+        this.revisions = new HashSet<>(revisions.length);
+        for (final Revision revision : revisions) {
+            this.revisions.add(revision);
+        }
+    }
+
+    /**
+     * Creates a claim over the given Revisions.
+     *
+     * @param revisions the Revisions being claimed; the Collection is copied, so later changes
+     *            to it do not affect this claim
+     */
+    public StandardRevisionClaim(final Collection<Revision> revisions) {
+        this.revisions = new HashSet<>(revisions);
+    }
+
+    /**
+     * @return a read-only view of the claimed Revisions
+     */
+    @Override
+    public Set<Revision> getRevisions() {
+        // Hand out a read-only view so callers cannot alter the claim after the fact; the
+        // constructors already copy their input for the same reason. The fully-qualified name
+        // avoids touching the file's import block.
+        return java.util.Collections.unmodifiableSet(revisions);
+    }
+
+    /**
+     * @return the String form of the underlying Set of Revisions
+     */
+    @Override
+    public String toString() {
+        return revisions.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java
new file mode 100644
index 0000000..bf33b3a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/StandardRevisionUpdate.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.web.FlowModification;
+import org.apache.nifi.web.Revision;
+
+/**
+ * Plain value-holder implementation of {@link RevisionUpdate}: it carries the component, the last
+ * modification, and a private copy of the Revisions that were updated.
+ *
+ * @param <T> the type of the component being carried
+ */
+public class StandardRevisionUpdate<T> implements RevisionUpdate<T> {
+    private final T component;
+    private final FlowModification lastModification;
+    private final Set<Revision> updatedRevisions;
+
+    /**
+     * Creates an update with no additional Revisions beyond the one held by the modification.
+     *
+     * @param component the component that was updated
+     * @param lastModification the last modification made to the component; may be null
+     */
+    public StandardRevisionUpdate(final T component, final FlowModification lastModification) {
+        this(component, lastModification, null);
+    }
+
+    /**
+     * Creates an update whose Revision set is a copy of the given Set plus, when a modification is
+     * supplied, the Revision of that modification.
+     *
+     * @param component the component that was updated
+     * @param lastModification the last modification made to the component; may be null
+     * @param updatedRevisions the Revisions that were updated; null is treated as empty
+     */
+    public StandardRevisionUpdate(final T component, final FlowModification lastModification, final Set<Revision> updatedRevisions) {
+        this.component = component;
+        this.lastModification = lastModification;
+
+        final Set<Revision> revisionCopy;
+        if (updatedRevisions == null) {
+            revisionCopy = new HashSet<>();
+        } else {
+            revisionCopy = new HashSet<>(updatedRevisions);
+        }
+
+        if (lastModification != null) {
+            revisionCopy.add(lastModification.getRevision());
+        }
+
+        this.updatedRevisions = revisionCopy;
+    }
+
+
+    @Override
+    public T getComponent() {
+        return component;
+    }
+
+    @Override
+    public FlowModification getLastModification() {
+        return lastModification;
+    }
+
+    /**
+     * @return an unmodifiable view of the updated Revisions
+     */
+    @Override
+    public Set<Revision> getUpdatedRevisions() {
+        return Collections.unmodifiableSet(updatedRevisions);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append("[Component=").append(component);
+        text.append(", Last Modification=").append(lastModification);
+        text.append("]");
+        return text.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java
new file mode 100644
index 0000000..f2f7914
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/UpdateRevisionTask.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.nifi.web.Revision;
+
+/**
+ * <p>
+ * A unit of work that updates one or more components and reports the result as a
+ * {@link RevisionUpdate}.
+ * </p>
+ *
+ * @param <T> the type of the component carried by the {@link RevisionUpdate} that is returned
+ */
+public interface UpdateRevisionTask<T> {
+    /**
+     * Carries out the update of one or more components.
+     *
+     * @return the updated revisions for the components
+     */
+    RevisionUpdate<T> update();
+
+    /**
+     * Builds the successor of the given Revision: the Client ID and Component ID are carried over
+     * and the version is one greater.
+     *
+     * @param revision the revision to update
+     * @return the updated Revision
+     */
+    default Revision incrementRevision(Revision revision) {
+        final long nextVersion = revision.getVersion() + 1;
+        return new Revision(nextVersion, revision.getClientId(), revision.getComponentId());
+    }
+
+    /**
+     * Applies {@link #incrementRevision(Revision)} to every Revision passed in.
+     *
+     * @param revisions the Revisions to update
+     * @return a Collection holding the incremented counterpart of each given Revision, in argument order
+     */
+    default Collection<Revision> incrementRevisions(Revision... revisions) {
+        final int count = revisions.length;
+        final List<Revision> incremented = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            incremented.add(incrementRevision(revisions[i]));
+        }
+
+        return incremented;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java
new file mode 100644
index 0000000..8b9c129
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.revision;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.web.FlowModification;
+import org.apache.nifi.web.InvalidRevisionException;
+import org.apache.nifi.web.Revision;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore
+public class TestNaiveRevisionManager {
+    private static final String CLIENT_1 = "client-1";
+    private static final String COMPONENT_1 = "component-1";
+
+    private RevisionUpdate<Object> components(final Revision revision) {
+        return new StandardRevisionUpdate<Object>(null, new 
FlowModification(revision, null));
+    }
+
+    private RevisionUpdate<Object> components(final Revision revision, final 
Revision... additionalRevisions) {
+        final Set<Revision> revisionSet = new HashSet<>();
+        for (final Revision rev : additionalRevisions) {
+            revisionSet.add(rev);
+        }
+        return components(revision, revisionSet);
+    }
+
+    private RevisionUpdate<Object> components(final Revision revision, final 
Set<Revision> additionalRevisions) {
+        final Set<RevisionUpdate<Object>> components = new HashSet<>();
+        for (final Revision rev : additionalRevisions) {
+            components.add(new StandardRevisionUpdate<Object>(null, new 
FlowModification(rev, null)));
+        }
+        return new StandardRevisionUpdate<Object>(null, new 
FlowModification(revision, null), additionalRevisions);
+    }
+
+    @Test
+    public void testTypicalFlow() throws ExpiredRevisionClaimException {
+        final RevisionManager revisionManager = new NaiveRevisionManager();
+        final Revision originalRevision = new Revision(0L, CLIENT_1, 
COMPONENT_1);
+        final RevisionClaim claim = 
revisionManager.requestClaim(originalRevision);
+        assertNotNull(claim);
+
+        revisionManager.updateRevision(claim, "unit test", () -> 
components(new Revision(1L, CLIENT_1, COMPONENT_1)));
+
+        final Revision updatedRevision = 
revisionManager.getRevision(originalRevision.getComponentId());
+        assertNotNull(updatedRevision);
+        assertEquals(originalRevision.getClientId(), 
updatedRevision.getClientId());
+        assertEquals(originalRevision.getComponentId(), 
updatedRevision.getComponentId());
+        assertEquals(1L, updatedRevision.getVersion().longValue());
+    }
+
+    @Test
+    public void testExpiration() throws InterruptedException {
+        final RevisionManager revisionManager = new NaiveRevisionManager(10, 
TimeUnit.MILLISECONDS);
+        final Revision originalRevision = new Revision(0L, CLIENT_1, 
COMPONENT_1);
+        final RevisionClaim claim = 
revisionManager.requestClaim(originalRevision);
+        assertNotNull(claim);
+
+        Thread.sleep(100);
+
+        try {
+            revisionManager.updateRevision(claim, "unit test", () -> 
components(originalRevision, claim.getRevisions()));
+            Assert.fail("Expected Revision Claim to have expired but it did 
not");
+        } catch (final ExpiredRevisionClaimException erce) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 15000)
+    public void testConflictingClaimsFromDifferentClients() {
+        final RevisionManager revisionManager = new NaiveRevisionManager(2, 
TimeUnit.SECONDS);
+        final Revision originalRevision = new Revision(0L, CLIENT_1, 
COMPONENT_1);
+        final RevisionClaim claim = 
revisionManager.requestClaim(originalRevision);
+        assertNotNull(claim);
+
+        final Revision differentClientRevision = new Revision(0L, "client-2", 
COMPONENT_1);
+        final long start = System.nanoTime();
+        final RevisionClaim differentClientClaim = 
revisionManager.requestClaim(differentClientRevision);
+        final long nanos = System.nanoTime() - start;
+
+        // we should block for 2 seconds. But the timing won't necessarily be 
exact,
+        // so we ensure that it takes at least 1.5 seconds to provide a little 
wiggle room.
+        final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500);
+        assertTrue(nanos > minExpectedNanos);
+
+        // We should not get a Revision Claim because the revision is already 
claimed by a different client id
+        assertNotNull(differentClientClaim);
+        final Set<Revision> newRevisions = differentClientClaim.getRevisions();
+        assertEquals(1, newRevisions.size());
+        assertEquals(differentClientRevision, newRevisions.iterator().next());
+    }
+
+    @Test
+    public void testGetWithReadLockNoContention() {
+        final RevisionManager revisionManager = new NaiveRevisionManager(3, 
TimeUnit.SECONDS);
+        final Object returnedValue = revisionManager.get(COMPONENT_1, revision 
-> revision);
+        assertTrue(returnedValue instanceof Revision);
+
+        final Revision revision = (Revision) returnedValue;
+        assertEquals(0L, revision.getVersion().longValue());
+        assertNull(revision.getClientId());
+        assertEquals(COMPONENT_1, revision.getComponentId());
+    }
+
+    @Test(timeout = 10000)
+    public void testGetWithReadLockAndContentionWithTimeout() {
+        final RevisionManager revisionManager = new NaiveRevisionManager(2, 
TimeUnit.SECONDS);
+        final Revision originalRevision = new Revision(8L, CLIENT_1, 
COMPONENT_1);
+        final RevisionClaim claim = 
revisionManager.requestClaim(originalRevision);
+        assertNotNull(claim);
+
+        final long start = System.nanoTime();
+        final Object returnValue = new Object();
+        final Object valueReturned = revisionManager.get(COMPONENT_1, revision 
-> returnValue);
+        final long nanos = System.nanoTime() - start;
+
+        final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500L);
+        assertTrue(nanos > minExpectedNanos);
+        assertEquals(returnValue, valueReturned);
+    }
+
+    @Test(timeout = 10000)
+    public void testGetWithReadLockAndContentionWithEventualLockResolution() {
+        final RevisionManager revisionManager = new NaiveRevisionManager(10, 
TimeUnit.MINUTES);
+        final Revision originalRevision = new Revision(8L, CLIENT_1, 
COMPONENT_1);
+        final RevisionClaim claim = 
revisionManager.requestClaim(originalRevision);
+        assertNotNull(claim);
+
+        final Revision updatedRevision = new Revision(100L, CLIENT_1, 
COMPONENT_1);
+
+        // Create a thread that will hold the lock for 2 seconds and then will 
return an updated revision
+        final Thread updateRevisionThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    revisionManager.updateRevision(claim, "unit test", () -> {
+                        // Wait 2 seconds and then return
+                        try {
+                            Thread.sleep(2000L);
+                        } catch (Exception e) {
+                        }
+
+                        return components(updatedRevision);
+                    });
+                } catch (ExpiredRevisionClaimException e) {
+                    Assert.fail("Revision expired unexpectedly");
+                }
+            }
+        });
+        updateRevisionThread.start();
+
+        final long start = System.nanoTime();
+        final Object returnValue = new Object();
+        final Object valueReturned = revisionManager.get(COMPONENT_1, revision 
-> {
+            Assert.assertEquals(updatedRevision, revision);
+            return returnValue;
+        });
+        final long nanos = System.nanoTime() - start;
+
+        final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500L);
+        assertTrue(nanos > minExpectedNanos);
+        assertEquals(returnValue, valueReturned);
+    }
+
+    /**
+     * Walks a component through claim, update, re-claim and delete, and checks that
+     * deleting via a valid claim hands back the task's return value and that the
+     * manager afterwards reports version 0 with no client id for the component.
+     */
+    @Test(timeout = 10000)
+    public void testDeleteRevision() {
+        final RevisionManager manager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
+
+        // Claim the component at version 1.
+        final Revision initialRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
+        final RevisionClaim initialClaim = manager.requestClaim(initialRevision);
+        assertNotNull(initialClaim);
+
+        // Use that claim to move the component to version 2.
+        final Revision nextRevision = new Revision(2L, CLIENT_1, COMPONENT_1);
+        final FlowModification modification = new FlowModification(nextRevision, "unit test");
+        manager.updateRevision(initialClaim, "unit test", () -> new StandardRevisionUpdate<Void>(null, modification, null));
+
+        final Revision revisionAfterUpdate = manager.getRevision(COMPONENT_1);
+        assertEquals(nextRevision, revisionAfterUpdate);
+
+        // Claim the updated revision and delete it.
+        final RevisionClaim deletionClaim = manager.requestClaim(revisionAfterUpdate);
+        assertNotNull(deletionClaim);
+
+        final Object expectedResult = new Object();
+        final Object actualResult = manager.deleteRevision(deletionClaim, () -> expectedResult);
+        assertEquals(expectedResult, actualResult);
+
+        // After deletion the component is reported at version 0 with no owning client.
+        final Revision revisionAfterDelete = manager.getRevision(COMPONENT_1);
+        assertNotNull(revisionAfterDelete);
+        assertEquals(0L, revisionAfterDelete.getVersion().longValue());
+        assertNull(revisionAfterDelete.getClientId());
+        assertEquals(COMPONENT_1, revisionAfterDelete.getComponentId());
+    }
+
+
+    /**
+     * A single client claiming revisions of two different components must obtain both
+     * claims; the test's timeout guards against the second request blocking.
+     */
+    @Test(timeout = 10000)
+    public void testSameClientDifferentRevisionsDoNotBlockEachOther() {
+        final RevisionManager manager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
+
+        final Revision component1Revision = new Revision(1L, CLIENT_1, COMPONENT_1);
+        final RevisionClaim component1Claim = manager.requestClaim(component1Revision);
+        assertNotNull(component1Claim);
+
+        // Same client, different component: must not wait on the claim held above.
+        final Revision component2Revision = new Revision(1L, CLIENT_1, "component-2");
+        final RevisionClaim component2Claim = manager.requestClaim(component2Revision);
+        assertNotNull(component2Claim);
+    }
+
+    /**
+     * Two different clients claiming revisions of two different components must both
+     * obtain their claims; the test's timeout guards against the second request blocking.
+     */
+    @Test(timeout = 10000)
+    public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() {
+        final RevisionManager manager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
+
+        final Revision client1Revision = new Revision(1L, CLIENT_1, COMPONENT_1);
+        final RevisionClaim client1Claim = manager.requestClaim(client1Revision);
+        assertNotNull(client1Claim);
+
+        // Different client and different component: must not wait on the claim held above.
+        final Revision client2Revision = new Revision(1L, "client-2", "component-2");
+        final RevisionClaim client2Claim = manager.requestClaim(client2Revision);
+        assertNotNull(client2Claim);
+    }
+
+    @Test(timeout = 10000)
+    public void testDifferentOrderedRevisionsDoNotCauseDeadlock() throws 
ExpiredRevisionClaimException, InterruptedException {
+        // Because we block before obtaining a claim on a revision if another 
client has the revision claimed,
+        // we should not have an issue if Client 1 requests a claim on 
revisions 'a' and 'b' while Client 2
+        // requests a claim on revisions 'b' and 'c' and Client 3 requests a 
claim on revisions 'c' and 'a'.
+        final RevisionManager revisionManager = new NaiveRevisionManager(2, 
TimeUnit.MINUTES);
+        final Revision revision1a = new Revision(1L, "client-1", "a");
+        final Revision revision1b = new Revision(1L, "client-1", "b");
+
+        final Revision revision2b = new Revision(2L, "client-2", "b");
+        final Revision revision2c = new Revision(2L, "client-2", "c");
+
+        final Revision revision3c = new Revision(3L, "client-3", "c");
+        final Revision revision3a = new Revision(3L, "client-3", "a");
+
+        final RevisionClaim claim1 = 
revisionManager.requestClaim(Arrays.asList(revision1a, revision1b));
+        assertNotNull(claim1);
+
+        final AtomicBoolean claim2Obtained = new AtomicBoolean(false);
+        final AtomicBoolean claim3Obtained = new AtomicBoolean(false);
+
+        final AtomicReference<RevisionClaim> claim2Ref = new 
AtomicReference<>();
+        final AtomicReference<RevisionClaim> claim3Ref = new 
AtomicReference<>();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                final RevisionClaim claim2 = 
revisionManager.requestClaim(Arrays.asList(revision2b, revision2c));
+                assertNotNull(claim2);
+                claim2Obtained.set(true);
+                claim2Ref.set(claim2);
+
+                try {
+                    revisionManager.updateRevision(claim2, "unit test", () -> 
components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", 
"c")));
+                } catch (ExpiredRevisionClaimException e) {
+                    Assert.fail("Revision unexpected expired");
+                }
+            }
+        }).start();
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                final RevisionClaim claim3 = 
revisionManager.requestClaim(Arrays.asList(revision3c, revision3a));
+                assertNotNull(claim3);
+                claim3Obtained.set(true);
+                claim3Ref.set(claim3);
+
+                try {
+                    revisionManager.updateRevision(claim3Ref.get(), "unit 
test", () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, 
"client-3", "a")));
+                } catch (ExpiredRevisionClaimException e) {
+                    Assert.fail("Revision unexpected expired");
+                }
+            }
+        }).start();
+
+        Thread.sleep(250L);
+
+        assertFalse(claim2Obtained.get());
+        assertFalse(claim3Obtained.get());
+        revisionManager.updateRevision(claim1, "unit test", () -> 
components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", 
"b")));
+
+        Thread.sleep(250L);
+        assertTrue(claim2Obtained.get() && claim3Obtained.get());
+
+        assertEquals(2L, 
revisionManager.getRevision("a").getVersion().longValue());
+
+        // The version for 'c' could be either 2 or 3, depending on which 
request completed first.
+        final long versionC = 
revisionManager.getRevision("c").getVersion().longValue();
+        assertTrue(versionC == 2 || versionC == 3);
+
+        assertEquals(3L, 
revisionManager.getRevision("b").getVersion().longValue());
+    }
+
+    /**
+     * Releasing a claim built from a revision that was never claimed must return false,
+     * while releasing the claim actually handed out by the manager must return true.
+     */
+    @Test(timeout = 10000)
+    public void testReleaseClaim() {
+        final RevisionManager manager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
+
+        final Revision claimedRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
+        final RevisionClaim heldClaim = manager.requestClaim(claimedRevision);
+        assertNotNull(heldClaim);
+
+        // A hand-built claim with a different version and client for the same component.
+        final Revision unclaimedRevision = new Revision(2L, "client-2", COMPONENT_1);
+        final RevisionClaim bogusClaim = new StandardRevisionClaim(unclaimedRevision);
+        assertFalse(manager.releaseClaim(bogusClaim));
+
+        assertTrue(manager.releaseClaim(heldClaim));
+    }
+
+    /**
+     * Verifies that a claim spanning two components is rejected when the revision presented for
+     * either component is wrong.
+     * <p>
+     * Both components are claimed at version 1 and an update advances only COMPONENT_1 to version 2.
+     * A new claim is then taken on the resulting revisions (COMPONENT_1 at version 2, component-2 at
+     * version 1). An update using a claim that names version 2 for component-2 is expected to throw
+     * {@link InvalidRevisionException}, and releasing that same wrong claim is expected to return
+     * false. Releasing with the proper revisions is expected to return true, after which the
+     * components can be claimed and updated again.
+     */
+    @Test(timeout = 10000)
+    public void testUpdateWithSomeWrongRevision() {
+        final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
+        final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
+        final Revision component2V1 = new Revision(1L, CLIENT_1, "component-2");
+        final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1));
+        assertNotNull(claim);
+
+        // Perform update but only update the revision for component 1; component-2 stays at version 1
+        final Revision component1V2 = new Revision(2L, "client-2", COMPONENT_1);
+        revisionManager.updateRevision(claim, "unit test", () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V2, "unit test")));
+
+        // Obtain a claim with correct revisions
+        final Revision component2V2 = new Revision(2L, "client-2", "component-2");
+        final RevisionClaim secondClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1));
+        assertNotNull(secondClaim);
+
+        // Attempt to update with incorrect revision for second component
+        final RevisionClaim wrongClaim = new StandardRevisionClaim(component1V2, component2V2);
+
+        final Revision component1V3 = new Revision(3L, CLIENT_1, COMPONENT_1);
+        try {
+            revisionManager.updateRevision(wrongClaim, "unit test",
+                () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"), Collections.emptySet()));
+            Assert.fail("Expected an Invalid Revision Exception");
+        } catch (final InvalidRevisionException ire) {
+            // expected
+        }
+
+        // release claim should fail because we are passing the wrong revision for component 2
+        assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2)));
+
+        // release claim should succeed because we are now using the proper revisions
+        assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1)));
+
+        // verify that we can update again.
+        final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1));
+        assertNotNull(thirdClaim);
+        revisionManager.updateRevision(thirdClaim, "unit test", () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test")));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index 9463e3e..e014445 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -317,7 +317,7 @@ nf.Settings = (function () {
         // add the new controller service
         var addService = $.ajax({
             type: 'POST',
-            url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId) + '/controller-services/' + 
encodeURIComponent(availability),
+            url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId()) + '/controller-services/' + 
encodeURIComponent(availability),
             data: JSON.stringify(controllerServiceEntity),
             dataType: 'json',
             contentType: 'application/json'
@@ -861,7 +861,7 @@ nf.Settings = (function () {
         // get the controller services that are running on the nodes
         var nodeControllerServices = $.ajax({
             type: 'GET',
-            url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId) + '/controller-services/' + 
encodeURIComponent(config.node),
+            url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId()) + '/controller-services/' + 
encodeURIComponent(config.node),
             dataType: 'json'
         }).done(function (response) {
             var nodeServices = response.controllerServices;
@@ -879,7 +879,7 @@ nf.Settings = (function () {
             if (nf.Canvas.isClustered()) {
                 $.ajax({
                     type: 'GET',
-                    url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId) + '/controller-services/' + 
encodeURIComponent(config.ncm),
+                    url: config.urls.api + '/process-groups/' + 
encodeURIComponent(nf.Canvas.getGroupId()) + '/controller-services/' + 
encodeURIComponent(config.ncm),
                     dataType: 'json'
                 }).done(function (response) {
                     var ncmServices = response.controllerServices;

Reply via email to