Repository: nifi
Updated Branches:
  refs/heads/master e748fd584 -> cf7bfe9e1


NIFI-1132: Limited number of Lineage Identifiers held to 100 and marked the getLineageIdentifiers() method as deprecated


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/73c16719
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/73c16719
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/73c16719

Branch: refs/heads/master
Commit: 73c167197563d4d07a036739f9341a0f5e36b188
Parents: 5f8fdae
Author: Mark Payne <[email protected]>
Authored: Mon Nov 9 12:09:56 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Nov 9 12:09:56 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/flowfile/FlowFile.java |  7 ++
 .../nifi/provenance/ProvenanceEventRecord.java  |  9 ++-
 .../nifi/provenance/StandardQueryResult.java    | 23 +++++-
 .../repository/StandardFlowFileRecord.java      | 24 ++++--
 .../TestPersistentProvenanceRepository.java     | 78 ++------------------
 5 files changed, 61 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/73c16719/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
index 0e2c19d..5edb7dd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
+++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
@@ -64,7 +64,14 @@ public interface FlowFile extends Comparable<FlowFile> {
      * @return a set of identifiers that are unique to this FlowFile's lineage.
      * If FlowFile X is derived from FlowFile Y, both FlowFiles will have the
      * same value for the Lineage Claim ID.
+     * 
+     * @deprecated this collection was erroneously unbounded and caused a lot 
of OutOfMemoryError problems
+     *             when dealing with FlowFiles with many ancestors. This 
Collection is
+     *             now capped at 100 lineage identifiers. This method was 
introduced with the idea of providing
+     *             future performance improvements but due to the high cost of 
heap consumption will not be used
+     *             in such a manner. As a result, this method will be removed 
in a future release.
      */
+    @Deprecated
     Set<String> getLineageIdentifiers();
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/73c16719/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
index dc251b3..fc26d93 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
@@ -51,7 +51,14 @@ public interface ProvenanceEventRecord {
     /**
      * @return the set of all lineage identifiers that are associated with the
      * FlowFile for which this Event was created
-     */
+     * 
+     * @deprecated this collection was erroneously unbounded and caused a lot 
of OutOfMemoryError problems
+     *             when querying Provenance Events about FlowFiles with many 
ancestors. This Collection is
+     *             now capped at 100 lineage identifiers. This method was 
introduced with the idea of providing
+     *             future performance improvements but due to the high cost of 
heap consumption will not be used
+     *             in such a manner. As a result, this method will be removed 
in a future release.
+     */
+    @Deprecated
     Set<String> getLineageIdentifiers();
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/73c16719/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
index 9a9a27d..03ab3ea 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
@@ -18,8 +18,12 @@ package org.apache.nifi.provenance;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -40,7 +44,7 @@ public class StandardQueryResult implements QueryResult {
 
     private final Lock writeLock = rwLock.writeLock();
     // guarded by writeLock
-    private final List<ProvenanceEventRecord> matchingRecords = new 
ArrayList<>();
+    private final Set<ProvenanceEventRecord> matchingRecords = new 
TreeSet<>(new EventIdComparator());
     private long totalHitCount;
     private int numCompletedSteps = 0;
     private Date expirationDate;
@@ -66,8 +70,14 @@ public class StandardQueryResult implements QueryResult {
             }
 
             final List<ProvenanceEventRecord> copy = new 
ArrayList<>(query.getMaxResults());
-            for (int i = 0; i < query.getMaxResults(); i++) {
-                copy.add(matchingRecords.get(i));
+
+            int i = 0;
+            final Iterator<ProvenanceEventRecord> itr = 
matchingRecords.iterator();
+            while (itr.hasNext()) {
+                copy.add(itr.next());
+                if (++i >= query.getMaxResults()) {
+                    break;
+                }
             }
 
             return copy;
@@ -165,4 +175,11 @@ public class StandardQueryResult implements QueryResult {
     private void updateExpiration() {
         expirationDate = new Date(System.currentTimeMillis() + TTL);
     }
+
+    private static class EventIdComparator implements 
Comparator<ProvenanceEventRecord> {
+        @Override
+        public int compare(final ProvenanceEventRecord o1, final 
ProvenanceEventRecord o2) {
+            return Long.compare(o2.getEventId(), o1.getEventId());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/73c16719/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index cc8c734..5474c7a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -25,24 +25,25 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
 import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 
 /**
  * <p>
- * A flow file is a logical notion of an item in a flow with its associated 
attributes and identity which can be used as a reference for its actual 
content.</p>
+ * A flow file is a logical notion of an item in a flow with its associated 
attributes and identity which can be used as a reference for its actual content.
+ * </p>
  *
  * <b>Immutable - Thread Safe</b>
  *
  */
 public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
+    private static final int MAX_LINEAGE_IDENTIFIERS = 100;
 
     private final long id;
     private final long entryDate;
@@ -182,7 +183,18 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         public Builder lineageIdentifiers(final Collection<String> 
lineageIdentifiers) {
             if (null != lineageIdentifiers) {
                 bLineageIdentifiers.clear();
-                bLineageIdentifiers.addAll(lineageIdentifiers);
+
+                if (lineageIdentifiers.size() > MAX_LINEAGE_IDENTIFIERS) {
+                    int i = 0;
+                    for (final String id : lineageIdentifiers) {
+                        bLineageIdentifiers.add(id);
+                        if (i++ >= MAX_LINEAGE_IDENTIFIERS) {
+                            break;
+                        }
+                    }
+                } else {
+                    bLineageIdentifiers.addAll(lineageIdentifiers);
+                }
             }
             return this;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/73c16719/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 6875743..5e4aed0 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -117,16 +117,16 @@ public class TestPersistentProvenanceRepository {
         // Delete all of the storage files. We do this in order to clean up 
the tons of files that
         // we create but also to ensure that we have closed all of the file 
handles. If we leave any
         // streams open, for instance, this will throw an IOException, causing 
our unit test to fail.
-        for ( final File storageDir : config.getStorageDirectories() ) {
+        for (final File storageDir : config.getStorageDirectories()) {
             int i;
-            for (i=0; i < 3; i++) {
+            for (i = 0; i < 3; i++) {
                 try {
                     FileUtils.deleteFile(storageDir, true);
                     break;
                 } catch (final IOException ioe) {
                     // if there is a virus scanner, etc. running in the 
background we may not be able to
                     // delete the file. Wait a sec and try again.
-                    if ( i == 2 ) {
+                    if (i == 2) {
                         throw ioe;
                     } else {
                         try {
@@ -441,7 +441,7 @@ public class TestPersistentProvenanceRepository {
         repo.waitForRollover();
 
         final Query query = new Query(UUID.randomUUID().toString());
-        //        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, 
"00000000-0000-0000-0000*"));
+        // 
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, 
"00000000-0000-0000-0000*"));
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, 
"file-*"));
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, 
"12?4"));
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
@@ -465,68 +465,6 @@ public class TestPersistentProvenanceRepository {
     }
 
     @Test
-    public void testIndexAndCompressOnRolloverAndSubsequentSearchAsync() 
throws IOException, InterruptedException, ParseException {
-        final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
-        config.setMaxStorageCapacity(1024L * 1024L);
-        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
-        config.setMaxEventFileCapacity(1024L * 1024L);
-        config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
-
-        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
-
-        final String uuid = "00000000-0000-0000-0000-000000000000";
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("abc", "xyz");
-        attributes.put("xyz", "abc");
-        attributes.put("filename", "file-" + uuid);
-
-        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
-        builder.setEventTime(System.currentTimeMillis());
-        builder.setEventType(ProvenanceEventType.RECEIVE);
-        builder.setTransitUri("nifi://unit-test");
-        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
-        builder.setComponentId("1234");
-        builder.setComponentType("dummy processor");
-
-        for (int i = 0; i < 10; i++) {
-            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
-            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
-            repo.registerEvent(builder.build());
-        }
-
-        repo.waitForRollover();
-
-        final Query query = new Query(UUID.randomUUID().toString());
-        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, 
"00000*"));
-        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, 
"file-*"));
-        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, 
"12?4"));
-        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
-        query.setMaxResults(100);
-
-        final QuerySubmission submission = repo.submitQuery(query);
-        while (!submission.getResult().isFinished()) {
-            Thread.sleep(100L);
-        }
-
-        assertEquals(10, submission.getResult().getMatchingEvents().size());
-        for (final ProvenanceEventRecord match : 
submission.getResult().getMatchingEvents()) {
-            System.out.println(match);
-        }
-
-        Thread.sleep(2000L);
-
-        config.setMaxStorageCapacity(100L);
-        config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
-        repo.purgeOldEvents();
-        Thread.sleep(2000L);
-
-        final QueryResult newRecordSet = repo.queryEvents(query);
-        assertTrue(newRecordSet.getMatchingEvents().isEmpty());
-    }
-
-    @Test
     public void 
testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws 
IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
         config.addStorageDirectory(new File("target/storage/" + 
UUID.randomUUID().toString()));
@@ -603,7 +541,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.purgeOldEvents();
 
-        Thread.sleep(2000L);    // purge is async. Give it time to do its job.
+        Thread.sleep(2000L); // purge is async. Give it time to do its job.
 
         query.setMaxResults(100);
         final QuerySubmission noResultSubmission = repo.submitQuery(query);
@@ -939,7 +877,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setMaxEventFileCapacity(1024L * 1024L);
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
-        config.setDesiredIndexSize(10);  // force new index to be created for 
each rollover
+        config.setDesiredIndexSize(10); // force new index to be created for 
each rollover
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
         repo.initialize(getEventReporter());
@@ -961,7 +899,7 @@ public class TestPersistentProvenanceRepository {
         for (int i = 0; i < 10; i++) {
             attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
             builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
-            builder.setEventTime(10L);  // make sure the events are destroyed 
when we call purge
+            builder.setEventTime(10L); // make sure the events are destroyed 
when we call purge
             repo.registerEvent(builder.build());
         }
 
@@ -1019,7 +957,7 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testBackPressure() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxEventFileCapacity(1L);  // force rollover on each record.
+        config.setMaxEventFileCapacity(1L); // force rollover on each record.
         config.setJournalCount(1);
 
         final AtomicInteger journalCountRef = new AtomicInteger(0);

Reply via email to