NIFI-1745: Refactor how revisions are handled at NCM/Distributed to Node. This closes #454
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4b74e4de Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4b74e4de Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4b74e4de Branch: refs/heads/master Commit: 4b74e4de7403b1798b604f21450a8687e45d7978 Parents: afc8c64 Author: Mark Payne <[email protected]> Authored: Tue May 17 11:51:09 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Fri May 20 14:04:24 2016 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 2 + .../org/apache/nifi/util/NiFiProperties.java | 4 + .../nifi-framework-cluster-web/.gitignore | 1 - .../nifi-framework-cluster-web/pom.xml | 44 ----- .../nifi/cluster/context/ClusterContext.java | 65 ------- .../cluster/context/ClusterContextImpl.java | 69 ------- .../context/ClusterContextThreadLocal.java | 42 ----- .../nifi-framework-cluster/pom.xml | 4 - .../http/replication/RequestReplicator.java | 14 +- .../ThreadPoolRequestReplicator.java | 68 +++---- .../manager/impl/HttpRequestReplicatorImpl.java | 9 +- .../cluster/manager/impl/WebClusterManager.java | 155 ++-------------- .../spring/WebClusterManagerFactoryBean.java | 10 +- .../resources/nifi-cluster-manager-context.xml | 4 - .../TestThreadPoolRequestReplicator.java | 14 +- .../src/main/resources/conf/nifi.properties | 3 + .../nifi-web/nifi-web-api/pom.xml | 5 - .../java/org/apache/nifi/audit/NiFiAuditor.java | 12 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 13 +- .../nifi/web/StandardNiFiServiceFacade.java | 183 +++++++----------- .../nifi/web/api/ApplicationResource.java | 157 +++++----------- .../apache/nifi/web/api/ControllerResource.java | 74 ++++---- .../nifi/web/api/FlowFileQueueResource.java | 89 ++++----- .../org/apache/nifi/web/api/FlowResource.java | 59 +++--- .../nifi/web/api/ProcessGroupResource.java | 186 +++++++------------ .../apache/nifi/web/api/ProvenanceResource.java | 84 ++++----- .../org/apache/nifi/web/dao/SnippetDAO.java | 3 +- .../org/apache/nifi/web/dao/TemplateDAO.java | 3 +- .../nifi/web/dao/impl/StandardSnippetDAO.java | 4 +- .../nifi/web/dao/impl/StandardTemplateDAO.java | 4 +- .../nifi/web/filter/NodeRequestFilter.java | 114 ------------ .../nifi/web/filter/ThreadLocalFilter.java | 53 ------ .../org/apache/nifi/web/filter/TimerFilter.java | 9 +- .../org/apache/nifi/web/util/SnippetUtils.java | 39 ++-- .../src/main/resources/nifi-web-api-context.xml | 4 +- .../src/main/webapp/WEB-INF/web.xml | 16 -- .../nifi-web-optimistic-locking/pom.xml | 4 - .../web/StandardOptimisticLockingManager.java | 17 +- .../nifi/web/revision/NaiveRevisionManager.java | 169 ++++++++++++----- .../nifi/web/revision/RevisionManager.java | 33 +++- .../web/revision/TestNaiveRevisionManager.java | 147 +++++++++++---- .../nifi-framework/pom.xml | 1 - nifi-nar-bundles/nifi-framework-bundle/pom.xml | 5 - 43 files changed, 685 insertions(+), 1310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index d07dfb1..00d30e9 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -467,6 +467,8 @@ language governing permissions and limitations under the License. --> <nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads> <nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration> + <nifi.cluster.request.replication.claim.timeout>1 min</nifi.cluster.request.replication.claim.timeout> + <!-- nifi.properties: zookeeper properties --> <nifi.zookeeper.connect.string></nifi.zookeeper.connect.string> <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout> http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 63693bf..1b560fc 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -186,6 +186,8 @@ public class NiFiProperties extends Properties { public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads"; public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration"; + public static final String REQUEST_REPLICATION_CLAIM_TIMEOUT = "nifi.cluster.request.replication.claim.timeout"; + // kerberos properties public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file"; public static final String KERBEROS_SERVICE_PRINCIPAL = "nifi.kerberos.service.principal"; @@ -249,6 +251,8 @@ public class NiFiProperties extends Properties { public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10; public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec"; + public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "1 min"; + // state management defaults public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml"; http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml deleted file mode 100644 index e559816..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml +++ /dev/null @@ -1,44 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework</artifactId> - <version>1.0.0-SNAPSHOT</version> - </parent> - <artifactId>nifi-framework-cluster-web</artifactId> - <packaging>jar</packaging> - <description>The clustering software for communicating with the NiFi web api.</description> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-administration</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-user-actions</artifactId> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java deleted file mode 100644 index 8c3e41b..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -import java.io.Serializable; -import java.util.List; -import org.apache.nifi.action.Action; -import org.apache.nifi.web.Revision; - -/** - * Contains contextual information about clustering that may be serialized - * between manager and node when communicating over HTTP. - */ -public interface ClusterContext extends Serializable { - - /** - * Returns a list of auditable actions. The list is modifiable and will - * never be null. - * - * @return a collection of actions - */ - List<Action> getActions(); - - Revision getRevision(); - - void setRevision(Revision revision); - - /** - * @return true if the request was sent by the cluster manager; false - * otherwise - */ - boolean isRequestSentByClusterManager(); - - /** - * Sets the flag to indicate if a request was sent by the cluster manager. - * - * @param flag true if the request was sent by the cluster manager; false - * otherwise - */ - void setRequestSentByClusterManager(boolean flag); - - /** - * Gets an id generation seed. This is used to ensure that nodes are able to - * generate the same id across the cluster. This is usually handled by the - * cluster manager creating the id, however for some actions (snippets, - * templates, etc) this is not possible. - * - * @return generated id seed - */ - String getIdGenerationSeed(); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java deleted file mode 100644 index 43e7c2d..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import org.apache.nifi.action.Action; -import org.apache.nifi.web.Revision; - -/** - * A basic implementation of the context. - */ -public class ClusterContextImpl implements ClusterContext, Serializable { - - private final List<Action> actions = new ArrayList<>(); - - private Revision revision; - - private boolean requestSentByClusterManager; - - private final String idGenerationSeed = UUID.randomUUID().toString(); - - @Override - public List<Action> getActions() { - return actions; - } - - @Override - public Revision getRevision() { - return revision; - } - - @Override - public void setRevision(Revision revision) { - this.revision = revision; - } - - @Override - public boolean isRequestSentByClusterManager() { - return requestSentByClusterManager; - } - - @Override - public void setRequestSentByClusterManager(boolean requestSentByClusterManager) { - this.requestSentByClusterManager = requestSentByClusterManager; - } - - @Override - public String getIdGenerationSeed() { - return this.idGenerationSeed; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java deleted file mode 100644 index 79900fb..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -/** - * Manages a cluster context on a threadlocal. - */ -public class ClusterContextThreadLocal { - - private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>(); - - public static void removeContext() { - contextHolder.remove(); - } - - public static ClusterContext createEmptyContext() { - return new ClusterContextImpl(); - } - - public static ClusterContext getContext() { - return contextHolder.get(); - } - - public static void setContext(final ClusterContext context) { - contextHolder.set(context); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml index e5a1a7d..8a06467 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml @@ -67,10 +67,6 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-cluster-web</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> <artifactId>nifi-web-utils</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index 6c9f18c..b64c766 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -24,7 +24,19 @@ import java.util.Set; import org.apache.nifi.cluster.protocol.NodeIdentifier; public interface RequestReplicator { - public static final String REQUEST_TRANSACTION_ID = "X-RequestTransactionId"; + + public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId"; + public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed"; + + /** + * The HTTP header that the requestor specifies to ask a node if they are able to process a given request. The value + * is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to + * process the request, 417 EXPECTATION_FAILED otherwise. + */ + public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects"; + public static final String NODE_CONTINUE = "150-NodeContinue"; + public static final int NODE_CONTINUE_STATUS_CODE = 150; + /** * Starts the instance for replicating requests. Calling this method on an already started instance has no effect. http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index d218af2..dd4d2ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -43,8 +43,6 @@ import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; -import org.apache.nifi.cluster.context.ClusterContext; -import org.apache.nifi.cluster.context.ClusterContextImpl; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; @@ -62,8 +60,6 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.web.OptimisticLockingManager; -import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,21 +71,6 @@ import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; import com.sun.jersey.core.util.MultivaluedMapImpl; public class ThreadPoolRequestReplicator implements RequestReplicator { - /** - * The HTTP header to store a cluster context. An example of what may be stored in the context is a node's - * auditable actions in response to a cluster request. The cluster context is serialized - * using Java's serialization mechanism and hex encoded. - */ - static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext"; - - /** - * The HTTP header that the NCM specifies to ask a node if they are able to process a given request. The value - * is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to - * process the request, 417 EXPECTATION_FAILED otherwise. - */ - static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects"; - static final String NODE_CONTINUE = "150-NodeContinue"; - static final int NODE_CONTINUE_STATUS_CODE = 150; private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class)); private static final int MAX_CONCURRENT_REQUESTS = 100; @@ -102,7 +83,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final EventReporter eventReporter; private final RequestCompletionCallback callback; private final ClusterCoordinator clusterCoordinator; - private final OptimisticLockingManager lockingManager; private final DataFlowManagementService dfmService; private ExecutorService executorService; @@ -121,8 +101,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. */ public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, - final RequestCompletionCallback callback, final EventReporter eventReporter, final OptimisticLockingManager lockingManager, final DataFlowManagementService dfmService) { - this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", callback, eventReporter, null, lockingManager, dfmService); + final RequestCompletionCallback callback, final EventReporter eventReporter, final DataFlowManagementService dfmService) { + this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", callback, eventReporter, null, dfmService); } /** @@ -138,7 +118,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { */ public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final EventReporter eventReporter, - final WebClusterManager clusterManager, final OptimisticLockingManager lockingManager, final DataFlowManagementService dfmService) { + final WebClusterManager clusterManager, final DataFlowManagementService dfmService) { if (numThreads <= 0) { throw new IllegalArgumentException("The number of threads must be greater than zero."); } else if (client == null) { @@ -153,7 +133,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { this.responseMerger = new StandardHttpResponseMerger(clusterManager); this.eventReporter = eventReporter; this.callback = callback; - this.lockingManager = lockingManager; this.dfmService = dfmService; client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeoutMs); @@ -198,7 +177,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { @Override public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) { - return replicate(nodeIds, method, uri, entity, headers, true, null); + final Map<String, String> headersPlusIdGenerationSeed = new HashMap<>(headers); + headersPlusIdGenerationSeed.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString()); + return replicate(nodeIds, method, uri, entity, headersPlusIdGenerationSeed, true, null); } /** @@ -233,7 +214,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // Update headers to indicate the current revision so that we can // prevent multiple users changing the flow at the same time final Map<String, String> updatedHeaders = new HashMap<>(headers); - final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> UUID.randomUUID().toString()); + final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); if (performVerification) { verifyState(method, uri.getPath()); @@ -284,29 +265,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); - - // TODO: Must handle revisions!! - return response; } - private void setRevision(final Map<String, String> headers) { - final ClusterContext clusterCtx = new ClusterContextImpl(); - clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager - clusterCtx.setRevision(lockingManager.getLastModification().getRevision()); - - // serialize cluster context and add to request header - final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx); - headers.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx); - } - private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); final Map<String, String> updatedHeaders = new HashMap<>(headers); - updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, NODE_CONTINUE); + updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE); final int numNodes = nodeIds.size(); final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() { @@ -474,6 +442,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } } + /** + * Removes the AsyncClusterResponse with the given ID from the map and handles any cleanup + * or post-processing related to the request after the client has consumed the response + * + * @param requestId the ID of the request that has been consumed by the client + */ private void onResponseConsumed(final String requestId) { final AsyncClusterResponse response = responseMap.remove(requestId); @@ -482,6 +456,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } } + /** + * When all nodes have completed a request and provided a response (or have timed out), this method will be invoked + * to handle calling the Callback that was provided for the request, if any, and handle any cleanup or post-processing + * related to the request + * + * @param requestId the ID of the request that has completed + */ private void onCompletedResponse(final String requestId) { final AsyncClusterResponse response = responseMap.get(requestId); @@ -543,16 +524,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } - private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme, - final String path, final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) { + + private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path, + final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) { if (nodeIds.isEmpty()) { return; // return quickly for trivial case } // submit the requests to the nodes - final String requestId = UUID.randomUUID().toString(); - headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId); for (final NodeIdentifier nodeId : nodeIds) { final NodeHttpRequest callable = callableFactory.apply(nodeId); executorService.submit(callable); http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java index e5f171d..8c645a9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -268,8 +267,6 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator { } // submit the requests to the nodes - final String requestId = UUID.randomUUID().toString(); - headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId); for (final Map.Entry<NodeIdentifier, URI> entry : uriMap.entrySet()) { final NodeIdentifier nodeId = entry.getKey(); final URI nodeUri = entry.getValue(); @@ -339,15 +336,15 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator { } final StringBuilder sb = new StringBuilder(); - sb.append("Node Responses for ").append(method).append(" ").append(path).append(" (Request ID ").append(requestId).append("):\n"); + sb.append("Node Responses for ").append(method).append(" ").append(path).append(":\n"); for (final NodeResponse response : result) { sb.append(response).append("\n"); } final long averageNanos = (nanosAdded == 0) ? -1L : nanosSum / nanosAdded; final long averageMillis = (averageNanos < 0) ? averageNanos : TimeUnit.MILLISECONDS.convert(averageNanos, TimeUnit.NANOSECONDS); - logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms", - method, path, requestId, min, max, averageMillis); + logger.debug("For {} {}, minimum response time = {}, max = {}, average = {} ms", + method, path, min, max, averageMillis); logger.debug(sb.toString()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 59d582b..f7b5745 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -19,7 +19,6 @@ package org.apache.nifi.cluster.manager.impl; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.Serializable; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -54,13 +53,10 @@ import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnRemoved; -import org.apache.nifi.cluster.context.ClusterContext; -import org.apache.nifi.cluster.context.ClusterContextImpl; import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; @@ -76,7 +72,6 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.flow.ClusterDataFlow; import org.apache.nifi.cluster.flow.DaoException; import org.apache.nifi.cluster.flow.DataFlowManagementService; -import org.apache.nifi.cluster.flow.PersistedFlowState; import org.apache.nifi.cluster.manager.HttpClusterManager; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException; @@ -165,11 +160,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ReflectionUtils; -import org.apache.nifi.web.OptimisticLockingManager; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateRevision; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; @@ -197,58 +188,21 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; */ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider, RequestCompletionCallback { - public static final String ROOT_GROUP_ID_ALIAS = "root"; public static final String BULLETIN_CATEGORY = "Clustering"; private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class)); - /** - * The HTTP header to store a cluster context. An example of what may be stored in the context is a node's auditable actions in response to a cluster request. The cluster context is serialized - * using Java's serialization mechanism and hex encoded. - */ - public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext"; - - /** - * HTTP Header that stores a unique ID for each request that is replicated to the nodes. This is used for logging purposes so that request information, such as timing, can be correlated between - * the NCM and the nodes - */ - public static final String REQUEST_ID_HEADER = "X-RequestID"; - - /** - * The HTTP header that the NCM specifies to ask a node if they are able to process a given request. The value is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to - * process the request, 417 EXPECTATION_FAILED otherwise. - */ - public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects"; - public static final int NODE_CONTINUE_STATUS_CODE = 150; - - /** - * The HTTP header that the NCM specifies to indicate that a node should invalidate the specified user group. This is done to ensure that user cache is not stale when an administrator modifies a - * group through the UI. - */ - public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup"; - - /** - * The HTTP header that the NCM specifies to indicate that a node should invalidate the specified user. This is done to ensure that user cache is not stale when an administrator modifies a user - * through the UI. - */ - public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser"; /** * The default number of seconds to respond to a connecting node if the manager cannot provide it with a current data flow. */ private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5; - - public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); - - - public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); private final NiFiProperties properties; private final DataFlowManagementService dataFlowManagementService; private final ClusterManagerProtocolSenderListener senderListener; - private final OptimisticLockingManager optimisticLockingManager; private final StringEncryptor encryptor; private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(true); private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read"); @@ -281,8 +235,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private final RequestReplicator httpRequestReplicator; public WebClusterManager( - final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener, - final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) { + final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener, + final NiFiProperties properties, final StringEncryptor encryptor) { if (dataFlowManagementService == null) { throw new IllegalArgumentException("DataFlowManagementService may not be null."); @@ -298,7 +252,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C this.instanceId = UUID.randomUUID().toString(); this.senderListener = senderListener; this.encryptor = encryptor; - this.optimisticLockingManager = optimisticLockingManager; senderListener.addHandler(this); senderListener.setBulletinRepository(bulletinRepository); @@ -356,7 +309,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties)); return new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator, connectionTimeout, readTimeout, this, - eventReporter, this, optimisticLockingManager, dataFlowManagementService); + eventReporter, this, dataFlowManagementService); } private EventReporter createEventReporter() { @@ -1734,85 +1687,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C logger.debug("Applying prototype request " + uri + " to nodes."); - // the starting state of the flow (current, stale, unknown) - final PersistedFlowState originalPersistedFlowState = dataFlowManagementService.getPersistedFlowState(); - - // check if this request can change the flow - final boolean mutableRequest = canChangeNodeState(method, uri); - - final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null); - final UpdateRevision federateRequest = new UpdateRevision() { - @Override - public Revision execute(Revision currentRevision) { - // update headers to contain cluster contextual information to send to the node - final Map<String, String> updatedHeaders = new HashMap<>(headers); - final ClusterContext clusterCtx = new ClusterContextImpl(); - clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager - clusterCtx.setRevision(currentRevision); - - // serialize cluster context and add to request header - final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx); - updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx); - - // replicate request - final AsyncClusterResponse clusterResponse = httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? parameters : entity, updatedHeaders); + // replicate request + final AsyncClusterResponse clusterResponse = httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? parameters : entity, headers); - final NodeResponse clientResponse; - try { - clientResponse = clusterResponse.awaitMergedResponse(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("Thread was interrupted while waiting for a response from one or more nodes", e); - final Set<NodeIdentifier> noResponses = clusterResponse.getNodesInvolved(); - noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers()); - throw new IllegalClusterStateException("Interrupted while waiting for a response from the following nodes: " + noResponses, e); - } - - holder.set(clientResponse); - - // if we have a response get the updated cluster context for auditing and revision updating - Revision updatedRevision = null; - if (mutableRequest && clientResponse != null) { - try { - // get the cluster context from the response header - final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER); - if (StringUtils.isNotBlank(serializedClusterContext)) { - // deserialize object - final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext); - - // if we have a valid object, audit the actions - if (clusterContextObj instanceof ClusterContext) { - final ClusterContext clusterContext = (ClusterContext) clusterContextObj; - if (auditService != null) { - try { - auditService.addActions(clusterContext.getActions()); - } catch (final Throwable t) { - logger.warn("Unable to record actions: " + t.getMessage()); - if (logger.isDebugEnabled()) { - logger.warn(StringUtils.EMPTY, t); - } - } - } - updatedRevision = clusterContext.getRevision(); - } - } - } catch (final ClassNotFoundException cnfe) { - logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe); - } - } - - return updatedRevision; - } - }; - - // federate the request and lock on the revision - if (mutableRequest) { - optimisticLockingManager.setRevision(federateRequest); - } else { - federateRequest.execute(optimisticLockingManager.getLastModification().getRevision()); + final NodeResponse clientResponse; + try { + clientResponse = clusterResponse.awaitMergedResponse(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Thread was interrupted while waiting for a response from one or more nodes", e); + final Set<NodeIdentifier> noResponses = clusterResponse.getNodesInvolved(); + noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers()); + throw new IllegalClusterStateException("Interrupted while waiting for a response from the following nodes: " + noResponses, e); } - return holder.get(); + return clientResponse; } @@ -2152,20 +2041,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } - private void notifyDataFlowManagmentServiceOfFlowStateChange(final PersistedFlowState newState) { - writeLock.lock(); - try { - logger.debug("Notifying DataFlow Management Service that flow state is " + newState); - dataFlowManagementService.setPersistedFlowState(newState); - if (newState != PersistedFlowState.CURRENT) { - cachedDataFlow = null; - /* do not reset primary node ID because only the data flow has changed */ - } - } finally { - writeLock.unlock("notifyDataFlowManagementServiceOfFlowStateChange"); - } - } - public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) { return heartbeatMonitor.getLatestHeartbeat(nodeId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java index a06bdaf..95263bd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java @@ -27,7 +27,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.io.socket.multicast.DiscoverableService; import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.OptimisticLockingManager; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; @@ -46,8 +45,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon private StringEncryptor encryptor; - private OptimisticLockingManager optimisticLockingManager; - @Override public Object getObject() throws Exception { if (properties.isClusterManager() && properties.isNode()) { @@ -67,8 +64,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon dataFlowService, senderListener, properties, - encryptor, - optimisticLockingManager + encryptor ); // set the service broadcaster @@ -119,8 +115,4 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon public void setEncryptor(final StringEncryptor encryptor) { this.encryptor = encryptor; } - - public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) { - this.optimisticLockingManager = optimisticLockingManager; - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml index 1de15cf..559462e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml @@ -59,14 +59,10 @@ <property name="properties" ref="nifiProperties"/> </bean> - <!-- cluster manager optimistic locking manager --> - <bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/> - <!-- cluster manager --> <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean"> <property name="properties" ref="nifiProperties"/> <property name="encryptor" ref="stringEncryptor"/> - <property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/> </bean> <!-- discoverable services --> http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 6d3571b..1cc210e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -40,7 +40,6 @@ import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.StandardOptimisticLockingManager; import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.junit.Assert; @@ -156,13 +155,12 @@ public class TestThreadPoolRequestReplicator { final AtomicInteger requestCount = new AtomicInteger(0); final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, - "1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, null, dfmService) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); - final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER); + final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); final int statusCode; if (requestCount.incrementAndGet() == 1) { @@ -208,13 +206,12 @@ public class TestThreadPoolRequestReplicator { final AtomicInteger requestCount = new AtomicInteger(0); final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, - "1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, null, dfmService) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); - final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER); + final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); final int requestIndex = requestCount.incrementAndGet(); assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader); @@ -256,8 +253,7 @@ public class TestThreadPoolRequestReplicator { final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, - "1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, null, dfmService) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { if (delayMillis > 0L) { http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index f7912a1..c5fd95c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -170,6 +170,9 @@ nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout} nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout} nifi.zookeeper.root.node=${nifi.zookeeper.root.node} +# How long a request should be allowed to hold a 'lock' on a component. # +nifi.cluster.request.replication.claim.timeout=${nifi.cluster.request.replication.claim.timeout} + # cluster manager properties (only configure for cluster manager) # nifi.cluster.is.manager=${nifi.cluster.is.manager} nifi.cluster.manager.address=${nifi.cluster.manager.address} http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml index 937f368..a6c17b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml @@ -154,11 +154,6 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-cluster-web</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java index 85f0b9f..14c4b64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java @@ -18,16 +18,15 @@ package org.apache.nifi.audit; import java.util.ArrayList; import java.util.Collection; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.details.FlowChangeMoveDetails; import org.apache.nifi.action.details.MoveDetails; import org.apache.nifi.admin.service.AuditService; -import org.apache.nifi.cluster.context.ClusterContext; -import org.apache.nifi.cluster.context.ClusterContextThreadLocal; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.dao.ProcessGroupDAO; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; /** @@ -58,13 +57,6 @@ public abstract class NiFiAuditor { * @param logger logger */ protected void saveActions(Collection<Action> actions, Logger logger) { - ClusterContext ctx = ClusterContextThreadLocal.getContext(); - - // if we're a connected node, then put audit actions on threadlocal to propagate back to manager - if (ctx != null) { - ctx.getActions().addAll(actions); - } - // always save the actions regardless of cluster or stand-alone // all nodes in a cluster will have their own local copy without batching try { http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index fa78b93..41b9867 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -81,6 +81,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -306,19 +307,21 @@ public interface NiFiServiceFacade { * @param description description * @param snippetId id * @param groupId id of the process group + * @param idGenerationSeed the seed to use for generating a UUID * @return template */ - TemplateDTO createTemplate(String name, String description, String snippetId, String groupId); + TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional<String> idGenerationSeed); /** * Imports the specified Template. * * @param templateDTO The template dto * @param groupId id of the process group + * @param idGenerationSeed the seed to use for generating a UUID * * @return The new template dto */ - TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId); + TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional<String> idGenerationSeed); /** * Instantiate the corresponding template. @@ -327,9 +330,10 @@ public interface NiFiServiceFacade { * @param templateId template id * @param originX x * @param originY y + * @param idGenerationSeed the ID to use for generating UUID's. May be null. * @return snapshot */ - FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId); + FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId, String idGenerationSeed); /** * Gets the template with the specified id. @@ -1302,9 +1306,10 @@ public interface NiFiServiceFacade { * @param snippetId snippet id * @param originX x * @param originY y + * @param idGenerationSeed the seed to use for generating UUID's. May be null. * @return snapshot */ - FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY); + FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY, String idGenerationSeed); /** * Creates a new snippet. http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 2961ab3..0600cb6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,6 +16,30 @@ */ package org.apache.nifi.web; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -30,8 +54,6 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.cluster.context.ClusterContext; -import org.apache.nifi.cluster.context.ClusterContextThreadLocal; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; @@ -42,6 +64,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -168,28 +191,6 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - /** * Implementation of NiFiServiceFacade that performs revision checking. */ @@ -200,9 +201,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private ControllerFacade controllerFacade; private SnippetUtils snippetUtils; - // optimistic locking manager -// private OptimisticLockingManager optimisticLockingManager; - // revision manager private RevisionManager revisionManager; @@ -239,7 +237,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- @Override public void claimRevision(Revision revision) { - revisionManager.requestClaim(revision); + revisionManager.requestClaim(revision, NiFiUserUtils.getNiFiUser()); } // ----------------------------------------- @@ -274,22 +272,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteConnection(String connectionId) { - try { - connectionDAO.verifyDelete(connectionId); - } catch (final Exception e) { - revisionManager.cancelClaim(connectionId); - throw e; - } + connectionDAO.verifyDelete(connectionId); } @Override public void verifyDeleteFunnel(String funnelId) { - try { - funnelDAO.verifyDelete(funnelId); - } catch (final Exception e) { - revisionManager.cancelClaim(funnelId); - throw e; - } + funnelDAO.verifyDelete(funnelId); } @Override @@ -308,12 +296,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteInputPort(String inputPortId) { - try { - inputPortDAO.verifyDelete(inputPortId); - } catch (final Exception e) { - revisionManager.cancelClaim(inputPortId); - throw e; - } + inputPortDAO.verifyDelete(inputPortId); } @Override @@ -332,12 +315,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteOutputPort(String outputPortId) { - try { - outputPortDAO.verifyDelete(outputPortId); - } catch (final Exception e) { - revisionManager.cancelClaim(outputPortId); - throw e; - } + outputPortDAO.verifyDelete(outputPortId); } @Override @@ -356,12 +334,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteProcessor(String processorId) { - try { - processorDAO.verifyDelete(processorId); - } catch (final Exception e) { - revisionManager.cancelClaim(processorId); - throw e; - } + processorDAO.verifyDelete(processorId); } @Override @@ -380,12 +353,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteProcessGroup(String groupId) { - try { - processGroupDAO.verifyDelete(groupId); - } catch (final Exception e) { - revisionManager.cancelClaim(groupId); - throw e; - } + processGroupDAO.verifyDelete(groupId); } @Override @@ -424,12 +392,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteRemoteProcessGroup(String remoteProcessGroupId) { - try { - remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); - } catch (final Exception e) { - revisionManager.cancelClaim(remoteProcessGroupId); - throw e; - } + remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); } @Override @@ -458,12 +421,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteControllerService(String controllerServiceId) { - try { - controllerServiceDAO.verifyDelete(controllerServiceId); - } catch (final Exception e) { - revisionManager.cancelClaim(controllerServiceId); - throw e; - } + controllerServiceDAO.verifyDelete(controllerServiceId); } @Override @@ -482,12 +440,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteReportingTask(String reportingTaskId) { - try { - reportingTaskDAO.verifyDelete(reportingTaskId); - } catch (final Exception e) { - revisionManager.cancelClaim(reportingTaskId); - throw e; - } + reportingTaskDAO.verifyDelete(reportingTaskId); } // ----------------------------------------- @@ -581,9 +534,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { * @return A ConfigurationSnapshot that represents the new configuration */ private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { - final String modifier = NiFiUserUtils.getNiFiUserName(); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final String modifier = user.getUserName(); try { - final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<D>() { + final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() { @Override public RevisionUpdate<D> update() { // ensure write access to the flow @@ -706,12 +660,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { requestProcessGroup.authorize(authorizer, RequestAction.WRITE); } - final String modifier = NiFiUserUtils.getNiFiUserName(); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final String modifier = user.getUserName(); final RevisionClaim revisionClaim = new StandardRevisionClaim(requiredRevisions); RevisionUpdate<SnippetDTO> versionedSnippet; try { - versionedSnippet = revisionManager.updateRevision(revisionClaim, modifier, new UpdateRevisionTask<SnippetDTO>() { + versionedSnippet = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() { @Override public RevisionUpdate<SnippetDTO> update() { // get the updated component @@ -1081,7 +1036,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { */ private <D, C> D deleteComponent(final Revision revision, final Authorizable authorizable, final Runnable deleteAction, final D dto) { final RevisionClaim claim = new StandardRevisionClaim(revision); - return revisionManager.deleteRevision(claim, new DeleteRevisionTask<D>() { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() { @Override public D performTask() { logger.debug("Attempting to delete component {} with claim {}", authorizable, claim); @@ -1089,6 +1046,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ensure access to the component authorizable.authorize(authorizer, RequestAction.WRITE); + // If the component has outgoing connections, ensure that we can delete them all. + if (authorizable instanceof Connectable) { + final Connectable connectable = (Connectable) authorizable; + for (final Connection connection : connectable.getConnections()) { + connection.authorize(authorizer, RequestAction.WRITE); + } + } + deleteAction.run(); // save the flow @@ -1102,12 +1067,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteSnippet(String id) { - try { - snippetDAO.verifyDelete(id); - } catch (final Exception e) { - revisionManager.cancelClaim(id); - throw e; - } + snippetDAO.verifyDelete(id); } @Override @@ -1356,7 +1316,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY) { + public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { final FlowDTO flowDto = revisionManager.get(groupId, rev -> { // ensure access to process group @@ -1364,7 +1324,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { processGroup.authorize(authorizer, RequestAction.WRITE); // create the new snippet - final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY); + final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); // TODO - READ access to all components in snippet @@ -1505,7 +1465,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId) { + public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional<String> idGenerationSeed) { // get the specified snippet Snippet snippet = snippetDAO.getSnippet(snippetId); @@ -1517,12 +1477,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true)); // set the id based on the specified seed - final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); - if (clusterContext != null) { - templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString()); - } else { - templateDTO.setId(UUID.randomUUID().toString()); - } + final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); + templateDTO.setId(uuid); // create the template Template template = templateDAO.createTemplate(templateDTO, groupId); @@ -1531,14 +1487,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId) { + public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional<String> idGenerationSeed) { // ensure id is set - final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); - if (clusterContext != null) { - templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString()); - } else { - templateDTO.setId(UUID.randomUUID().toString()); - } + final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); + templateDTO.setId(uuid); // mark the timestamp templateDTO.setTimestamp(new Date()); @@ -1551,7 +1503,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId) { + public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) { final FlowDTO flowDto = revisionManager.get(groupId, rev -> { // ensure access to process group final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); @@ -1559,7 +1511,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // instantiate the template - there is no need to make another copy of the flow snippet since the actual template // was copied and this dto is only used to instantiate it's components (which as already completed) - final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId); + final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed); // TODO - READ access to all components in snippet @@ -1624,9 +1576,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorEntity setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) { - final String modifier = NiFiUserUtils.getNiFiUserName(); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final String modifier = user.getUserName(); - final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<ProcessorEntity>() { + final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<ProcessorEntity>() { @Override public RevisionUpdate<ProcessorEntity> update() { // create the processor config @@ -1701,9 +1654,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values()); - final String modifier = NiFiUserUtils.getNiFiUserName(); - final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, modifier, + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user, new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() { @Override public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
