NIFI-527: Merging develop
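For readers skimming the diff: the core of this merge is a Table of Contents (TOC) for each provenance event log. The writer records the starting byte offset of every (optionally GZIP-compressed) block in a companion .toc file, so a reader can seek directly to the block containing a given event instead of decompressing the whole log. Below is a minimal sketch of that block-offset idea; the method names echo the TocWriter/TocReader interfaces added by this commit, but the in-memory list is a stand-in for the on-disk .toc format, which the sketch does not reproduce.

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a TOC maps block indices to raw byte offsets in the event log.
class TocSketch {
    private final List<Long> blockOffsets = new ArrayList<>();

    // Writer side: record the raw file offset at each new block boundary.
    void addBlockOffset(final long rawByteOffset) {
        blockOffsets.add(rawByteOffset);
    }

    // Reader side: where does block N begin? -1 means the block does not exist.
    long getBlockOffset(final int blockIndex) {
        return blockIndex < blockOffsets.size() ? blockOffsets.get(blockIndex) : -1L;
    }

    // Reader side: which block contains the given raw byte offset?
    int getBlockIndex(final long rawByteOffset) {
        int index = 0;
        for (int i = 0; i < blockOffsets.size(); i++) {
            if (blockOffsets.get(i) <= rawByteOffset) {
                index = i;
            }
        }
        return index;
    }
}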
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5ac48a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5ac48a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5ac48a0 Branch: refs/heads/develop Commit: a5ac48a03c362dcb0b253741157d79e8791eb2d5 Parents: f442d55 Author: Mark Payne <[email protected]> Authored: Mon Apr 27 09:52:33 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Mon Apr 27 09:52:33 2015 -0400 ---------------------------------------------------------------------- .../PersistentProvenanceRepository.java | 592 ++++++++++--------- .../provenance/RepositoryConfiguration.java | 14 +- .../nifi/provenance/StandardRecordReader.java | 223 ++++++- .../nifi/provenance/StandardRecordWriter.java | 114 +++- .../provenance/lucene/DeleteIndexAction.java | 75 +-- .../nifi/provenance/lucene/DocsReader.java | 100 +++- .../nifi/provenance/lucene/FieldNames.java | 1 + .../nifi/provenance/lucene/IndexManager.java | 467 +++++++++++++++ .../nifi/provenance/lucene/IndexSearch.java | 71 ++- .../nifi/provenance/lucene/IndexingAction.java | 183 +++--- .../nifi/provenance/lucene/LuceneUtil.java | 26 +- .../provenance/serialization/RecordReader.java | 67 +++ .../provenance/serialization/RecordReaders.java | 139 ++--- .../provenance/serialization/RecordWriter.java | 6 + .../provenance/serialization/RecordWriters.java | 13 +- .../nifi/provenance/toc/StandardTocReader.java | 108 ++++ .../nifi/provenance/toc/StandardTocWriter.java | 120 ++++ .../apache/nifi/provenance/toc/TocReader.java | 58 ++ .../org/apache/nifi/provenance/toc/TocUtil.java | 37 ++ .../apache/nifi/provenance/toc/TocWriter.java | 52 ++ .../TestPersistentProvenanceRepository.java | 175 +++--- .../TestStandardRecordReaderWriter.java | 189 ++++++ .../org/apache/nifi/provenance/TestUtil.java | 82 +++ .../provenance/toc/TestStandardTocReader.java | 91 +++ .../provenance/toc/TestStandardTocWriter.java | 42 ++ 25 files changed, 2426 insertions(+), 619 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 0502cc7..48cc164 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -58,6 +57,14 @@ import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.FSDirectory; import org.apache.nifi.events.EventReporter; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.expiration.ExpirationAction; @@ -67,12 +74,11 @@ import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageComputationType; import org.apache.nifi.provenance.lucene.DeleteIndexAction; import org.apache.nifi.provenance.lucene.FieldNames; +import org.apache.nifi.provenance.lucene.IndexManager; import org.apache.nifi.provenance.lucene.IndexSearch; import org.apache.nifi.provenance.lucene.IndexingAction; import org.apache.nifi.provenance.lucene.LineageQuery; import org.apache.nifi.provenance.lucene.LuceneUtil; -import org.apache.nifi.provenance.rollover.CompressionAction; -import org.apache.nifi.provenance.rollover.RolloverAction; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QueryResult; import org.apache.nifi.provenance.search.QuerySubmission; @@ -81,18 +87,12 @@ import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.serialization.RecordWriters; +import org.apache.nifi.provenance.toc.TocUtil; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.StopWatch; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +102,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public static final String EVENT_CATEGORY = "Provenance Repository"; private static final String FILE_EXTENSION = ".prov"; private static final String TEMP_FILE_SUFFIX = ".prov.part"; - public static final int SERIALIZATION_VERSION = 7; + public static final int SERIALIZATION_VERSION = 8; public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+"); public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov"); @@ -129,14 +129,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicLong streamStartTime = new AtomicLong(System.currentTimeMillis()); private final RepositoryConfiguration configuration; private final IndexConfiguration indexConfig; + private final IndexManager indexManager; private final boolean alwaysSync; private final int rolloverCheckMillis; private final ScheduledExecutorService scheduledExecService; - private final ExecutorService rolloverExecutor; + private final ScheduledExecutorService rolloverExecutor; private final ExecutorService 
queryExecService; - private final List<RolloverAction> rolloverActions = new ArrayList<>(); private final List<ExpirationAction> expirationActions = new ArrayList<>(); private final IndexingAction indexingAction; @@ -181,22 +181,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS); this.maxPartitionBytes = configuration.getMaxEventFileCapacity(); this.indexConfig = new IndexConfiguration(configuration); + this.indexManager = new IndexManager(); this.alwaysSync = configuration.isAlwaysSync(); this.rolloverCheckMillis = rolloverCheckMillis; final List<SearchableField> fields = configuration.getSearchableFields(); if (fields != null && !fields.isEmpty()) { indexingAction = new IndexingAction(this, indexConfig); - rolloverActions.add(indexingAction); } else { indexingAction = null; } - if (configuration.isCompressOnRollover()) { - rolloverActions.add(new CompressionAction()); - } - - scheduledExecService = Executors.newScheduledThreadPool(3); + scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread")); queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread")); // The number of rollover threads is a little bit arbitrary but comes from the idea that multiple storage directories generally @@ -204,69 +200,74 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // disks efficiently. However, the rollover actions can be somewhat CPU intensive, so we double the number of threads in order // to account for that. final int numRolloverThreads = configuration.getStorageDirectories().size() * 2; - rolloverExecutor = Executors.newFixedThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread")); + rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread")); } @Override public void initialize(final EventReporter eventReporter) throws IOException { - if (initialized.getAndSet(true)) { - return; - } - - this.eventReporter = eventReporter; - - recover(); - - if (configuration.isAllowRollover()) { - writers = createWriters(configuration, idGenerator.get()); - } - - if (configuration.isAllowRollover()) { - scheduledExecService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - // Check if we need to roll over - if (needToRollover()) { - // it appears that we do need to roll over. Obtain write lock so that we can do so, and then - // confirm that we still need to. 
- writeLock.lock(); - try { - logger.debug("Obtained write lock to perform periodic rollover"); - - if (needToRollover()) { - try { - rollover(false); - } catch (final Exception e) { - logger.error("Failed to roll over Provenance Event Log due to {}", e.toString()); - logger.error("", e); - } - } - } finally { - writeLock.unlock(); - } - } - } - }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); - - scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); - scheduledExecService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - purgeOldEvents(); - } catch (final Exception e) { - logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString()); - } - } - }, 1L, 1L, TimeUnit.MINUTES); - - expirationActions.add(new DeleteIndexAction(this, indexConfig)); - expirationActions.add(new FileRemovalAction()); - } + writeLock.lock(); + try { + if (initialized.getAndSet(true)) { + return; + } + + this.eventReporter = eventReporter; + + recover(); + + if (configuration.isAllowRollover()) { + writers = createWriters(configuration, idGenerator.get()); + } + + if (configuration.isAllowRollover()) { + scheduledExecService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + // Check if we need to roll over + if (needToRollover()) { + // it appears that we do need to roll over. Obtain write lock so that we can do so, and then + // confirm that we still need to. + writeLock.lock(); + try { + logger.debug("Obtained write lock to perform periodic rollover"); + + if (needToRollover()) { + try { + rollover(false); + } catch (final Exception e) { + logger.error("Failed to roll over Provenance Event Log due to {}", e.toString()); + logger.error("", e); + } + } + } finally { + writeLock.unlock(); + } + } + } + }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); + + scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); + scheduledExecService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + purgeOldEvents(); + } catch (final Exception e) { + logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString()); + } + } + }, 1L, 1L, TimeUnit.MINUTES); + + expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); + expirationActions.add(new FileRemovalAction()); + } + } finally { + writeLock.unlock(); + } } private static RepositoryConfiguration createRepositoryConfiguration() throws IOException { @@ -334,10 +335,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final File journalDirectory = new File(storageDirectory, "journals"); final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." 
+ i); - writers[i] = RecordWriters.newRecordWriter(journalFile); + writers[i] = RecordWriters.newRecordWriter(journalFile, false, false); writers[i].writeHeader(); } + logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId); return writers; } @@ -501,18 +503,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Determine the max ID in the last file. try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) { - ProvenanceEventRecord record; - while ((record = reader.nextRecord()) != null) { - final long eventId = record.getEventId(); - if (eventId > maxId) { - maxId = eventId; - } + final long eventId = reader.getMaxEventId(); + if (eventId > maxId) { + maxId = eventId; + } - // If the ID is greater than the max indexed id and this file was indexed, then - // update the max indexed id - if (eventId > maxIndexedId && lastFileIndexed) { - maxIndexedId = eventId; - } + // If the ID is greater than the max indexed id and this file was indexed, then + // update the max indexed id + if (eventId > maxIndexedId && lastFileIndexed) { + maxIndexedId = eventId; } } catch (final IOException ioe) { logger.error("Failed to read Provenance Event File {} due to {}", maxIdFile, ioe); @@ -568,16 +567,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Read the records in the last file to find its max id if (greatestMinIdFile != null) { try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) { - StandardProvenanceEventRecord record; - - try { - while ((record = recordReader.nextRecord()) != null) { - if (record.getEventId() > maxId) { - maxId = record.getEventId(); - } - } - } catch (final EOFException eof) { - } + maxId = recordReader.getMaxEventId(); } } @@ -599,46 +589,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } logger.info("Recovered {} records", recordsRecovered); - - final List<RolloverAction> rolloverActions = this.rolloverActions; - final Runnable retroactiveRollover = new Runnable() { - @Override - public void run() { - for (File toRecover : filesToRecover) { - final String baseFileName = LuceneUtil.substringBefore(toRecover.getName(), "."); - final Long fileFirstEventId = Long.parseLong(baseFileName); - - for (final RolloverAction action : rolloverActions) { - if (!action.hasBeenPerformed(toRecover)) { - try { - final StopWatch stopWatch = new StopWatch(true); - - toRecover = action.execute(toRecover); - - stopWatch.stop(); - final String duration = stopWatch.getDuration(); - logger.info("Successfully performed retroactive action {} against {} in {}", action, toRecover, duration); - - // update our map of id to Path - final Map<Long, Path> updatedMap = addToPathMap(fileFirstEventId, toRecover.toPath()); - logger.trace("After retroactive rollover action {}, Path Map: {}", action, updatedMap); - } catch (final Exception e) { - logger.error("Failed to perform retroactive rollover actions on {} due to {}", toRecover, e.toString()); - logger.error("", e); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform retroactive rollover actions on " + toRecover + " due to " + e.toString()); - } - } - } - } - } - }; - rolloverExecutor.submit(retroactiveRollover); - recoveryFinished.set(true); } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { writeLock.lock(); try { 
logger.debug("Obtained write lock for close"); @@ -648,8 +603,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository rolloverExecutor.shutdownNow(); queryExecService.shutdownNow(); - for (final RecordWriter writer : writers) { - writer.close(); + indexManager.close(); + + if ( writers != null ) { + for (final RecordWriter writer : writers) { + writer.close(); + } } } finally { writeLock.unlock(); } @@ -945,6 +904,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } + // made protected for testing purposes + protected int getJournalCount() { + // determine how many 'journals' we have in the journals directories + int journalFileCount = 0; + for ( final File storageDir : configuration.getStorageDirectories() ) { + final File journalsDir = new File(storageDir, "journals"); + final File[] journalFiles = journalsDir.listFiles(); + if ( journalFiles != null ) { + journalFileCount += journalFiles.length; + } + } + + return journalFileCount; + } + /** * MUST be called with the write lock held * @@ -963,9 +937,45 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final RecordWriter writer : writers) { final File writerFile = writer.getFile(); journalsToMerge.add(writerFile); - writer.close(); + try { + writer.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", writer, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + if ( logger.isDebugEnabled() ) { + logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); } + int journalFileCount = getJournalCount(); + final int journalCountThreshold = configuration.getJournalCount() * 5; + if ( journalFileCount > journalCountThreshold ) { + logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " + + "Slowing down flow to accommodate. Currently, there are {} journal files and " + + "threshold for blocking is {}", journalFileCount, journalCountThreshold); + eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " + + "exceeding the provenance recording rate. Slowing down flow to accommodate"); + + while (journalFileCount > journalCountThreshold) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + + logger.debug("Provenance Repository is still behind. Keeping flow slowed down " + + "to accommodate. Currently, there are {} journal files and " + + "threshold for blocking is {}", journalFileCount, journalCountThreshold); + + journalFileCount = getJournalCount(); + } + + logger.info("Provenance Repository has now caught up with rolling over journal files. 
Current number of " + + "journal files to be rolled over is {}", journalFileCount); + } + writers = createWriters(configuration, idGenerator.get()); streamStartTime.set(System.currentTimeMillis()); recordsWrittenSinceRollover.getAndSet(0); @@ -974,60 +984,29 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final List<File> storageDirs = configuration.getStorageDirectories(); final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size())); - final List<RolloverAction> actions = rolloverActions; + final AtomicReference<Future<?>> futureReference = new AtomicReference<>(); final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0); final Runnable rolloverRunnable = new Runnable() { @Override public void run() { - final File fileRolledOver; - - try { - fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); - repoDirty.set(false); - } catch (final IOException ioe) { - repoDirty.set(true); - logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); - logger.error("", ioe); - return; - } - - if (fileRolledOver == null) { - return; - } - File file = fileRolledOver; - - for (final RolloverAction action : actions) { - try { - final StopWatch stopWatch = new StopWatch(true); - file = action.execute(file); - stopWatch.stop(); - logger.info("Successfully performed Rollover Action {} for {} in {}", action, file, stopWatch.getDuration()); - - // update our map of id to Path - // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a - // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying - // it at one time - writeLock.lock(); - try { - final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), ".")); - SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator()); - newIdToPathMap.putAll(idToPathMap.get()); - newIdToPathMap.put(fileFirstEventId, file.toPath()); - idToPathMap.set(newIdToPathMap); - logger.trace("After rollover action {}, path map: {}", action, newIdToPathMap); - } finally { - writeLock.unlock(); - } - } catch (final Throwable t) { - logger.error("Failed to perform Rollover Action {} for {}: got Exception {}", - action, fileRolledOver, t.toString()); - logger.error("", t); - - return; - } - } - - if (actions.isEmpty()) { + try { + final File fileRolledOver; + + try { + fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); + repoDirty.set(false); + } catch (final IOException ioe) { + repoDirty.set(true); + logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); + logger.error("", ioe); + return; + } + + if (fileRolledOver == null) { + return; + } + File file = fileRolledOver; + // update our map of id to Path // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying @@ -1042,35 +1021,37 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } finally { writeLock.unlock(); } - } - - logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); - 
rolloverCompletions.getAndIncrement(); + + logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); + rolloverCompletions.getAndIncrement(); + + // We have finished successfully. Cancel the future so that we don't run anymore + Future<?> future; + while ((future = futureReference.get()) == null) { + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + } + } + + future.cancel(false); + } catch (final Throwable t) { + logger.error("Failed to rollover Provenance repository due to {}", t.toString()); + logger.error("", t); + } } }; - rolloverExecutor.submit(rolloverRunnable); + // We are going to schedule the future to run every 10 seconds. This allows us to keep retrying if we + // fail for some reason. When we succeed, the Runnable will cancel itself. + final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS); + futureReference.set(future); streamStartTime.set(System.currentTimeMillis()); bytesWrittenSinceRollover.set(0); } } - private SortedMap<Long, Path> addToPathMap(final Long firstEventId, final Path path) { - SortedMap<Long, Path> unmodifiableMap; - boolean updated = false; - do { - final SortedMap<Long, Path> existingMap = idToPathMap.get(); - final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator()); - newIdToPathMap.putAll(existingMap); - newIdToPathMap.put(firstEventId, path); - unmodifiableMap = Collections.unmodifiableSortedMap(newIdToPathMap); - - updated = idToPathMap.compareAndSet(existingMap, unmodifiableMap); - } while (!updated); - - return unmodifiableMap; - } private Set<File> recoverJournalFiles() throws IOException { if (!configuration.isAllowRollover()) { @@ -1093,6 +1074,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } for (final File journalFile : journalFiles) { + if ( journalFile.isDirectory() ) { + continue; + } + final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); List<File> files = journalMap.get(basename); if (files == null) { @@ -1135,22 +1120,92 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return mergedFile; } - static File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException { - final long startNanos = System.nanoTime(); + File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException { + logger.debug("Merging {} to {}", journalFiles, mergedFile); + if ( this.closed ) { + logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile); + return null; + } + if (journalFiles.isEmpty()) { return null; } - if (mergedFile.exists()) { - throw new FileAlreadyExistsException("Cannot Merge " + journalFiles.size() + " Journal Files into Merged Provenance Log File " + mergedFile.getAbsolutePath() + " because the Merged File already exists"); + Collections.sort(journalFiles, new Comparator<File>() { + @Override + public int compare(final File o1, final File o2) { + final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), "."); + final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), "."); + + try { + final int journalIndex1 = Integer.parseInt(suffix1); + final int journalIndex2 = 
Integer.parseInt(suffix2); + return Integer.compare(journalIndex1, journalIndex2); + } catch (final NumberFormatException nfe) { + return o1.getName().compareTo(o2.getName()); + } + } + }); + + final String firstJournalFile = journalFiles.get(0).getName(); + final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, "."); + final boolean allPartialFiles = firstFileSuffix.equals("0"); + + // check if we have all of the "partial" files for the journal. + if (allPartialFiles) { + if ( mergedFile.exists() ) { + // we have all "partial" files and there is already a merged file. Delete the data from the index + // because the merge file may not be fully merged. We will re-merge. + logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " + + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency."); + + final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); + try { + deleteAction.execute(mergedFile); + } catch (final Exception e) { + logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + } + + // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on + // a different Storage Directory than the original, we need to ensure that we delete both the partially merged + // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. + if ( !mergedFile.delete() ) { + logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " + + "file not being able to be displayed. This file should be deleted manually.", mergedFile); + } + + final File tocFile = TocUtil.getTocFile(mergedFile); + if ( tocFile.exists() && !tocFile.delete() ) { + logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " + + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile); + } + } + } else { + logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " + + "but it did not; assuming that the files were already merged but only some finished deletion " + + "before restart. Deleting remaining partial journal files.", journalFiles); + + for ( final File file : journalFiles ) { + if ( !file.delete() && file.exists() ) { + logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file); + } + } + + return null; } - - final File tempMergedFile = new File(mergedFile.getParentFile(), mergedFile.getName() + ".part"); + + final long startNanos = System.nanoTime(); // Map each journal to a RecordReader final List<RecordReader> readers = new ArrayList<>(); int records = 0; + final boolean isCompress = configuration.isCompressOnRollover(); + final File writerFile = isCompress ? 
new File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile; + try { for (final File journalFile : journalFiles) { try { @@ -1203,32 +1258,50 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // loop over each entry in the map, persisting the records to the merged file in order, and populating the map // with the next entry from the journal file from which the previous record was written. - try (final RecordWriter writer = RecordWriters.newRecordWriter(tempMergedFile)) { + try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { writer.writeHeader(); - while (!recordToReaderMap.isEmpty()) { - final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); - final StandardProvenanceEventRecord record = entry.getKey(); - final RecordReader reader = entry.getValue(); - - writer.writeRecord(record, record.getEventId()); - ringBuffer.add(record); - records++; - - // Remove this entry from the map - recordToReaderMap.remove(record); - - // Get the next entry from this reader and add it to the map - StandardProvenanceEventRecord nextRecord = null; - - try { - nextRecord = reader.nextRecord(); - } catch (final EOFException eof) { - } - - if (nextRecord != null) { - recordToReaderMap.put(nextRecord, reader); - } + final IndexingAction indexingAction = new IndexingAction(this, indexConfig); + + final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile); + final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory); + try { + long maxId = 0L; + + while (!recordToReaderMap.isEmpty()) { + final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); + final StandardProvenanceEventRecord record = entry.getKey(); + final RecordReader reader = entry.getValue(); + + writer.writeRecord(record, record.getEventId()); + final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); + + indexingAction.index(record, indexWriter, blockIndex); + maxId = record.getEventId(); + + ringBuffer.add(record); + records++; + + // Remove this entry from the map + recordToReaderMap.remove(record); + + // Get the next entry from this reader and add it to the map + StandardProvenanceEventRecord nextRecord = null; + + try { + nextRecord = reader.nextRecord(); + } catch (final EOFException eof) { + } + + if (nextRecord != null) { + recordToReaderMap.put(nextRecord, reader); + } + } + + indexWriter.commit(); + indexConfig.setMaxIdIndexed(maxId); + } finally { + indexManager.returnIndexWriter(indexingDirectory, indexWriter); } } } finally { @@ -1240,37 +1313,22 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } - // Attempt to rename. Keep trying for a bit if we fail. This happens often if we have some external process - // that locks files, such as a virus scanner. - boolean renamed = false; - for (int i = 0; i < 10 && !renamed; i++) { - renamed = tempMergedFile.renameTo(mergedFile); - if (!renamed) { - try { - Thread.sleep(100L); - } catch (final InterruptedException ie) { - } - } - } - - if (!renamed) { - throw new IOException("Failed to merge journal files into single merged file " + mergedFile.getAbsolutePath() + " because " + tempMergedFile.getAbsolutePath() + " could not be renamed"); - } - // Success. Remove all of the journal files, as they're no longer needed, now that they've been merged. 
for (final File journalFile : journalFiles) { - if (!journalFile.delete()) { - if (journalFile.exists()) { - logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); - } else { - logger.warn("Failed to remove temporary journal file {} because it no longer exists", journalFile.getAbsolutePath()); - } + if (!journalFile.delete() && journalFile.exists()) { + logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); + } + + final File tocFile = TocUtil.getTocFile(journalFile); + if (!tocFile.delete() && tocFile.exists()) { + logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath()); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); } } if (records == 0) { - mergedFile.delete(); + writerFile.delete(); return null; } else { final long nanos = System.nanoTime() - startNanos; @@ -1278,7 +1336,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, mergedFile, millis); } - return mergedFile; + return writerFile; } @Override @@ -1779,7 +1837,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public void run() { try { - final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir); + final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager); final StandardQueryResult queryResult = search.search(query, retrievalCount); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); if (queryResult.isFinished()) { @@ -1787,7 +1845,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount()); } } catch (final Throwable t) { - logger.error("Failed to query provenance repository due to {}", t.toString()); + logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString()); if (logger.isDebugEnabled()) { logger.error("", t); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index d47df4f..3951591 100644 --- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -33,7 +33,8 @@ public class RepositoryConfiguration { private long eventFileBytes = 1024L * 1024L * 5L; // 5 MB private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int journalCount = 16; - + private int compressionBlockBytes = 1024 * 1024; + private List<SearchableField> searchableFields = new ArrayList<>(); private List<SearchableField> searchableAttributes = new ArrayList<>(); private boolean compress = true; @@ -49,7 +50,16 @@ public class RepositoryConfiguration { return allowRollover; } - /** + + public int getCompressionBlockBytes() { + return compressionBlockBytes; + } + + public void setCompressionBlockBytes(int compressionBlockBytes) { + this.compressionBlockBytes = compressionBlockBytes; + } + + /** * Specifies where the repository will store data * * @return http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java index 5e4744b..9bbf195 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java @@ -17,41 +17,173 @@ package org.apache.nifi.provenance; import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.zip.GZIPInputStream; +import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.provenance.serialization.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StandardRecordReader implements RecordReader { - - private final DataInputStream dis; - private final ByteCountingInputStream byteCountingIn; + private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class); + + private final ByteCountingInputStream rawInputStream; private final String filename; private final int serializationVersion; + private final boolean compressed; + private final TocReader tocReader; + private final int headerLength; + + private DataInputStream dis; + private ByteCountingInputStream 
byteCountingIn; + + public StandardRecordReader(final InputStream in, final String filename) throws IOException { + this(in, filename, null); + } + + public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException { + logger.trace("Creating RecordReader for {}", filename); + + rawInputStream = new ByteCountingInputStream(in); + + final InputStream limitedStream; + if ( tocReader == null ) { + limitedStream = rawInputStream; + } else { + final long offset1 = tocReader.getBlockOffset(1); + if ( offset1 < 0 ) { + limitedStream = rawInputStream; + } else { + limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed()); + } + } + + final InputStream readableStream; + if (filename.endsWith(".gz")) { + readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream)); + compressed = true; + } else { + readableStream = new BufferedInputStream(limitedStream); + compressed = false; + } - public StandardRecordReader(final InputStream in, final int serializationVersion, final String filename) { - if (serializationVersion < 1 || serializationVersion > 7) { - throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-6"); + byteCountingIn = new ByteCountingInputStream(readableStream); + dis = new DataInputStream(byteCountingIn); + + final String repoClassName = dis.readUTF(); + final int serializationVersion = dis.readInt(); + headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer. + + if (serializationVersion < 1 || serializationVersion > 8) { + throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8"); } - byteCountingIn = new ByteCountingInputStream(in); - this.dis = new DataInputStream(byteCountingIn); this.serializationVersion = serializationVersion; this.filename = filename; + this.tocReader = tocReader; + } + + @Override + public void skipToBlock(final int blockIndex) throws IOException { + if ( tocReader == null ) { + throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log"); + } + + if ( blockIndex < 0 ) { + throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative"); + } + + if ( blockIndex == getBlockIndex() ) { + return; + } + + final long offset = tocReader.getBlockOffset(blockIndex); + if ( offset < 0 ) { + throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename); + } + + final long curOffset = rawInputStream.getBytesConsumed(); + + final long bytesToSkip = offset - curOffset; + if ( bytesToSkip >= 0 ) { + try { + StreamUtils.skip(rawInputStream, bytesToSkip); + logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip); + } catch (final IOException e) { + throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e); + } + + resetStreamForNextBlock(); + } } + + private void resetStreamForNextBlock() throws IOException { + final InputStream limitedStream; + if ( tocReader == null ) { + limitedStream = rawInputStream; + } else { + final long offset = tocReader.getBlockOffset(1 + getBlockIndex()); + if ( offset < 0 ) { + limitedStream = 
rawInputStream; + } else { + limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed()); + } + } + + final InputStream readableStream; + if (compressed) { + readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream)); + } else { + readableStream = new BufferedInputStream(limitedStream); + } + byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed()); + dis = new DataInputStream(byteCountingIn); + } + + + @Override + public TocReader getTocReader() { + return tocReader; + } + + @Override + public boolean isBlockIndexAvailable() { + return tocReader != null; + } + + @Override + public int getBlockIndex() { + if ( tocReader == null ) { + throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename); + } + + return tocReader.getBlockIndex(rawInputStream.getBytesConsumed()); + } + + @Override + public long getBytesConsumed() { + return byteCountingIn.getBytesConsumed(); + } + private StandardProvenanceEventRecord readPreVersion6Record() throws IOException { final long startOffset = byteCountingIn.getBytesConsumed(); - if (!isData(byteCountingIn)) { + if (!isData()) { return null; } @@ -137,7 +269,7 @@ public class StandardRecordReader implements RecordReader { final long startOffset = byteCountingIn.getBytesConsumed(); - if (!isData(byteCountingIn)) { + if (!isData()) { return null; } @@ -242,9 +374,17 @@ public class StandardRecordReader implements RecordReader { } private String readUUID(final DataInputStream in) throws IOException { - final long msb = in.readLong(); - final long lsb = in.readLong(); - return new UUID(msb, lsb).toString(); + if ( serializationVersion < 8 ) { + final long msb = in.readLong(); + final long lsb = in.readLong(); + return new UUID(msb, lsb).toString(); + } else { + // before version 8, we serialized UUIDs as two longs in order to + // write less data. However, in version 8 we changed to just writing + // out the string because it's extremely expensive to call UUID.fromString. + // In the end, since we generally compress, the savings is minimal anyway. + return in.readUTF(); + } } private String readNullableString(final DataInputStream in) throws IOException { @@ -272,16 +412,58 @@ public class StandardRecordReader implements RecordReader { return new String(strBytes, "UTF-8"); } - private boolean isData(final InputStream in) throws IOException { - in.mark(1); - final int nextByte = in.read(); - in.reset(); + private boolean isData() throws IOException { + byteCountingIn.mark(1); + int nextByte = byteCountingIn.read(); + byteCountingIn.reset(); + + if ( nextByte < 0 ) { + try { + resetStreamForNextBlock(); + } catch (final EOFException eof) { + return false; + } + + byteCountingIn.mark(1); + nextByte = byteCountingIn.read(); + byteCountingIn.reset(); + } + return (nextByte >= 0); } + + @Override + public long getMaxEventId() throws IOException { + if ( tocReader != null ) { + final long lastBlockOffset = tocReader.getLastBlockOffset(); + skipToBlock(tocReader.getBlockIndex(lastBlockOffset)); + } + + ProvenanceEventRecord record; + ProvenanceEventRecord lastRecord = null; + try { + while ((record = nextRecord()) != null) { + lastRecord = record; + } + } catch (final EOFException eof) { + // This can happen if we stop NiFi while the record is being written. + // This is OK, we just ignore this record. The session will not have been + // committed, so we can just process the FlowFile again. 
+ } + + return (lastRecord == null) ? -1L : lastRecord.getEventId(); + } @Override public void close() throws IOException { + logger.trace("Closing Record Reader for {}", filename); + dis.close(); + rawInputStream.close(); + + if ( tocReader != null ) { + tocReader.close(); + } } @Override @@ -291,7 +473,10 @@ public class StandardRecordReader implements RecordReader { @Override public void skipTo(final long position) throws IOException { - final long currentPosition = byteCountingIn.getBytesConsumed(); + // we are subtracting headerLength from the number of bytes consumed because we used to + // consider the offset of the first record "0" - now we consider it whatever position + // it really is in the stream. + final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength; if (currentPosition == position) { return; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index df93084..dbb2c48 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -19,38 +19,54 @@ package org.apache.nifi.provenance; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; import java.util.Map; -import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.provenance.toc.TocWriter; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.DataOutputStream; -import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.stream.io.GZIPOutputStream; +import org.apache.nifi.stream.io.NonCloseableOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StandardRecordWriter implements RecordWriter { - + private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); + private final File file; - private final DataOutputStream out; - private final ByteCountingOutputStream byteCountingOut; private final FileOutputStream fos; + private final ByteCountingOutputStream rawOutStream; + private final TocWriter tocWriter; + private final boolean compressed; + private final int uncompressedBlockSize; + + private DataOutputStream out; + private ByteCountingOutputStream byteCountingOut; + private long lastBlockOffset = 0L; private int recordCount = 0; private final Lock lock = new ReentrantLock(); - public StandardRecordWriter(final File file) throws IOException { + + public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int 
uncompressedBlockSize) throws IOException { + logger.trace("Creating Record Writer for {}", file.getName()); + this.file = file; + this.compressed = compressed; this.fos = new FileOutputStream(file); - this.byteCountingOut = new ByteCountingOutputStream(new BufferedOutputStream(fos, 65536)); - this.out = new DataOutputStream(byteCountingOut); + rawOutStream = new ByteCountingOutputStream(fos); + this.uncompressedBlockSize = uncompressedBlockSize; + + this.tocWriter = writer; } static void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - final UUID uuidObj = UUID.fromString(uuid); - out.writeLong(uuidObj.getMostSignificantBits()); - out.writeLong(uuidObj.getLeastSignificantBits()); + out.writeUTF(uuid); } static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { @@ -69,18 +85,67 @@ public class StandardRecordWriter implements RecordWriter { return file; } - @Override + @Override public synchronized void writeHeader() throws IOException { + lastBlockOffset = rawOutStream.getBytesWritten(); + resetWriteStream(); + out.writeUTF(PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } + + private void resetWriteStream() throws IOException { + if ( out != null ) { + out.flush(); + } + + final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten(); + + final OutputStream writableStream; + if ( compressed ) { + // because of the way that GZIPOutputStream works, we need to call close() on it in order for it + // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap + // the underlying OutputStream in a NonCloseableOutputStream + if ( out != null ) { + out.close(); + } + + if ( tocWriter != null ) { + tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + } + + writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); + } else { + if ( tocWriter != null ) { + tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + } + + writableStream = new BufferedOutputStream(rawOutStream, 65536); + } + + this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset); + this.out = new DataOutputStream(byteCountingOut); + } + @Override public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException { final ProvenanceEventType recordType = record.getEventType(); final long startBytes = byteCountingOut.getBytesWritten(); + // add a new block to the TOC if needed. + if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) { + lastBlockOffset = startBytes; + + if ( compressed ) { + // because of the way that GZIPOutputStream works, we need to call close() on it in order for it + // to write its trailing bytes. 
But we don't want to close the underlying OutputStream, so we wrap + // the underlying OutputStream in a NonCloseableOutputStream + resetWriteStream(); + } + } + out.writeLong(recordIdentifier); out.writeUTF(record.getEventType().name()); out.writeLong(record.getEventTime()); @@ -196,13 +261,24 @@ public class StandardRecordWriter implements RecordWriter { @Override public synchronized void close() throws IOException { + logger.trace("Closing Record Writer for {}", file.getName()); + lock(); try { - out.flush(); - out.close(); + try { + out.flush(); + out.close(); + } finally { + rawOutStream.close(); + + if ( tocWriter != null ) { + tocWriter.close(); + } + } } finally { unlock(); } + } @Override @@ -232,6 +308,14 @@ public class StandardRecordWriter implements RecordWriter { @Override public void sync() throws IOException { - fos.getFD().sync(); + if ( tocWriter != null ) { + tocWriter.sync(); + } + fos.getFD().sync(); + } + + @Override + public TocWriter getTocWriter() { + return tocWriter; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java index 4608419..7db04aa 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java @@ -16,25 +16,17 @@ */ package org.apache.nifi.provenance.lucene; -import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.List; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.Term; import org.apache.nifi.provenance.IndexConfiguration; import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.expiration.ExpirationAction; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.Term; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +35,12 @@ public class DeleteIndexAction implements ExpirationAction { private static final Logger logger = LoggerFactory.getLogger(DeleteIndexAction.class); private final PersistentProvenanceRepository repository; private final IndexConfiguration indexConfiguration; + private final IndexManager indexManager; - public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration) { + public DeleteIndexAction(final PersistentProvenanceRepository repo, 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 4608419..7db04aa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -16,25 +16,17 @@
  */
 package org.apache.nifi.provenance.lucene;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
 import org.apache.nifi.provenance.IndexConfiguration;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +35,12 @@ public class DeleteIndexAction implements ExpirationAction {
     private static final Logger logger = LoggerFactory.getLogger(DeleteIndexAction.class);
     private final PersistentProvenanceRepository repository;
     private final IndexConfiguration indexConfiguration;
+    private final IndexManager indexManager;
 
-    public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration) {
+    public DeleteIndexAction(final PersistentProvenanceRepository repo,
+        final IndexConfiguration indexConfiguration, final IndexManager indexManager) {
         this.repository = repo;
         this.indexConfiguration = indexConfiguration;
+        this.indexManager = indexManager;
     }
 
     @Override
@@ -55,51 +49,38 @@ public class DeleteIndexAction implements ExpirationAction {
         long numDeleted = 0;
         long maxEventId = -1L;
         try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
-            try {
-                StandardProvenanceEventRecord record;
-                while ((record = reader.nextRecord()) != null) {
-                    numDeleted++;
-
-                    if (record.getEventId() > maxEventId) {
-                        maxEventId = record.getEventId();
-                    }
-                }
-            } catch (final EOFException eof) {
-                // finished reading -- the last record was not completely written out, so it is discarded.
-            }
-        } catch (final EOFException eof) {
-            // no data in file.
-            return expiredFile;
+            maxEventId = reader.getMaxEventId();
+        } catch (final IOException ioe) {
+            logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
         }
 
         // remove the records from the index
         final List<File> indexDirs = indexConfiguration.getIndexDirectories(expiredFile);
         for (final File indexingDirectory : indexDirs) {
-            try (final Directory directory = FSDirectory.open(indexingDirectory);
-                final Analyzer analyzer = new StandardAnalyzer()) {
-                IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-                config.setWriteLockTimeout(300000L);
-
-                Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
+            final Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
 
-                boolean deleteDir = false;
-                try (final IndexWriter indexWriter = new IndexWriter(directory, config)) {
-                    indexWriter.deleteDocuments(term);
-                    indexWriter.commit();
-                    final int docsLeft = indexWriter.numDocs();
-                    deleteDir = (docsLeft <= 0);
-                    logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
-                }
+            boolean deleteDir = false;
+            final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
+            try {
+                writer.deleteDocuments(term);
+                writer.commit();
+                final int docsLeft = writer.numDocs();
+                deleteDir = (docsLeft <= 0);
+                logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
+            } finally {
+                indexManager.returnIndexWriter(indexingDirectory, writer);
+            }
 
-                // we've confirmed that all documents have been removed. Delete the index directory.
-                if (deleteDir) {
-                    indexConfiguration.removeIndexDirectory(indexingDirectory);
-                    deleteDirectory(indexingDirectory);
-                    logger.info("Removed empty index directory {}", indexingDirectory);
-                }
+            // we've confirmed that all documents have been removed. Delete the index directory.
+            if (deleteDir) {
+                indexManager.removeIndex(indexingDirectory);
+                indexConfiguration.removeIndexDirectory(indexingDirectory);
+
+                deleteDirectory(indexingDirectory);
+                logger.info("Removed empty index directory {}", indexingDirectory);
            }
        }
-
+        // Update the minimum index to 1 more than the max Event ID in this file.
        if (maxEventId > -1L) {
            indexConfiguration.setMinIdIndexed(maxEventId + 1L);
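The expiration path above stops building a throwaway IndexWriter per expired file and instead borrows a shared writer from the new IndexManager, with the return guaranteed by a finally block. A reduced sketch of that borrow/mutate/commit/return discipline; IndexWriterPool trims the manager down to the two calls this hunk actually uses, and the caller shape is illustrative, not a copy of the patch:

    // Sketch assuming Lucene 4.x on the classpath; the pool interface is a
    // hypothetical stand-in for the subset of IndexManager used here.
    import java.io.File;
    import java.io.IOException;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    interface IndexWriterPool {
        IndexWriter borrowIndexWriter(File indexDir) throws IOException;
        void returnIndexWriter(File indexDir, IndexWriter writer);
    }

    class ExpireFromIndexSketch {
        /** Deletes all docs matching term; returns how many docs remain. */
        long deleteDocsFor(final IndexWriterPool pool, final File indexDir, final Term term) throws IOException {
            final IndexWriter writer = pool.borrowIndexWriter(indexDir);
            try {
                writer.deleteDocuments(term);
                writer.commit();
                // caller may delete the index directory when this reaches 0
                return writer.numDocs();
            } finally {
                // always hand the writer back, even if delete/commit failed
                pool.returnIndexWriter(indexDir, writer);
            }
        }
    }

Sharing one writer per index directory avoids repeatedly paying the IndexWriter open cost and the 300-second write-lock timeout dance the removed code needed.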
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 6446a35..5a77f42 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -23,23 +23,30 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
+import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DocsReader {
-
+    private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
     public DocsReader(final List<File> storageDirectories) {
     }
 
@@ -48,6 +55,7 @@ public class DocsReader {
             return Collections.emptySet();
         }
 
+        final long start = System.nanoTime();
         final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
         final List<Document> docs = new ArrayList<>(numDocs);
 
@@ -60,63 +68,102 @@ public class DocsReader {
             }
         }
 
+        final long readDocuments = System.nanoTime() - start;
+        logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
         return read(docs, allProvenanceLogFiles);
     }
 
+
+    private long getByteOffset(final Document d, final RecordReader reader) {
+        final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField != null ) {
+            final int blockIndex = blockField.numericValue().intValue();
+            final TocReader tocReader = reader.getTocReader();
+            return tocReader.getBlockOffset(blockIndex);
+        }
+
+        return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+    }
+
+
+    private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
+        IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField == null ) {
+            reader.skipTo(getByteOffset(d, reader));
+        } else {
+            reader.skipToBlock(blockField.numericValue().intValue());
+        }
+
+        StandardProvenanceEventRecord record;
+        while ( (record = reader.nextRecord()) != null) {
+            IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+                break;
+            }
+        }
+
+        if ( record == null ) {
+            throw new IOException("Failed to find Provenance Event " + d);
+        } else {
+            return record;
+        }
+    }
+
+
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
 
         RecordReader reader = null;
         String lastStorageFilename = null;
-        long lastByteOffset = 0L;
         final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
 
+        final long start = System.nanoTime();
+        int logFileCount = 0;
+
+        final Set<String> storageFilesToSkip = new HashSet<>();
+
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                final long byteOffset = d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
-
+                if ( storageFilesToSkip.contains(storageFilename) ) {
+                    continue;
+                }
+
                 try {
-                    if (reader != null && storageFilename.equals(lastStorageFilename) && byteOffset > lastByteOffset) {
-                        // Still the same file and the offset is downstream.
-                        try {
-                            reader.skipTo(byteOffset);
-                            final StandardProvenanceEventRecord record = reader.nextRecord();
-                            matchingRecords.add(record);
-                        } catch (final IOException e) {
-                            throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
-                        }
-
+                    if (reader != null && storageFilename.equals(lastStorageFilename)) {
+                        matchingRecords.add(getRecord(d, reader));
                     } else {
+                        logger.debug("Opening log file {}", storageFilename);
+
+                        logFileCount++;
                         if (reader != null) {
                             reader.close();
                         }
 
                         List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
-                            throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
+                            logger.warn("Could not find Provenance Log File with basename {} in the "
+                                + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+                            storageFilesToSkip.add(storageFilename);
+                            continue;
                         }
 
                         if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + storageFilename + " in the Provenance Repository");
+                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+                                storageFilename + " in the Provenance Repository");
                        }
 
                        for (final File file : potentialFiles) {
-                            reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
                            try {
-                                reader.skip(byteOffset);
-
-                                final StandardProvenanceEventRecord record = reader.nextRecord();
-                                matchingRecords.add(record);
+                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+                                matchingRecords.add(getRecord(d, reader));
                            } catch (final IOException e) {
-                                throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
+                                throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
                            }
                        }
                    }
                } finally {
                    lastStorageFilename = storageFilename;
-                    lastByteOffset = byteOffset;
                }
            }
        } finally {
@@ -125,6 +172,9 @@ public class DocsReader {
            }
        }
 
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.debug("Took {} ms to read {} events from {} prov log files",
+            millis, matchingRecords.size(), logFileCount);
+
        return matchingRecords;
    }
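The new getRecord()/getByteOffset() pair gives DocsReader a two-level seek: when a Lucene document carries the new block index, resolve it to a byte offset through the TOC and jump to the start of the compressed block, then scan forward to the requested event ID; documents written before block indexing fall back to their stored per-event byte offset. A compact sketch of that strategy follows; EventFile, Toc, and Event are hypothetical stand-ins, not the NiFi RecordReader/TocReader types:

    // Sketch only: the seek-then-scan lookup under assumed interfaces.
    import java.io.IOException;
    import java.util.Iterator;

    interface Toc {
        long getBlockOffset(int blockIndex); // byte offset where a block starts
    }

    interface EventFile {
        Toc toc();
        void seekTo(long byteOffset) throws IOException;
        Iterator<Event> events(); // events from the current position onward
    }

    interface Event {
        long id();
    }

    class BlockSeekSketch {
        /** blockIndex is null for documents indexed before blocks existed. */
        Event find(final EventFile file, final Integer blockIndex,
                   final long byteOffset, final long eventId) throws IOException {
            // prefer the block index; fall back to the raw stored offset
            final long target = (blockIndex != null)
                ? file.toc().getBlockOffset(blockIndex) : byteOffset;
            file.seekTo(target);

            // a block holds many events, so scan forward to the matching ID
            final Iterator<Event> it = file.events();
            while (it.hasNext()) {
                final Event e = it.next();
                if (e.id() == eventId) {
                    return e;
                }
            }
            throw new IOException("Event " + eventId + " not found after seek");
        }
    }

The scan step is what makes block-level (rather than event-level) offsets workable: the TOC only has to record one offset per block, at the cost of reading a few extra records per lookup.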
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
index 6afc193..90a73f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
@@ -20,4 +20,5 @@ public class FieldNames {
 
     public static final String STORAGE_FILENAME = "storage-filename";
     public static final String STORAGE_FILE_OFFSET = "storage-fileOffset";
+    public static final String BLOCK_INDEX = "block-index";
 }
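For orientation, the three FieldNames constants might be attached to an index document roughly as below. This is an assumption for illustration using stock Lucene 4.x field types, not a copy of the IndexingAction changes elsewhere in this commit:

    // Illustrative only: populating storage-filename, storage-fileOffset and
    // the newly added block-index on a Lucene 4.x document.
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.IntField;
    import org.apache.lucene.document.LongField;
    import org.apache.lucene.document.StringField;

    class FieldSketch {
        Document toDocument(final String baseFilename, final long byteOffset, final int blockIndex) {
            final Document doc = new Document();
            doc.add(new StringField("storage-filename", baseFilename, Field.Store.YES));
            doc.add(new LongField("storage-fileOffset", byteOffset, Field.Store.YES));
            doc.add(new IntField("block-index", blockIndex, Field.Store.YES));
            return doc;
        }
    }

Both location fields are stored so that DocsReader can use block-index when present and fall back to storage-fileOffset for documents written by older versions.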
