NIFI-4752: Addressed issue with some event types having potentially the wrong 
FlowFile UUID listed (could have child UUID when it's supposed to have parent 
flowfile UUID). In testing fix, also found an issue with Search threads not 
being daemon and Re-Index threads not propertly being shutdown so addressed 
those as well.

This closes #2390.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/658ce505
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/658ce505
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/658ce505

Branch: refs/heads/HDF-3.1-maint
Commit: 658ce505547efbee5ebb9f846077d3925a462719
Parents: 445f496
Author: Mark Payne <[email protected]>
Authored: Tue Jan 9 13:34:41 2018 -0500
Committer: Matt Gilman <[email protected]>
Committed: Tue Jan 16 14:53:59 2018 -0500

----------------------------------------------------------------------
 .../provenance/lucene/SimpleIndexManager.java   |  2 +-
 .../schema/LookupTableEventRecord.java          | 39 ++++++++++++++++----
 .../store/WriteAheadStorePartition.java         |  2 +
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/658ce505/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
index 4d6c11d..d59a81d 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
@@ -53,7 +53,7 @@ public class SimpleIndexManager implements IndexManager {
 
     public SimpleIndexManager(final RepositoryConfiguration repoConfig) {
         this.repoConfig = repoConfig;
-        this.searchExecutor = 
Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new 
NamedThreadFactory("Search Lucene Index"));
+        this.searchExecutor = 
Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new 
NamedThreadFactory("Search Lucene Index", true));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/658ce505/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
index eccff2a..2fe1676 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
@@ -245,11 +245,14 @@ public class LookupTableEventRecord implements Record {
         final Map<String, String> previousAttributes = 
truncateAttributes((Map<String, String>) 
record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength);
         final Map<String, String> updatedAttributes = 
truncateAttributes((Map<String, String>) 
record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength);
 
+        final List<String> childUuids = (List<String>) 
record.getFieldValue(EventFieldNames.CHILD_UUIDS);
+        final List<String> parentUuids = (List<String>) 
record.getFieldValue(EventFieldNames.PARENT_UUIDS);
+
         final StandardProvenanceEventRecord.Builder builder = new 
StandardProvenanceEventRecord.Builder();
         builder.setAlternateIdentifierUri((String) 
record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER));
-        builder.setChildUuids((List<String>) 
record.getFieldValue(EventFieldNames.CHILD_UUIDS));
+        builder.setChildUuids(childUuids);
         builder.setDetails((String) 
record.getFieldValue(EventFieldNames.EVENT_DETAILS));
-        builder.setParentUuids((List<String>) 
record.getFieldValue(EventFieldNames.PARENT_UUIDS));
+        builder.setParentUuids(parentUuids);
         builder.setPreviousAttributes(previousAttributes);
         builder.setRelationship((String) 
record.getFieldValue(EventFieldNames.RELATIONSHIP));
         builder.setSourceSystemFlowFileIdentifier((String) 
record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
@@ -263,20 +266,42 @@ public class LookupTableEventRecord implements Record {
 
         // Determine the event type
         final Integer eventTypeOrdinal = (Integer) 
record.getFieldValue(EventFieldNames.EVENT_TYPE);
+        ProvenanceEventType eventType;
         if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() 
|| eventTypeOrdinal < 0) {
-            builder.setEventType(ProvenanceEventType.UNKNOWN);
+            eventType = ProvenanceEventType.UNKNOWN;
         } else {
             try {
-                
builder.setEventType(ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal)));
+                eventType = 
ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal));
             } catch (final Exception e) {
-                builder.setEventType(ProvenanceEventType.UNKNOWN);
+                eventType = ProvenanceEventType.UNKNOWN;
             }
         }
+        builder.setEventType(eventType);
+
+        // Determine appropriate UUID for the event
+        String uuid = null;
+        switch (eventType) {
+            case CLONE:
+            case FORK:
+            case REPLAY:
+                if (parentUuids != null && !parentUuids.isEmpty()) {
+                    uuid = parentUuids.get(0);
+                }
+                break;
+            case JOIN:
+                if (childUuids != null && !childUuids.isEmpty()) {
+                    uuid = childUuids.get(0);
+                }
+                break;
+        }
 
-        String uuid = updatedAttributes == null ? null : 
updatedAttributes.get(CoreAttributes.UUID.key());
         if (uuid == null) {
-            uuid = previousAttributes == null ? null : 
previousAttributes.get(CoreAttributes.UUID.key());
+            uuid = updatedAttributes == null ? null : 
updatedAttributes.get(CoreAttributes.UUID.key());
+            if (uuid == null) {
+                uuid = previousAttributes == null ? null : 
previousAttributes.get(CoreAttributes.UUID.key());
+            }
         }
+
         builder.setFlowFileUUID(uuid);
 
         builder.setEventDuration((Integer) 
record.getFieldValue(EventFieldNames.EVENT_DURATION));

http://git-wip-us.apache.org/repos/asf/nifi/blob/658ce505/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 6c5cc8d..22d2a5f 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -632,6 +632,8 @@ public class WriteAheadStorePartition implements 
EventStorePartition {
             logger.error("Failed to re-index Provenance Events for partition " 
+ partitionName, e);
         }
 
+        executor.shutdown();
+
         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
         final long seconds = millis / 1000L;
         final long millisRemainder = millis % 1000L;

Reply via email to