Repository: nifi Updated Branches: refs/heads/NIFI-744 301078bec -> b8cee5105
NIFI-793: Added multi-threading to the indexing in the Persistent Provenance Repository Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f260ec76 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f260ec76 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f260ec76 Branch: refs/heads/NIFI-744 Commit: f260ec760241f8aeaaa96e97e40282e603f6cea2 Parents: 75ed16c Author: Mark Payne <[email protected]> Authored: Wed Jul 29 14:28:10 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Jul 30 09:54:50 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-assembly/pom.xml | 1 + .../org/apache/nifi/util/NiFiProperties.java | 1 + .../src/main/resources/conf/nifi.properties | 1 + .../PersistentProvenanceRepository.java | 146 +++++++++++++++---- .../provenance/RepositoryConfiguration.java | 34 +++-- 5 files changed, 143 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 60a6545..34d6c25 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -280,6 +280,7 @@ language governing permissions and limitations under the License. --> <nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time> <nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size> <nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads> + <nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads> <nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover> <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields> <nifi.provenance.repository.indexed.attributes /> http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index e25f5d6..520e0ba 100644 --- a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -99,6 +99,7 @@ public class NiFiProperties extends Properties { public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time"; public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size"; public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads"; + public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads"; public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover"; public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields"; public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes"; http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 4043076..63e5391 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -71,6 +71,7 @@ nifi.provenance.repository.max.storage.size=${nifi.provenance.repository.max.sto nifi.provenance.repository.rollover.time=${nifi.provenance.repository.rollover.time} nifi.provenance.repository.rollover.size=${nifi.provenance.repository.rollover.size} nifi.provenance.repository.query.threads=${nifi.provenance.repository.query.threads} +nifi.provenance.repository.index.threads=${nifi.provenance.repository.index.threads} nifi.provenance.repository.compress.on.rollover=${nifi.provenance.repository.compress.on.rollover} nifi.provenance.repository.always.sync=${nifi.provenance.repository.always.sync} nifi.provenance.repository.journal.count=${nifi.provenance.repository.journal.count} http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 81d883a..4408e3d 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -95,6 +97,7 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -278,6 +281,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2); + final int indexThreads = properties.getIntegerProperty( + NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1); final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16); final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS); @@ -326,6 +331,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS); config.setMaxStorageCapacity(maxStorageBytes); config.setQueryThreadPoolSize(queryThreads); + config.setIndexThreadPoolSize(indexThreads); config.setJournalCount(journalCount); config.setMaxAttributeChars(maxAttrChars); @@ -795,7 +801,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository * * @throws IOException if unable to purge old events due to an I/O problem */ - void purgeOldEvents() throws IOException { + synchronized void purgeOldEvents() throws IOException { while (!recoveryFinished.get()) { try { Thread.sleep(100L); @@ -1009,6 +1015,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } if (fileRolledOver == null) { + logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir); return; } final File file = fileRolledOver; @@ -1063,19 +1070,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " - + "Slowing down flow to accomodate. Currently, there are {} journal files ({} bytes) and " + + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and " + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " - + "exceeding the provenance recording rate. Slowing down flow to accomodate"); + + "exceeding the provenance recording rate. Slowing down flow to accommodate"); while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { + if (repoSize > sizeThreshold) { + logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events"); + purgeOldEvents(); + + journalFileCount = getJournalCount(); + repoSize = getSize(getLogFiles(), 0L); + continue; + } else { + // if we are constrained by the number of journal files rather than the size of the repo, + // then we will just sleep a bit because another thread is already actively merging the journals, + // due to the runnable that we scheduled above + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + } } logger.debug("Provenance Repository is still behind. Keeping flow slowed down " - + "to accomodate. Currently, there are {} journal files ({} bytes) and " + + "to accommodate. Currently, there are {} journal files ({} bytes) and " + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); journalFileCount = getJournalCount(); @@ -1169,6 +1188,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } if (journalFiles.isEmpty()) { + logger.debug("Couldn't merge journals: Journal Files is empty; won't merge journals"); return null; } @@ -1328,45 +1348,110 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final IndexingAction indexingAction = new IndexingAction(this); final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp); + long maxId = 0L; + + final BlockingQueue<Tuple<StandardProvenanceEventRecord, Integer>> eventQueue = new LinkedBlockingQueue<>(100); + final AtomicBoolean finishedAdding = new AtomicBoolean(false); + final List<Future<?>> futures = new ArrayList<>(); + final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory); try { - long maxId = 0L; + final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("Index Provenance Events"); + return t; + } + }); - while (!recordToReaderMap.isEmpty()) { - final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); - final StandardProvenanceEventRecord record = entry.getKey(); - final RecordReader reader = entry.getValue(); + try { + for (int i = 0; i < 6; i++) { + final Callable<Object> callable = new Callable<Object>() { + @Override + public Object call() throws IOException { + while (!eventQueue.isEmpty() || !finishedAdding.get()) { + final Tuple<StandardProvenanceEventRecord, Integer> tuple; + try { + tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS); + } catch (final InterruptedException ie) { + continue; + } + + if (tuple == null) { + continue; + } + + indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue()); + } - writer.writeRecord(record, record.getEventId()); - final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); + return null; + } + }; - indexingAction.index(record, indexWriter, blockIndex); - maxId = record.getEventId(); + final Future<?> future = exec.submit(callable); + futures.add(future); + } - latestRecords.add(truncateAttributes(record)); - records++; + while (!recordToReaderMap.isEmpty()) { + final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); + final StandardProvenanceEventRecord record = entry.getKey(); + final RecordReader reader = entry.getValue(); - // Remove this entry from the map - recordToReaderMap.remove(record); + writer.writeRecord(record, record.getEventId()); + final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); - // Get the next entry from this reader and add it to the map - StandardProvenanceEventRecord nextRecord = null; + boolean accepted = false; + while (!accepted) { + try { + accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS); + } catch (final InterruptedException ie) { + } + } + maxId = record.getEventId(); - try { - nextRecord = reader.nextRecord(); - } catch (final EOFException eof) { - } + latestRecords.add(truncateAttributes(record)); + records++; + + // Remove this entry from the map + recordToReaderMap.remove(record); - if (nextRecord != null) { - recordToReaderMap.put(nextRecord, reader); + // Get the next entry from this reader and add it to the map + StandardProvenanceEventRecord nextRecord = null; + + try { + nextRecord = reader.nextRecord(); + } catch (final EOFException eof) { + } + + if (nextRecord != null) { + recordToReaderMap.put(nextRecord, reader); + } } + } finally { + finishedAdding.set(true); + exec.shutdown(); } - indexWriter.commit(); - indexConfig.setMaxIdIndexed(maxId); + for (final Future<?> future : futures) { + try { + future.get(); + } catch (final ExecutionException ee) { + final Throwable t = ee.getCause(); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + + throw new RuntimeException(t); + } catch (final InterruptedException e) { + throw new RuntimeException("Thread interrupted"); + } + } } finally { indexManager.returnIndexWriter(indexingDirectory, indexWriter); } + + indexConfig.setMaxIdIndexed(maxId); } // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer. @@ -1402,6 +1487,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (records == 0) { writerFile.delete(); + logger.debug("Couldn't merge journals: No Records to merge"); return null; } else { final long nanos = System.nanoTime() - startNanos; http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index 381d778..e63133a 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -16,14 +16,14 @@ */ package org.apache.nifi.provenance; -import org.apache.nifi.provenance.search.SearchableField; - import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.nifi.provenance.search.SearchableField; + public class RepositoryConfiguration { private final List<File> storageDirectories = new ArrayList<>(); @@ -40,7 +40,8 @@ public class RepositoryConfiguration { private List<SearchableField> searchableAttributes = new ArrayList<>(); private boolean compress = true; private boolean alwaysSync = false; - private int queryThreadPoolSize = 1; + private int queryThreadPoolSize = 2; + private int indexThreadPoolSize = 1; private boolean allowRollover = true; public void setAllowRollover(final boolean allow) { @@ -204,6 +205,20 @@ public class RepositoryConfiguration { } /** + * @return the number of threads to use to index provenance events + */ + public int getIndexThreadPoolSize() { + return indexThreadPoolSize; + } + + public void setIndexThreadPoolSize(final int indexThreadPoolSize) { + if (indexThreadPoolSize < 1) { + throw new IllegalArgumentException(); + } + this.indexThreadPoolSize = indexThreadPoolSize; + } + + /** * <p> * Specifies the desired size of each Provenance Event index shard, in * bytes. We shard the index for a few reasons: @@ -213,22 +228,21 @@ public class RepositoryConfiguration { * <li> * A very large index requires a significant amount of Java heap space to * search. As the size of the shard increases, the required Java heap space - * also increases. - * </li> + * also increases.</li> * <li> * By having multiple shards, we have the ability to use multiple concurrent * threads to search the individual shards, resulting in far less latency - * when performing a search across millions or billions of records. - * </li> + * when performing a search across millions or billions of records.</li> * <li> * We keep track of which time ranges each index shard spans. As a result, * we are able to determine which shards need to be searched if a search * provides a date range. This can greatly increase the speed of a search - * and reduce resource utilization. - * </li> + * and reduce resource utilization.</li> * </ol> * - * @param bytes the number of bytes to write to an index before beginning a new shard + * @param bytes + * the number of bytes to write to an index before beginning a + * new shard */ public void setDesiredIndexSize(final long bytes) { this.desiredIndexBytes = bytes;
