Repository: nifi
Updated Branches:
  refs/heads/master 54549891e -> bc5237593


NIFI-2291: Correct the Content URI for 1.0.0 REST API; added cluster node 
identifier & whether or not clustered to ReportingContext so that the Reporting 
Task could make use of it

Signed-off-by: Yolanda M. Davis <[email protected]>

This closes #752


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bc523759
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bc523759
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bc523759

Branch: refs/heads/master
Commit: bc5237593e438405c33713602e78cf009f6614e9
Parents: 5454989
Author: Mark Payne <[email protected]>
Authored: Fri Jul 29 21:17:47 2016 -0400
Committer: Yolanda M. Davis <[email protected]>
Committed: Wed Aug 3 10:35:11 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/reporting/ReportingContext.java | 12 ++++++++++
 .../apache/nifi/util/MockReportingContext.java  | 10 ++++++++
 .../reporting/StandardReportingContext.java     | 11 +++++++++
 .../AbstractSiteToSiteReportingTask.java        | 11 +++++----
 .../SiteToSiteProvenanceReportingTask.java      | 25 +++++++++++++++-----
 5 files changed, 59 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java 
b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
index cb131e2..01a49e8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
@@ -92,4 +92,16 @@ public interface ReportingContext {
      * @return the StateManager that can be used to store and retrieve state 
for this component
      */
     StateManager getStateManager();
+
+    /**
+     * @return <code>true</code> if this instance of NiFi is configured to be 
part of a cluster, <code>false</code>
+     *         if this instance of NiFi is a standalone instance
+     */
+    boolean isClustered();
+
+    /**
+     * @return the ID of this node in the cluster, or <code>null</code> if 
either this node is not clustered or the Node Identifier
+     *         has not yet been established
+     */
+    String getClusterNodeIdentifier();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
index da43c62..26ad590 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
@@ -123,4 +123,14 @@ public class MockReportingContext extends 
MockControllerServiceLookup implements
     public StateManager getStateManager() {
         return stateManager;
     }
+
+    @Override
+    public boolean isClustered() {
+        return false;
+    }
+
+    @Override
+    public String getClusterNodeIdentifier() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 205a690..ef84432 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
@@ -152,4 +153,14 @@ public class StandardReportingContext implements 
ReportingContext, ControllerSer
         return 
flowController.getStateManagerProvider().getStateManager(reportingTask.getIdentifier());
     }
 
+    @Override
+    public boolean isClustered() {
+        return flowController.isConfiguredForClustering();
+    }
+
+    @Override
+    public String getClusterNodeIdentifier() {
+        final NodeIdentifier nodeId = flowController.getNodeId();
+        return nodeId == null ? null : nodeId.getId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index b1b3410..edf40ce 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.ssl.SSLContextService;
@@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit;
  * Base class for ReportingTasks that send data over site-to-site.
  */
 public abstract class AbstractSiteToSiteReportingTask extends 
AbstractReportingTask {
+    protected static final String DESTINATION_URL_PATH = "/nifi";
 
     static final PropertyDescriptor DESTINATION_URL = new 
PropertyDescriptor.Builder()
             .name("Destination URL")
@@ -110,15 +112,16 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
     public void setup(final ConfigurationContext context) throws IOException {
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
         final SSLContext sslContext = sslContextService == null ? null : 
sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+        final ComponentLog logger = getLogger();
         final EventReporter eventReporter = new EventReporter() {
             @Override
             public void reportEvent(final Severity severity, final String 
category, final String message) {
                 switch (severity) {
                     case WARNING:
-                        getLogger().warn(message);
+                        logger.warn(message);
                         break;
                     case ERROR:
-                        getLogger().error(message);
+                        logger.error(message);
                         break;
                     default:
                         break;
@@ -168,12 +171,12 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
                         .build();
             }
 
-            if (url != null && !url.getPath().endsWith("/nifi")) {
+            if (url != null && !url.getPath().equals(DESTINATION_URL_PATH)) {
                 return new ValidationResult.Builder()
                         .input(input)
                         .subject(subject)
                         .valid(false)
-                        .explanation("URL path must be /nifi")
+                        .explanation("URL path must be " + 
DESTINATION_URL_PATH)
                         .build();
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/bc523759/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 8c2bd33..ef10a21 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -113,6 +113,14 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
     @Override
     public void onTrigger(final ReportingContext context) {
+        final boolean isClustered = context.isClustered();
+        final String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            getLogger().debug("This instance of NiFi is configured for 
clustering, but the Cluster Node Identifier is not yet available. "
+                + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
         final ProcessGroupStatus procGroupStatus = 
context.getEventAccess().getControllerStatus();
         final String rootGroupName = procGroupStatus == null ? null : 
procGroupStatus.getName();
         final Map<String,String> componentMap = 
createComponentMap(procGroupStatus);
@@ -187,7 +195,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
             final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
             for (final ProvenanceEventRecord event : events) {
                 final String componentName = 
componentMap.get(event.getComponentId());
-                arrayBuilder.add(serialize(factory, builder, event, df, 
componentName, hostname, url, rootGroupName, platform));
+                arrayBuilder.add(serialize(factory, builder, event, df, 
componentName, hostname, url, rootGroupName, platform, nodeId));
             }
             final JsonArray jsonArray = arrayBuilder.build();
 
@@ -242,7 +250,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
     }
 
     static JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat 
df,
-        final String componentName, final String hostname, final URL nifiUrl, 
final String applicationName, final String platform) {
+        final String componentName, final String hostname, final URL nifiUrl, 
final String applicationName, final String platform, final String 
nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());
         addField(builder, "eventOrdinal", event.getEventId());
         addField(builder, "eventType", event.getEventType().name());
@@ -263,10 +271,15 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
         addField(builder, "actorHostname", hostname);
         if (nifiUrl != null) {
-            final String urlPrefix = 
nifiUrl.toString().replace(nifiUrl.getPath(), "");
-            final String contentUriBase = urlPrefix + 
"/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/";
-            addField(builder, "contentURI", contentUriBase + "output");
-            addField(builder, "previousContentURI", contentUriBase + "input");
+            // TO get URL Prefix, we just remove the /nifi from the end of the 
URL. We know that the URL ends with
+            // "/nifi" because the Property Validator enforces it
+            final String urlString = nifiUrl.toString();
+            final String urlPrefix = urlString.substring(0, urlString.length() 
- DESTINATION_URL_PATH.length());
+
+            final String contentUriBase = urlPrefix + 
"/nifi-api/provenance-events/" + event.getEventId() + "/content/";
+            final String nodeIdSuffix = nodeIdentifier == null ? "" : 
"?clusterNodeId=" + nodeIdentifier;
+            addField(builder, "contentURI", contentUriBase + "output" + 
nodeIdSuffix);
+            addField(builder, "previousContentURI", contentUriBase + "input" + 
nodeIdSuffix);
         }
 
         addField(builder, factory, "parentIds", event.getParentUuids());

Reply via email to