Repository: nifi Updated Branches: refs/heads/master 54549891e -> bc5237593
NIFI-2291: Correct the Content URI for 1.0.0 REST API; added cluster node identifier & whether or not clustered to ReportingContext so that the Reporting Task could make use of it Signed-off-by: Yolanda M. Davis <[email protected]> This closes #752 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bc523759 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bc523759 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bc523759 Branch: refs/heads/master Commit: bc5237593e438405c33713602e78cf009f6614e9 Parents: 5454989 Author: Mark Payne <[email protected]> Authored: Fri Jul 29 21:17:47 2016 -0400 Committer: Yolanda M. Davis <[email protected]> Committed: Wed Aug 3 10:35:11 2016 -0400 ---------------------------------------------------------------------- .../apache/nifi/reporting/ReportingContext.java | 12 ++++++++++ .../apache/nifi/util/MockReportingContext.java | 10 ++++++++ .../reporting/StandardReportingContext.java | 11 +++++++++ .../AbstractSiteToSiteReportingTask.java | 11 +++++---- .../SiteToSiteProvenanceReportingTask.java | 25 +++++++++++++++----- 5 files changed, 59 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java index cb131e2..01a49e8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java @@ -92,4 +92,16 @@ public interface ReportingContext { * @return the StateManager that can be used to store and retrieve state for this component */ StateManager getStateManager(); + + /** + * @return <code>true</code> if this instance of NiFi is configured to be part of a cluster, <code>false</code> + * if this instance of NiFi is a standalone instance + */ + boolean isClustered(); + + /** + * @return the ID of this node in the cluster, or <code>null</code> if either this node is not clustered or the Node Identifier + * has not yet been established + */ + String getClusterNodeIdentifier(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java index da43c62..26ad590 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java @@ -123,4 +123,14 @@ public class MockReportingContext extends MockControllerServiceLookup implements public StateManager getStateManager() { return stateManager; } + + @Override + public boolean isClustered() { + return false; + } + + @Override + public String getClusterNodeIdentifier() { + return null; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 205a690..ef84432 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateManager; @@ -152,4 +153,14 @@ public class StandardReportingContext implements ReportingContext, ControllerSer return flowController.getStateManagerProvider().getStateManager(reportingTask.getIdentifier()); } + @Override + public boolean isClustered() { + return flowController.isConfiguredForClustering(); + } + + @Override + public String getClusterNodeIdentifier() { + final NodeIdentifier nodeId = flowController.getNodeId(); + return nodeId == null ? null : nodeId.getId(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index b1b3410..edf40ce 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -24,6 +24,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.ssl.SSLContextService; @@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit; * Base class for ReportingTasks that send data over site-to-site. */ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask { + protected static final String DESTINATION_URL_PATH = "/nifi"; static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() .name("Destination URL") @@ -110,15 +112,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT public void setup(final ConfigurationContext context) throws IOException { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + final ComponentLog logger = getLogger(); final EventReporter eventReporter = new EventReporter() { @Override public void reportEvent(final Severity severity, final String category, final String message) { switch (severity) { case WARNING: - getLogger().warn(message); + logger.warn(message); break; case ERROR: - getLogger().error(message); + logger.error(message); break; default: break; @@ -168,12 +171,12 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .build(); } - if (url != null && !url.getPath().endsWith("/nifi")) { + if (url != null && !url.getPath().equals(DESTINATION_URL_PATH)) { return new ValidationResult.Builder() .input(input) .subject(subject) .valid(false) - .explanation("URL path must be /nifi") + .explanation("URL path must be " + DESTINATION_URL_PATH) .build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 8c2bd33..ef10a21 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -113,6 +113,14 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti @Override public void onTrigger(final ReportingContext context) { + final boolean isClustered = context.isClustered(); + final String nodeId = context.getClusterNodeIdentifier(); + if (nodeId == null && isClustered) { + getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. " + + "Will wait for Node Identifier to be established."); + return; + } + final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); final Map<String,String> componentMap = createComponentMap(procGroupStatus); @@ -187,7 +195,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); for (final ProvenanceEventRecord event : events) { final String componentName = componentMap.get(event.getComponentId()); - arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform)); + arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId)); } final JsonArray jsonArray = arrayBuilder.build(); @@ -242,7 +250,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti } static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, - final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform) { + final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) { addField(builder, "eventId", UUID.randomUUID().toString()); addField(builder, "eventOrdinal", event.getEventId()); addField(builder, "eventType", event.getEventType().name()); @@ -263,10 +271,15 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti addField(builder, "actorHostname", hostname); if (nifiUrl != null) { - final String urlPrefix = nifiUrl.toString().replace(nifiUrl.getPath(), ""); - final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/"; - addField(builder, "contentURI", contentUriBase + "output"); - addField(builder, "previousContentURI", contentUriBase + "input"); + // TO get URL Prefix, we just remove the /nifi from the end of the URL. We know that the URL ends with + // "/nifi" because the Property Validator enforces it + final String urlString = nifiUrl.toString(); + final String urlPrefix = urlString.substring(0, urlString.length() - DESTINATION_URL_PATH.length()); + + final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId() + "/content/"; + final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier; + addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix); + addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix); } addField(builder, factory, "parentIds", event.getParentUuids());
