NIFI-4752: Addressed issue with some event types having potentially the wrong FlowFile UUID listed (could have child UUID when it's supposed to have parent flowfile UUID). In testing fix, also found an issue with Search threads not being daemon and Re-Index threads not propertly being shutdown so addressed those as well.
This closes #2390. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/658ce505 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/658ce505 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/658ce505 Branch: refs/heads/HDF-3.1-maint Commit: 658ce505547efbee5ebb9f846077d3925a462719 Parents: 445f496 Author: Mark Payne <[email protected]> Authored: Tue Jan 9 13:34:41 2018 -0500 Committer: Matt Gilman <[email protected]> Committed: Tue Jan 16 14:53:59 2018 -0500 ---------------------------------------------------------------------- .../provenance/lucene/SimpleIndexManager.java | 2 +- .../schema/LookupTableEventRecord.java | 39 ++++++++++++++++---- .../store/WriteAheadStorePartition.java | 2 + 3 files changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/658ce505/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index 4d6c11d..d59a81d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -53,7 +53,7 @@ public class SimpleIndexManager implements IndexManager { public SimpleIndexManager(final RepositoryConfiguration repoConfig) { this.repoConfig = repoConfig; - this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index")); + this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index", true)); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/658ce505/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java index eccff2a..2fe1676 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java @@ -245,11 +245,14 @@ public class LookupTableEventRecord implements Record { final Map<String, String> previousAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength); final Map<String, String> updatedAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength); + final List<String> childUuids = (List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS); + final List<String> parentUuids = (List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS); + final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder(); builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER)); - builder.setChildUuids((List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS)); + builder.setChildUuids(childUuids); builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS)); - builder.setParentUuids((List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS)); + builder.setParentUuids(parentUuids); builder.setPreviousAttributes(previousAttributes); builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP)); builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER)); @@ -263,20 +266,42 @@ public class LookupTableEventRecord implements Record { // Determine the event type final Integer eventTypeOrdinal = (Integer) record.getFieldValue(EventFieldNames.EVENT_TYPE); + ProvenanceEventType eventType; if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() || eventTypeOrdinal < 0) { - builder.setEventType(ProvenanceEventType.UNKNOWN); + eventType = ProvenanceEventType.UNKNOWN; } else { try { - builder.setEventType(ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal))); + eventType = ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal)); } catch (final Exception e) { - builder.setEventType(ProvenanceEventType.UNKNOWN); + eventType = ProvenanceEventType.UNKNOWN; } } + builder.setEventType(eventType); + + // Determine appropriate UUID for the event + String uuid = null; + switch (eventType) { + case CLONE: + case FORK: + case REPLAY: + if (parentUuids != null && !parentUuids.isEmpty()) { + uuid = parentUuids.get(0); + } + break; + case JOIN: + if (childUuids != null && !childUuids.isEmpty()) { + uuid = childUuids.get(0); + } + break; + } - String uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key()); if (uuid == null) { - uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key()); + uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key()); + if (uuid == null) { + uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key()); + } } + builder.setFlowFileUUID(uuid); builder.setEventDuration((Integer) record.getFieldValue(EventFieldNames.EVENT_DURATION)); http://git-wip-us.apache.org/repos/asf/nifi/blob/658ce505/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index 6c5cc8d..22d2a5f 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -632,6 +632,8 @@ public class WriteAheadStorePartition implements EventStorePartition { logger.error("Failed to re-index Provenance Events for partition " + partitionName, e); } + executor.shutdown(); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); final long seconds = millis / 1000L; final long millisRemainder = millis % 1000L;
