NIFI-4707: Improved S2SProvenanceReportingTask

- Simplified consumeEvents method signature
- Refactored ComponentMapHolder methods visibility
- Renamed componentMap to componentNameMap
- Map more metadata from ConnectionStatus for Remote Input/Output Ports
- Support Process Group hierarchy filtering
- Throw an exception when the reporting task fails to send provenance
data, so that the current provenance event index is kept and the
events can be consumed again


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d65e6b25
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d65e6b25
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d65e6b25

Branch: refs/heads/master
Commit: d65e6b25630fa918ede2cd6922dc777e816679c3
Parents: 1f79392
Author: Koji Kawamura <[email protected]>
Authored: Wed Dec 20 15:51:58 2017 +0900
Committer: Matthew Burgess <[email protected]>
Committed: Tue Jan 2 14:46:42 2018 -0500

----------------------------------------------------------------------
 .../atlas/reporting/ReportLineageToAtlas.java   |  2 +-
 .../util/provenance/ComponentMapHolder.java     | 75 +++++++++++++-------
 .../provenance/ProvenanceEventConsumer.java     | 13 ++--
 .../SiteToSiteProvenanceReportingTask.java      | 10 +--
 4 files changed, 66 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index f722b9d..5bb6024 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
         final AnalysisContext analysisContext = new 
StandardAnalysisContext(nifiFlow, clusterResolvers,
                 // FIXME: This class cast shouldn't be necessary to query 
lineage. Possible refactor target in next major update.
                 (ProvenanceRepository)eventAccess.getProvenanceRepository());
-        consumer.consumeEvents(context, context.getStateManager(), 
(componentMapHolder, events) -> {
+        consumer.consumeEvents(context, (componentMapHolder, events) -> {
             for (ProvenanceEventRecord event : events) {
                 try {
                     lineageStrategy.processEvent(analysisContext, nifiFlow, 
event);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
index 495968a..342b5a2 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
@@ -24,68 +24,95 @@ import 
org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Stack;
 
 public class ComponentMapHolder {
-    final Map<String,String> componentMap = new HashMap<>();
-    final Map<String,String> componentToParentGroupMap = new HashMap<>();
-
-    public ComponentMapHolder putAll(ComponentMapHolder holder) {
-        this.componentMap.putAll(holder.getComponentMap());
-        
this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap());
+    private static final String REMOTE_INPUT_PORT = "Remote Input Port";
+    private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
+    private final Map<String,String> componentNameMap = new HashMap<>();
+    private final Map<String,String> componentToParentGroupMap = new 
HashMap<>();
+    private final Map<String,String> sourceToConnectionParentGroupMap = new 
HashMap<>();
+    private final Map<String,String> destinationToConnectionParentGroupMap = 
new HashMap<>();
+
+    private ComponentMapHolder putAll(ComponentMapHolder holder) {
+        this.componentNameMap.putAll(holder.componentNameMap);
+        
this.componentToParentGroupMap.putAll(holder.componentToParentGroupMap);
+        
this.sourceToConnectionParentGroupMap.putAll(holder.sourceToConnectionParentGroupMap);
+        
this.destinationToConnectionParentGroupMap.putAll(holder.destinationToConnectionParentGroupMap);
         return this;
     }
 
-    public Map<String, String> getComponentMap() {
-        return componentMap;
-    }
-
-    public Map<String, String> getComponentToParentGroupMap() {
-        return componentToParentGroupMap;
+    public String getComponentName(final String componentId) {
+        return componentNameMap.get(componentId);
     }
 
-    public String getComponentName(final String componentId) {
-        return componentMap.get(componentId);
+    public Stack<String> getProcessGroupIdStack(final String 
startingProcessGroupId) {
+        final Stack<String> stack = new Stack<>();
+        String processGroupId = startingProcessGroupId;
+        stack.push(startingProcessGroupId);
+        while (componentToParentGroupMap.containsKey(processGroupId)) {
+            final String parentGroupId = 
componentToParentGroupMap.get(processGroupId);
+            if (parentGroupId == null || parentGroupId.isEmpty()) {
+                break;
+            }
+            stack.push(parentGroupId);
+            processGroupId = parentGroupId;
+        }
+        return stack;
     }
 
-    public String getProcessGroupId(final String componentId) {
+    public String getProcessGroupId(final String componentId, final String 
componentType) {
+        // Where a Remote Input/Output Port resides is only available at 
ConnectionStatus.
+        if (REMOTE_INPUT_PORT.equals(componentType)) {
+            return destinationToConnectionParentGroupMap.get(componentId);
+        } else if (REMOTE_OUTPUT_PORT.equals(componentType)) {
+            return sourceToConnectionParentGroupMap.get(componentId);
+        }
         return componentToParentGroupMap.get(componentId);
     }
 
     public static ComponentMapHolder createComponentMap(final 
ProcessGroupStatus status) {
         final ComponentMapHolder holder = new ComponentMapHolder();
-        final Map<String,String> componentMap = holder.getComponentMap();
-        final Map<String,String> componentToParentGroupMap = 
holder.getComponentToParentGroupMap();
+        final Map<String,String> componentNameMap = holder.componentNameMap;
+        final Map<String,String> componentToParentGroupMap = 
holder.componentToParentGroupMap;
+        final Map<String,String> sourceToConnectionParentGroupMap = 
holder.sourceToConnectionParentGroupMap;
+        final Map<String,String> destinationToConnectionParentGroupMap = 
holder.destinationToConnectionParentGroupMap;
 
         if (status != null) {
-            componentMap.put(status.getId(), status.getName());
+            componentNameMap.put(status.getId(), status.getName());
 
             for (final ProcessorStatus procStatus : 
status.getProcessorStatus()) {
-                componentMap.put(procStatus.getId(), procStatus.getName());
+                componentNameMap.put(procStatus.getId(), procStatus.getName());
                 componentToParentGroupMap.put(procStatus.getId(), 
status.getId());
             }
 
             for (final PortStatus portStatus : status.getInputPortStatus()) {
-                componentMap.put(portStatus.getId(), portStatus.getName());
+                componentNameMap.put(portStatus.getId(), portStatus.getName());
                 componentToParentGroupMap.put(portStatus.getId(), 
status.getId());
             }
 
             for (final PortStatus portStatus : status.getOutputPortStatus()) {
-                componentMap.put(portStatus.getId(), portStatus.getName());
+                componentNameMap.put(portStatus.getId(), portStatus.getName());
                 componentToParentGroupMap.put(portStatus.getId(), 
status.getId());
             }
 
             for (final RemoteProcessGroupStatus rpgStatus : 
status.getRemoteProcessGroupStatus()) {
-                componentMap.put(rpgStatus.getId(), rpgStatus.getName());
+                componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
                 componentToParentGroupMap.put(rpgStatus.getId(), 
status.getId());
             }
 
             for (final ConnectionStatus connectionStatus : 
status.getConnectionStatus()) {
-                componentMap.put(connectionStatus.getId(), 
connectionStatus.getName());
+                componentNameMap.put(connectionStatus.getId(), 
connectionStatus.getName());
                 componentToParentGroupMap.put(connectionStatus.getId(), 
status.getId());
+                // Add source and destination for Remote Input/Output Ports 
because metadata for those are only available at ConnectionStatus.
+                
componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> 
connectionStatus.getSourceName());
+                
componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> 
connectionStatus.getDestinationName());
+                
sourceToConnectionParentGroupMap.put(connectionStatus.getSourceId(), 
connectionStatus.getGroupId());
+                
destinationToConnectionParentGroupMap.put(connectionStatus.getDestinationId(), 
connectionStatus.getGroupId());
             }
 
             for (final ProcessGroupStatus childGroup : 
status.getProcessGroupStatus()) {
-                componentMap.put(childGroup.getId(), childGroup.getName());
+                componentNameMap.put(childGroup.getId(), childGroup.getName());
                 componentToParentGroupMap.put(childGroup.getId(), 
status.getId());
                 holder.putAll(createComponentMap(childGroup));
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index 8256626..75c1e60 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -113,7 +113,7 @@ public class ProvenanceEventConsumer {
         this.logger = logger;
     }
 
-    public void consumeEvents(final ReportingContext context, final 
StateManager stateManager,
+    public void consumeEvents(final ReportingContext context,
                               final BiConsumer<ComponentMapHolder, 
List<ProvenanceEventRecord>> consumer) throws ProcessException {
 
         if (context == null) {
@@ -123,6 +123,7 @@ public class ProvenanceEventConsumer {
         final EventAccess eventAccess = context.getEventAccess();
         final ProcessGroupStatus procGroupStatus = 
eventAccess.getControllerStatus();
         final ComponentMapHolder componentMapHolder = 
ComponentMapHolder.createComponentMap(procGroupStatus);
+        final StateManager stateManager = context.getStateManager();
 
         Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
 
@@ -234,12 +235,16 @@ public class ProvenanceEventConsumer {
 
             for (ProvenanceEventRecord provenanceEventRecord : 
provenanceEvents) {
                 if(!componentIds.isEmpty() && 
!componentIds.contains(provenanceEventRecord.getComponentId())) {
-                    // If we aren't filtering it out based on component ID, 
let's see if this component has a parent process group ID
+                    // If we aren't filtering it out based on component ID, 
let's see if this component has a parent process group IDs
                     // that is being filtered on
-                    if (componentMapHolder == null || 
componentMapHolder.getComponentToParentGroupMap().isEmpty()) {
+                    if (componentMapHolder == null) {
                         continue;
                     }
-                    if 
(!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId())))
 {
+                    final String processGroupId = 
componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), 
provenanceEventRecord.getComponentType());
+                    if (processGroupId == null || processGroupId.isEmpty()) {
+                        continue;
+                    }
+                    if 
(componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid
 -> componentIds.contains(pgid))) {
                         continue;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index c99e9d8..61c8bc4 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -203,14 +203,14 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
         df.setTimeZone(TimeZone.getTimeZone("Z"));
 
-        consumer.consumeEvents(context, context.getStateManager(), (mapHolder, 
events) -> {
+        consumer.consumeEvents(context, (mapHolder, events) -> {
             final long start = System.nanoTime();
             // Create a JSON array of all the events in the current batch
             final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
             for (final ProvenanceEventRecord event : events) {
                 final String componentName = 
mapHolder.getComponentName(event.getComponentId());
-                final String processGroupId = 
mapHolder.getProcessGroupId(event.getComponentId());
-                final String processGroupName = 
mapHolder.getComponentMap().get(processGroupId);
+                final String processGroupId = 
mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType());
+                final String processGroupName = 
mapHolder.getComponentName(processGroupId);
                 arrayBuilder.add(serialize(factory, builder, event, df, 
componentName, processGroupId, processGroupName, hostname, url, rootGroupName, 
platform, nodeId));
             }
             final JsonArray jsonArray = arrayBuilder.build();
@@ -219,8 +219,8 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
             try {
                 final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
-                    getLogger().debug("All destination nodes are penalized; 
will attempt to send data later");
-                    return;
+                    // Throw an exception to avoid provenance event id will 
not proceed so that those can be consumed again.
+                    throw new ProcessException("All destination nodes are 
penalized; will attempt to send data later");
                 }
 
                 final Map<String, String> attributes = new HashMap<>();

Reply via email to