Repository: nifi
Updated Branches:
  refs/heads/master f044ba5d4 -> e345218ea


NIFI-793: Added multi-threading to the indexing in the Persistent Provenance 
Repository


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19f7db69
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19f7db69
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19f7db69

Branch: refs/heads/master
Commit: 19f7db69863f26fae8eb98b08300a31dcf7cb8cc
Parents: f171756
Author: Mark Payne <[email protected]>
Authored: Wed Aug 19 12:18:00 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Aug 19 12:24:25 2015 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   1 +
 .../org/apache/nifi/util/NiFiProperties.java    |   1 +
 .../src/main/asciidoc/administration-guide.adoc |   6 +-
 .../src/main/resources/conf/nifi.properties     |   1 +
 .../PersistentProvenanceRepository.java         | 109 ++++++++++++++++---
 .../provenance/RepositoryConfiguration.java     |  34 ++++--
 6 files changed, 127 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/19f7db69/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 1f15227..a712b86 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -285,6 +285,7 @@ language governing permissions and limitations under the 
License. -->
         <nifi.provenance.repository.rollover.time>30 
secs</nifi.provenance.repository.rollover.time>
         <nifi.provenance.repository.rollover.size>100 
MB</nifi.provenance.repository.rollover.size>
         
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
+        
<nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
         
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
         <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, 
Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields> 
         <nifi.provenance.repository.indexed.attributes />

http://git-wip-us.apache.org/repos/asf/nifi/blob/19f7db69/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index e25f5d6..520e0ba 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -99,6 +99,7 @@ public class NiFiProperties extends Properties {
     public static final String PROVENANCE_ROLLOVER_TIME = 
"nifi.provenance.repository.rollover.time";
     public static final String PROVENANCE_ROLLOVER_SIZE = 
"nifi.provenance.repository.rollover.size";
     public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = 
"nifi.provenance.repository.query.threads";
+    public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = 
"nifi.provenance.repository.index.threads";
     public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = 
"nifi.provenance.repository.compress.on.rollover";
     public static final String PROVENANCE_INDEXED_FIELDS = 
"nifi.provenance.repository.indexed.fields";
     public static final String PROVENANCE_INDEXED_ATTRIBUTES = 
"nifi.provenance.repository.indexed.attributes";

http://git-wip-us.apache.org/repos/asf/nifi/blob/19f7db69/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 7724713..4dfddf9 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -513,7 +513,11 @@ Providing three total locations, including  
_nifi.provenance.repository.director
 |nifi.provenance.repository.max.storage.size|The maximum amount of data 
provenance information to store at a time. The default is 1 GB.
 |nifi.provenance.repository.rollover.time|The amount of time to wait before 
rolling over the latest data provenance information so that it is available in 
the User Interface. The default value is 5 mins. 
 |nifi.provenance.repository.rollover.size|The amount of information to roll 
over at a time. The default value is 100 MB.
-|nifi.provenance.repository.query.threads|The number of threads to use for 
Provenance Repository queries. The default value is 2. 
+|nifi.provenance.repository.query.threads|The number of threads to use for 
Provenance Repository queries. The default value is 2.
+|nifi.provenance.repository.index.threads|The number of threads to use for 
indexing Provenance events so that they are searchable. The default value is 1.
+       For flows that operate on a very high number of FlowFiles, the indexing 
of Provenance events could become a bottleneck. If this is the case, a bulletin 
will appear, indicating that
+       "The rate of the dataflow is exceeding the provenance recording rate. 
Slowing down flow to accommodate." If this happens, increasing the value of 
this property
+       may increase the rate at which the Provenance Repository is able to 
process these records, resulting in better overall throughput. 
 |nifi.provenance.repository.compress.on.rollover|Indicates whether to compress 
the provenance information when rolling it over. The default value is _true_.
 |nifi.provenance.repository.always.sync|If set to _true_, any change to the 
repository will be synchronized to the disk, meaning that NiFi will ask the 
operating system not to cache the information. This is very expensive and can 
significantly reduce NiFi performance. However, if it is _false_, there could 
be the potential for data loss if either there is a sudden power loss or the 
operating system crashes. The default value is _false_.
 |nifi.provenance.repository.journal.count|The number of journal files that 
should be used to serialize Provenance Event data. Increasing this value will 
allow more tasks to simultaneously update the repository but will result in 
more expensive merging of the journal files later. This value should ideally be 
equal to the number of threads that are expected to update the repository 
simultaneously, but 16 tends to work well in most environments. The default 
value is 16.

http://git-wip-us.apache.org/repos/asf/nifi/blob/19f7db69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 4043076..63e5391 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -71,6 +71,7 @@ 
nifi.provenance.repository.max.storage.size=${nifi.provenance.repository.max.sto
 
nifi.provenance.repository.rollover.time=${nifi.provenance.repository.rollover.time}
 
nifi.provenance.repository.rollover.size=${nifi.provenance.repository.rollover.size}
 
nifi.provenance.repository.query.threads=${nifi.provenance.repository.query.threads}
+nifi.provenance.repository.index.threads=${nifi.provenance.repository.index.threads}
 
nifi.provenance.repository.compress.on.rollover=${nifi.provenance.repository.compress.on.rollover}
 
nifi.provenance.repository.always.sync=${nifi.provenance.repository.always.sync}
 
nifi.provenance.repository.journal.count=${nifi.provenance.repository.journal.count}

http://git-wip-us.apache.org/repos/asf/nifi/blob/19f7db69/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index a1063f0..4657686 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -95,6 +97,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -279,6 +282,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         final String rolloverSize = 
properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
         final String shardSize = 
properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
         final int queryThreads = 
properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 
2);
+        final int indexThreads = properties.getIntegerProperty(
+                NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
         final int journalCount = 
properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
 
         final long storageMillis = FormatUtils.getTimeDuration(storageTime, 
TimeUnit.MILLISECONDS);
@@ -327,6 +332,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
         config.setMaxStorageCapacity(maxStorageBytes);
         config.setQueryThreadPoolSize(queryThreads);
+        config.setIndexThreadPoolSize(indexThreads);
         config.setJournalCount(journalCount);
         config.setMaxAttributeChars(maxAttrChars);
 
@@ -801,7 +807,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
      *
      * @throws IOException if unable to purge old events due to an I/O problem
      */
-    void purgeOldEvents() throws IOException {
+    synchronized void purgeOldEvents() throws IOException {
         while (!recoveryFinished.get()) {
             try {
                 Thread.sleep(100L);
@@ -1030,6 +1036,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         }
 
                         if (fileRolledOver == null) {
+                            logger.debug("Couldn't merge journals. Will try 
again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, 
storageDir);
                             return;
                         }
                         final File file = fileRolledOver;
@@ -1091,19 +1098,31 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             // that is no longer the case.
             if (journalFileCount > journalCountThreshold || repoSize > 
sizeThreshold) {
                 logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
-                        + "Slowing down flow to accomodate. Currently, there 
are {} journal files ({} bytes) and "
+                        + "Slowing down flow to accommodate. Currently, there 
are {} journal files ({} bytes) and "
                         + "threshold for blocking is {} ({} bytes)", 
journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
                 eventReporter.reportEvent(Severity.WARNING, "Provenance 
Repository", "The rate of the dataflow is "
-                        + "exceeding the provenance recording rate. Slowing 
down flow to accomodate");
+                        + "exceeding the provenance recording rate. Slowing 
down flow to accommodate");
 
                 while (journalFileCount > journalCountThreshold || repoSize > 
sizeThreshold) {
-                    try {
-                        Thread.sleep(1000L);
-                    } catch (final InterruptedException ie) {
+                    if (repoSize > sizeThreshold) {
+                        logger.debug("Provenance Repository has exceeded its 
size threshold; will trigger purging of oldest events");
+                        purgeOldEvents();
+
+                        journalFileCount = getJournalCount();
+                        repoSize = getSize(getLogFiles(), 0L);
+                        continue;
+                    } else {
+                        // if we are constrained by the number of journal 
files rather than the size of the repo,
+                        // then we will just sleep a bit because another 
thread is already actively merging the journals,
+                        // due to the runnable that we scheduled above
+                        try {
+                            Thread.sleep(100L);
+                        } catch (final InterruptedException ie) {
+                        }
                     }
 
                     logger.debug("Provenance Repository is still behind. 
Keeping flow slowed down "
-                            + "to accomodate. Currently, there are {} journal 
files ({} bytes) and "
+                            + "to accommodate. Currently, there are {} journal 
files ({} bytes) and "
                             + "threshold for blocking is {} ({} bytes)", 
journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
 
                     journalFileCount = getJournalCount();
@@ -1219,6 +1238,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
 
         if (journalFiles.isEmpty()) {
+            logger.debug("Couldn't merge journals: Journal Files is empty; 
won't merge journals");
             return null;
         }
 
@@ -1380,11 +1400,51 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 final IndexingAction indexingAction = new IndexingAction(this);
 
                 final File indexingDirectory = 
indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
+                long maxId = 0L;
+
+                final BlockingQueue<Tuple<StandardProvenanceEventRecord, 
Integer>> eventQueue = new LinkedBlockingQueue<>(100);
+                final AtomicBoolean finishedAdding = new AtomicBoolean(false);
+                final List<Future<?>> futures = new ArrayList<>();
+
                 final IndexWriter indexWriter = 
indexManager.borrowIndexWriter(indexingDirectory);
                 try {
-                    long maxId = 0L;
+                    final ExecutorService exec = 
Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new 
ThreadFactory() {
+                        @Override
+                        public Thread newThread(final Runnable r) {
+                            final Thread t = 
Executors.defaultThreadFactory().newThread(r);
+                            t.setName("Index Provenance Events");
+                            return t;
+                        }
+                    });
 
                     try {
+                        for (int i = 0; i < 
configuration.getIndexThreadPoolSize(); i++) {
+                            final Callable<Object> callable = new 
Callable<Object>() {
+                                @Override
+                                public Object call() throws IOException {
+                                    while (!eventQueue.isEmpty() || 
!finishedAdding.get()) {
+                                        final 
Tuple<StandardProvenanceEventRecord, Integer> tuple;
+                                        try {
+                                            tuple = eventQueue.poll(10, 
TimeUnit.MILLISECONDS);
+                                        } catch (final InterruptedException 
ie) {
+                                            continue;
+                                        }
+
+                                        if (tuple == null) {
+                                            continue;
+                                        }
+
+                                        indexingAction.index(tuple.getKey(), 
indexWriter, tuple.getValue());
+                                    }
+
+                                    return null;
+                                }
+                            };
+
+                            final Future<?> future = exec.submit(callable);
+                            futures.add(future);
+                        }
+
                         while (!recordToReaderMap.isEmpty()) {
                             final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
                             final StandardProvenanceEventRecord record = 
entry.getKey();
@@ -1393,7 +1453,13 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                             writer.writeRecord(record, record.getEventId());
                             final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
 
-                            indexingAction.index(record, indexWriter, 
blockIndex);
+                            boolean accepted = false;
+                            while (!accepted) {
+                                try {
+                                    accepted = eventQueue.offer(new 
Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
+                                } catch (final InterruptedException ie) {
+                                }
+                            }
                             maxId = record.getEventId();
 
                             latestRecords.add(truncateAttributes(record));
@@ -1414,16 +1480,30 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                                 recordToReaderMap.put(nextRecord, reader);
                             }
                         }
-                        indexWriter.commit();
-                    } catch (final Throwable t) {
-                        indexWriter.rollback();
-                        throw t;
+                    } finally {
+                        finishedAdding.set(true);
+                        exec.shutdown();
                     }
 
-                    indexConfig.setMaxIdIndexed(maxId);
+                    for (final Future<?> future : futures) {
+                        try {
+                            future.get();
+                        } catch (final ExecutionException ee) {
+                            final Throwable t = ee.getCause();
+                            if (t instanceof RuntimeException) {
+                                throw (RuntimeException) t;
+                            }
+
+                            throw new RuntimeException(t);
+                        } catch (final InterruptedException e) {
+                            throw new RuntimeException("Thread interrupted");
+                        }
+                    }
                 } finally {
                     indexManager.returnIndexWriter(indexingDirectory, 
indexWriter);
                 }
+
+                indexConfig.setMaxIdIndexed(maxId);
             }
 
             // record should now be available in the repository. We can copy 
the values from latestRecords to ringBuffer.
@@ -1468,6 +1548,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
         if (records == 0) {
             writerFile.delete();
+            logger.debug("Couldn't merge journals: No Records to merge");
             return null;
         } else {
             final long nanos = System.nanoTime() - startNanos;

http://git-wip-us.apache.org/repos/asf/nifi/blob/19f7db69/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 381d778..e63133a 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.provenance;
 
-import org.apache.nifi.provenance.search.SearchableField;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.provenance.search.SearchableField;
+
 public class RepositoryConfiguration {
 
     private final List<File> storageDirectories = new ArrayList<>();
@@ -40,7 +40,8 @@ public class RepositoryConfiguration {
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
     private boolean alwaysSync = false;
-    private int queryThreadPoolSize = 1;
+    private int queryThreadPoolSize = 2;
+    private int indexThreadPoolSize = 1;
     private boolean allowRollover = true;
 
     public void setAllowRollover(final boolean allow) {
@@ -204,6 +205,20 @@ public class RepositoryConfiguration {
     }
 
     /**
+     * @return the number of threads to use to index provenance events
+     */
+    public int getIndexThreadPoolSize() {
+        return indexThreadPoolSize;
+    }
+
+    public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
+        if (indexThreadPoolSize < 1) {
+            throw new IllegalArgumentException();
+        }
+        this.indexThreadPoolSize = indexThreadPoolSize;
+    }
+
+    /**
      * <p>
      * Specifies the desired size of each Provenance Event index shard, in
      * bytes. We shard the index for a few reasons:
@@ -213,22 +228,21 @@ public class RepositoryConfiguration {
      * <li>
      * A very large index requires a significant amount of Java heap space to
      * search. As the size of the shard increases, the required Java heap space
-     * also increases.
-     * </li>
+     * also increases.</li>
      * <li>
      * By having multiple shards, we have the ability to use multiple 
concurrent
      * threads to search the individual shards, resulting in far less latency
-     * when performing a search across millions or billions of records.
-     * </li>
+     * when performing a search across millions or billions of records.</li>
      * <li>
      * We keep track of which time ranges each index shard spans. As a result,
      * we are able to determine which shards need to be searched if a search
      * provides a date range. This can greatly increase the speed of a search
-     * and reduce resource utilization.
-     * </li>
+     * and reduce resource utilization.</li>
      * </ol>
      *
-     * @param bytes the number of bytes to write to an index before beginning 
a new shard
+     * @param bytes
+     *            the number of bytes to write to an index before beginning a
+     *            new shard
      */
     public void setDesiredIndexSize(final long bytes) {
         this.desiredIndexBytes = bytes;

Reply via email to