NIFI-2824: - Updating replication logic to account for the potential replication target and then invoking the corresponding action.
Signed-off-by: Yolanda M. Davis <ymda...@apache.org> This closes #1068 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1ba7f830 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1ba7f830 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1ba7f830 Branch: refs/heads/support/nifi-1.0.x Commit: 1ba7f8302c2d6573ff9f43acd49e644213c480f6 Parents: 3feb59d Author: Matt Gilman <matt.c.gil...@gmail.com> Authored: Mon Sep 26 16:03:07 2016 -0400 Committer: jpercivall <jperciv...@apache.org> Committed: Wed Dec 14 16:20:44 2016 -0500 ---------------------------------------------------------------------- .../web/StandardNiFiWebConfigurationContext.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1ba7f830/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index 021f216..fb38ce9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -49,6 +49,7 @@ import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget; import org.apache.nifi.web.api.dto.AllowableValueDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; @@ -72,14 +73,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; import java.util.Collection; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments. @@ -292,17 +291,25 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration return componentFacade.updateComponent(requestContext, annotationData, properties); } + private ReplicationTarget getReplicationTarget() { + return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; + } + private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException { final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode(); if (coordinatorNode == null) { throw new NoClusterCoordinatorException(); } - final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode); - return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false, true).awaitMergedResponse(); + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly + // to the cluster nodes themselves. + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + return requestReplicator.replicate(method, uri, entity, headers).awaitMergedResponse(); + } else { + return requestReplicator.forwardToCoordinator(coordinatorNode, method, uri, entity, headers).awaitMergedResponse(); + } } - /** * Facade over accessing different types of NiFi components. */