NIFI-527: Code cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3cd18b0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3cd18b0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3cd18b0b Branch: refs/heads/NIFI-292 Commit: 3cd18b0babc5133e35a2771bc0d0acaf974c381f Parents: 666de3d Author: Mark Payne <[email protected]> Authored: Mon Apr 27 14:13:55 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Mon Apr 27 14:13:55 2015 -0400 ---------------------------------------------------------------------- .../nifi/provenance/IndexConfiguration.java | 12 +- .../PersistentProvenanceRepository.java | 612 +++++++------- .../provenance/RepositoryConfiguration.java | 106 +-- .../nifi/provenance/StandardRecordReader.java | 246 +++--- .../nifi/provenance/StandardRecordWriter.java | 138 ++-- .../provenance/expiration/ExpirationAction.java | 6 +- .../provenance/lucene/DeleteIndexAction.java | 12 +- .../nifi/provenance/lucene/DocsReader.java | 79 +- .../nifi/provenance/lucene/IndexManager.java | 820 +++++++++---------- .../nifi/provenance/lucene/IndexSearch.java | 38 +- .../nifi/provenance/lucene/IndexingAction.java | 119 +-- .../nifi/provenance/lucene/LineageQuery.java | 6 +- .../nifi/provenance/lucene/LuceneUtil.java | 38 +- .../provenance/rollover/CompressionAction.java | 59 -- .../provenance/rollover/RolloverAction.java | 35 - .../provenance/serialization/RecordReader.java | 57 +- .../provenance/serialization/RecordReaders.java | 136 +-- .../provenance/serialization/RecordWriter.java | 23 +- .../provenance/serialization/RecordWriters.java | 8 +- .../nifi/provenance/toc/StandardTocReader.java | 44 +- .../nifi/provenance/toc/StandardTocWriter.java | 35 +- .../apache/nifi/provenance/toc/TocReader.java | 20 +- .../org/apache/nifi/provenance/toc/TocUtil.java | 27 +- .../apache/nifi/provenance/toc/TocWriter.java | 16 +- .../TestPersistentProvenanceRepository.java | 118 +-- .../TestStandardRecordReaderWriter.java | 162 ++-- .../org/apache/nifi/provenance/TestUtil.java | 2 +- .../provenance/toc/TestStandardTocReader.java | 20 +- .../provenance/toc/TestStandardTocWriter.java | 4 +- 29 files changed, 1391 insertions(+), 1607 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java index a5474d5..3beab65 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java @@ -92,7 +92,7 @@ public class IndexConfiguration { } return firstRecord.getEventTime(); } catch (final FileNotFoundException | EOFException fnf) { - return null; // file no longer exists or there's no record in this file 
+ return null; // file no longer exists or there's no record in this file } catch (final IOException ioe) { logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, ioe.toString()); logger.warn("", ioe); @@ -201,7 +201,8 @@ public class IndexConfiguration { * desired * @param endTime the end time of the query for which the indices are * desired - * @return + * @return the index directories that are applicable only for the given time + * span (times inclusive). */ public List<File> getIndexDirectories(final Long startTime, final Long endTime) { if (startTime == null && endTime == null) { @@ -252,7 +253,8 @@ public class IndexConfiguration { * * @param provenanceLogFile the provenance log file for which the index * directories are desired - * @return + * @return the index directories that are applicable only for the given + * event log */ public List<File> getIndexDirectories(final File provenanceLogFile) { final List<File> dirs = new ArrayList<>(); @@ -334,9 +336,7 @@ public class IndexConfiguration { } /** - * Returns the amount of disk space in bytes used by all of the indices - * - * @return + * @return the amount of disk space in bytes used by all of the indices */ public long getIndexSize() { lock.lock(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 48cc164..fe89a5e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -139,7 +139,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final List<ExpirationAction> expirationActions = new ArrayList<>(); - private final IndexingAction indexingAction; private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>(); @@ -151,7 +150,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicBoolean initialized = new AtomicBoolean(false); private final AtomicBoolean repoDirty = new AtomicBoolean(false); - // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to + // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to // read them. Since this is a very cheap operation to keep them, it's worth the tiny expense for the improved user experience. 
private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000); private EventReporter eventReporter; @@ -184,13 +183,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository this.indexManager = new IndexManager(); this.alwaysSync = configuration.isAlwaysSync(); this.rolloverCheckMillis = rolloverCheckMillis; - - final List<SearchableField> fields = configuration.getSearchableFields(); - if (fields != null && !fields.isEmpty()) { - indexingAction = new IndexingAction(this, indexConfig); - } else { - indexingAction = null; - } scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread")); queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread")); @@ -205,69 +197,69 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public void initialize(final EventReporter eventReporter) throws IOException { - writeLock.lock(); - try { - if (initialized.getAndSet(true)) { - return; - } - - this.eventReporter = eventReporter; - - recover(); - - if (configuration.isAllowRollover()) { - writers = createWriters(configuration, idGenerator.get()); - } - - if (configuration.isAllowRollover()) { - scheduledExecService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - // Check if we need to roll over - if (needToRollover()) { - // it appears that we do need to roll over. Obtain write lock so that we can do so, and then - // confirm that we still need to. - writeLock.lock(); - try { - logger.debug("Obtained write lock to perform periodic rollover"); - - if (needToRollover()) { - try { - rollover(false); - } catch (final Exception e) { - logger.error("Failed to roll over Provenance Event Log due to {}", e.toString()); - logger.error("", e); - } - } - } finally { - writeLock.unlock(); - } - } - } - }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); - - scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); - scheduledExecService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - purgeOldEvents(); - } catch (final Exception e) { - logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString()); - } - } - }, 1L, 1L, TimeUnit.MINUTES); - - expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); - expirationActions.add(new FileRemovalAction()); - } - } finally { - writeLock.unlock(); - } + writeLock.lock(); + try { + if (initialized.getAndSet(true)) { + return; + } + + this.eventReporter = eventReporter; + + recover(); + + if (configuration.isAllowRollover()) { + writers = createWriters(configuration, idGenerator.get()); + } + + if (configuration.isAllowRollover()) { + scheduledExecService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + // Check if we need to roll over + if (needToRollover()) { + // it appears that we do need to roll over. Obtain write lock so that we can do so, and then + // confirm that we still need to. 
+ writeLock.lock(); + try { + logger.debug("Obtained write lock to perform periodic rollover"); + + if (needToRollover()) { + try { + rollover(false); + } catch (final Exception e) { + logger.error("Failed to roll over Provenance Event Log due to {}", e.toString()); + logger.error("", e); + } + } + } finally { + writeLock.unlock(); + } + } + } + }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); + + scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); + scheduledExecService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + purgeOldEvents(); + } catch (final Exception e) { + logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString()); + } + } + }, 1L, 1L, TimeUnit.MINUTES); + + expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); + expirationActions.add(new FileRemovalAction()); + } + } finally { + writeLock.unlock(); + } } private static RepositoryConfiguration createRepositoryConfiguration() throws IOException { @@ -489,28 +481,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository maxIdFile = file; } - if (firstId > maxIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) { + if (firstId > maxIndexedId) { maxIndexedId = firstId - 1; } - if (firstId < minIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) { + if (firstId < minIndexedId) { minIndexedId = firstId; } } if (maxIdFile != null) { - final boolean lastFileIndexed = indexingAction == null ? false : indexingAction.hasBeenPerformed(maxIdFile); - // Determine the max ID in the last file. try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) { - final long eventId = reader.getMaxEventId(); + final long eventId = reader.getMaxEventId(); if (eventId > maxId) { maxId = eventId; } // If the ID is greater than the max indexed id and this file was indexed, then // update the max indexed id - if (eventId > maxIndexedId && lastFileIndexed) { + if (eventId > maxIndexedId) { maxIndexedId = eventId; } } catch (final IOException ioe) { @@ -567,7 +557,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Read the records in the last file to find its max id if (greatestMinIdFile != null) { try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) { - maxId = recordReader.getMaxEventId(); + maxId = recordReader.getMaxEventId(); } } @@ -604,11 +594,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository queryExecService.shutdownNow(); indexManager.close(); - + if ( writers != null ) { - for (final RecordWriter writer : writers) { - writer.close(); - } + for (final RecordWriter writer : writers) { + writer.close(); + } } } finally { writeLock.unlock(); @@ -624,7 +614,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository readLock.lock(); try { if (repoDirty.get()) { - logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. 
Will not attempt to persist more records until the repo has been rolled over."); + logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. " + + "Will not attempt to persist more records until the repo has been rolled over."); return; } @@ -670,7 +661,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } catch (final IOException ioe) { logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString()); logger.error("", ioe); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + ". Will not attempt to write to the Provenance Repository again until the repository has rolled over"); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + + ". Will not attempt to write to the Provenance Repository again until the repository has rolled over"); // Switch from readLock to writeLock so that we can perform rollover readLock.unlock(); @@ -735,9 +727,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository /** * Returns the size, in bytes, of the Repository storage * - * @param logFiles - * @param timeCutoff - * @return + * @param logFiles the log files to consider + * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped + * @return the combined size of the given log files whose last modified date is on or after timeCutoff */ public long getSize(final List<File> logFiles, final long timeCutoff) { long bytesUsed = 0L; @@ -760,7 +752,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository /** * Purges old events from the repository * - * @throws IOException + * @throws IOException if unable to purge old events due to an I/O problem */ void purgeOldEvents() throws IOException { while (!recoveryFinished.get()) { @@ -858,12 +850,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository removed.add(baseName); } catch (final FileNotFoundException fnf) { - logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not perform additional Expiration Actions on this file", currentAction, file); + logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not " + + "perform additional Expiration Actions on this file", currentAction, file); removed.add(baseName); } catch (final Throwable t) { - logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional Expiration Actions on this file at this time", currentAction, file, t.toString()); + logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional " + + "Expiration Actions on this file at this time", currentAction, file, t.toString()); logger.warn("", t); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions on this file at this time"); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + + " on Provenance Event file "
+ file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + + "on this file at this time"); } } @@ -906,24 +902,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // made protected for testing purposes protected int getJournalCount() { - // determine how many 'journals' we have in the journals directories + // determine how many 'journals' we have in the journals directories int journalFileCount = 0; for ( final File storageDir : configuration.getStorageDirectories() ) { - final File journalsDir = new File(storageDir, "journals"); - final File[] journalFiles = journalsDir.listFiles(); - if ( journalFiles != null ) { - journalFileCount += journalFiles.length; - } + final File journalsDir = new File(storageDir, "journals"); + final File[] journalFiles = journalsDir.listFiles(); + if ( journalFiles != null ) { + journalFileCount += journalFiles.length; + } } - + return journalFileCount; } - + /** * MUST be called with the write lock held * - * @param force - * @throws IOException + * @param force if true, will force a rollover regardless of whether or not data has been written + * @throws IOException if unable to complete rollover */ private void rollover(final boolean force) throws IOException { if (!configuration.isAllowRollover()) { @@ -938,44 +934,44 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final File writerFile = writer.getFile(); journalsToMerge.add(writerFile); try { - writer.close(); + writer.close(); } catch (final IOException ioe) { - logger.warn("Failed to close {} due to {}", writer, ioe.toString()); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } + logger.warn("Failed to close {} due to {}", writer, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } } } if ( logger.isDebugEnabled() ) { - logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); + logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); } int journalFileCount = getJournalCount(); final int journalCountThreshold = configuration.getJournalCount() * 5; if ( journalFileCount > journalCountThreshold ) { - logger.warn("The rate of the dataflow is exceeding the provenance recording rate.â" - + "Slowing down flow to accomodate. Currently, there are {} journal files and " - + "threshold for blocking is {}", journalFileCount, journalCountThreshold); - eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " - + "exceeding the provenance recording rate.âSlowing down flow to accomodate"); - - while (journalFileCount > journalCountThreshold) { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { - } - - logger.debug("Provenance Repository is still behind. Keeping flow slowed down " - + "to accomodate. Currently, there are {} journal files and " - + "threshold for blocking is {}", journalFileCount, journalCountThreshold); - - journalFileCount = getJournalCount(); - } - - logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of " - + "journal files to be rolled over is {}", journalFileCount); - } - + logger.warn("The rate of the dataflow is exceeding the provenance recording rate.â" + + "Slowing down flow to accomodate. 
Currently, there are {} journal files and " + + "threshold for blocking is {}", journalFileCount, journalCountThreshold); + eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " + + "exceeding the provenance recording rate.âSlowing down flow to accomodate"); + + while (journalFileCount > journalCountThreshold) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + + logger.debug("Provenance Repository is still behind. Keeping flow slowed down " + + "to accomodate. Currently, there are {} journal files and " + + "threshold for blocking is {}", journalFileCount, journalCountThreshold); + + journalFileCount = getJournalCount(); + } + + logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of " + + "journal files to be rolled over is {}", journalFileCount); + } + writers = createWriters(configuration, idGenerator.get()); streamStartTime.set(System.currentTimeMillis()); recordsWrittenSinceRollover.getAndSet(0); @@ -989,24 +985,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Runnable rolloverRunnable = new Runnable() { @Override public void run() { - try { - final File fileRolledOver; - - try { - fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); - repoDirty.set(false); - } catch (final IOException ioe) { - repoDirty.set(true); - logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); - logger.error("", ioe); - return; - } - - if (fileRolledOver == null) { - return; - } - File file = fileRolledOver; - + try { + final File fileRolledOver; + + try { + fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); + repoDirty.set(false); + } catch (final IOException ioe) { + repoDirty.set(true); + logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); + logger.error("", ioe); + return; + } + + if (fileRolledOver == null) { + return; + } + File file = fileRolledOver; + // update our map of id to Path // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying @@ -1021,24 +1017,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } finally { writeLock.unlock(); } - - logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); - rolloverCompletions.getAndIncrement(); - - // We have finished successfully. Cancel the future so that we don't run anymore - Future<?> future; - while ((future = futureReference.get()) == null) { - try { - Thread.sleep(10L); - } catch (final InterruptedException ie) { - } - } - - future.cancel(false); - } catch (final Throwable t) { - logger.error("Failed to rollover Provenance repository due to {}", t.toString()); - logger.error("", t); - } + + logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); + rolloverCompletions.getAndIncrement(); + + // We have finished successfully. 
Cancel the future so that we don't run anymore + Future<?> future; + while ((future = futureReference.get()) == null) { + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + } + } + + future.cancel(false); + } catch (final Throwable t) { + logger.error("Failed to rollover Provenance repository due to {}", t.toString()); + logger.error("", t); + } } }; @@ -1074,10 +1070,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } for (final File journalFile : journalFiles) { - if ( journalFile.isDirectory() ) { - continue; - } - + if ( journalFile.isDirectory() ) { + continue; + } + final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); List<File> files = journalMap.get(basename); if (files == null) { @@ -1120,83 +1116,84 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return mergedFile; } - File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException { - logger.debug("Merging {} to {}", journalFiles, mergedFile); - if ( this.closed ) { - logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile); - return null; - } - + File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, + final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException { + logger.debug("Merging {} to {}", journalFiles, mergedFile); + if ( this.closed ) { + logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile); + return null; + } + if (journalFiles.isEmpty()) { return null; } Collections.sort(journalFiles, new Comparator<File>() { - @Override - public int compare(final File o1, final File o2) { - final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), "."); - final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), "."); - - try { - final int journalIndex1 = Integer.parseInt(suffix1); - final int journalIndex2 = Integer.parseInt(suffix2); - return Integer.compare(journalIndex1, journalIndex2); - } catch (final NumberFormatException nfe) { - return o1.getName().compareTo(o2.getName()); - } - } + @Override + public int compare(final File o1, final File o2) { + final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), "."); + final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), "."); + + try { + final int journalIndex1 = Integer.parseInt(suffix1); + final int journalIndex2 = Integer.parseInt(suffix2); + return Integer.compare(journalIndex1, journalIndex2); + } catch (final NumberFormatException nfe) { + return o1.getName().compareTo(o2.getName()); + } + } }); - + final String firstJournalFile = journalFiles.get(0).getName(); final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, "."); final boolean allPartialFiles = firstFileSuffix.equals("0"); - + // check if we have all of the "partial" files for the journal. if (allPartialFiles) { - if ( mergedFile.exists() ) { - // we have all "partial" files and there is already a merged file. Delete the data from the index - // because the merge file may not be fully merged. We will re-merge. - logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " - + "so assuming that the merge did not finish. 
Repeating procedure in order to ensure consistency.", mergedFile); - - final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); - try { - deleteAction.execute(mergedFile); - } catch (final Exception e) { - logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString()); - if ( logger.isDebugEnabled() ) { - logger.warn("", e); - } - } - - // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on - // a different Storage Directory than the original, we need to ensure that we delete both the partially merged - // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. - if ( !mergedFile.delete() ) { - logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " - + "file not being able to be displayed. This file should be deleted manually.", mergedFile); - } - - final File tocFile = TocUtil.getTocFile(mergedFile); - if ( tocFile.exists() && !tocFile.delete() ) { - logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " - + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile); - } - } + if ( mergedFile.exists() ) { + // we have all "partial" files and there is already a merged file. Delete the data from the index + // because the merge file may not be fully merged. We will re-merge. + logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " + + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.", mergedFile); + + final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); + try { + deleteAction.execute(mergedFile); + } catch (final Exception e) { + logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + } + + // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on + // a different Storage Directory than the original, we need to ensure that we delete both the partially merged + // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. + if ( !mergedFile.delete() ) { + logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " + + "file not being able to be displayed. This file should be deleted manually.", mergedFile); + } + + final File tocFile = TocUtil.getTocFile(mergedFile); + if ( tocFile.exists() && !tocFile.delete() ) { + logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " + + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile); + } + } } else { - logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " - + "but it did not; assuming that the files were already merged but only some finished deletion " - + "before restart. 
Deleting remaining partial journal files.", journalFiles); - - for ( final File file : journalFiles ) { - if ( !file.delete() && file.exists() ) { - logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file); - } - } - - return null; - } - + logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " + + "but it did not; assuming that the files were already merged but only some finished deletion " + + "before restart. Deleting remaining partial journal files.", journalFiles); + + for ( final File file : journalFiles ) { + if ( !file.delete() && file.exists() ) { + logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file); + } + } + + return null; + } + final long startNanos = System.nanoTime(); // Map each journal to a RecordReader @@ -1241,12 +1238,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository record = reader.nextRecord(); } catch (final EOFException eof) { } catch (final Exception e) { - logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped."); + logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't " + + "completely written to the file. This record will be skipped."); if (logger.isDebugEnabled()) { logger.warn("", e); } - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped."); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + + "; it's possible that the record wasn't completely written to the file. 
This record will be skipped."); } if (record == null) { @@ -1261,47 +1260,47 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { writer.writeHeader(); - final IndexingAction indexingAction = new IndexingAction(this, indexConfig); - + final IndexingAction indexingAction = new IndexingAction(this); + final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile); final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory); try { - long maxId = 0L; - - while (!recordToReaderMap.isEmpty()) { - final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); - final StandardProvenanceEventRecord record = entry.getKey(); - final RecordReader reader = entry.getValue(); - - writer.writeRecord(record, record.getEventId()); - final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); - - indexingAction.index(record, indexWriter, blockIndex); - maxId = record.getEventId(); - - ringBuffer.add(record); - records++; - - // Remove this entry from the map - recordToReaderMap.remove(record); - - // Get the next entry from this reader and add it to the map - StandardProvenanceEventRecord nextRecord = null; - - try { - nextRecord = reader.nextRecord(); - } catch (final EOFException eof) { - } - - if (nextRecord != null) { - recordToReaderMap.put(nextRecord, reader); - } - } - - indexWriter.commit(); - indexConfig.setMaxIdIndexed(maxId); + long maxId = 0L; + + while (!recordToReaderMap.isEmpty()) { + final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); + final StandardProvenanceEventRecord record = entry.getKey(); + final RecordReader reader = entry.getValue(); + + writer.writeRecord(record, record.getEventId()); + final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); + + indexingAction.index(record, indexWriter, blockIndex); + maxId = record.getEventId(); + + ringBuffer.add(record); + records++; + + // Remove this entry from the map + recordToReaderMap.remove(record); + + // Get the next entry from this reader and add it to the map + StandardProvenanceEventRecord nextRecord = null; + + try { + nextRecord = reader.nextRecord(); + } catch (final EOFException eof) { + } + + if (nextRecord != null) { + recordToReaderMap.put(nextRecord, reader); + } + } + + indexWriter.commit(); + indexConfig.setMaxIdIndexed(maxId); } finally { - indexManager.returnIndexWriter(indexingDirectory, indexWriter); + indexManager.returnIndexWriter(indexingDirectory, indexWriter); } } } finally { @@ -1319,7 +1318,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); } - + final File tocFile = TocUtil.getTocFile(journalFile); if (!tocFile.delete() && tocFile.exists()) { logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath()); @@ -1374,7 +1373,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public QuerySubmission submitQuery(final Query query) { final int 
numQueries = querySubmissionMap.size(); if (numQueries > MAX_UNDELETED_QUERY_RESULTS) { - throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); + throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not " + + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); } if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { @@ -1416,7 +1416,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final AtomicInteger retrievalCount = new AtomicInteger(0); final List<File> indexDirectories = indexConfig.getIndexDirectories( query.getStartDate() == null ? null : query.getStartDate().getTime(), - query.getEndDate() == null ? null : query.getEndDate().getTime()); + query.getEndDate() == null ? null : query.getEndDate().getTime()); final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size()); querySubmissionMap.put(query.getIdentifier(), result); @@ -1432,11 +1432,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } /** - * REMOVE-ME: This is for testing only and can be removed. + * This is for testing only and not actually used other than in debugging * - * @param luceneQuery - * @return - * @throws IOException + * @param luceneQuery the lucene query to execute + * @return an Iterator of ProvenanceEventRecord that match the query + * @throws IOException if unable to perform the query */ public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException { final List<File> indexFiles = indexConfig.getIndexDirectories(); @@ -1601,7 +1601,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); } - private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, final Long endTimestamp) throws IOException { + private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, + final Long endTimestamp) throws IOException { final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp); final StandardLineageResult result = submission.getResult(); while (!result.isFinished()) { @@ -1623,7 +1624,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); } - private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) { + private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, + final Long eventId, final long startTimestamp, final long endTimestamp) { final List<File> 
indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp); final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size()); lineageSubmissionMap.put(result.getLineageIdentifier(), result); @@ -1647,16 +1649,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } switch (event.getEventType()) { - case CLONE: - case FORK: - case JOIN: - case REPLAY: - return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); - default: - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); - lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); - return submission; + case CLONE: + case FORK: + case JOIN: + case REPLAY: + return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); + default: + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); + return submission; } } catch (final IOException ioe) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); @@ -1684,17 +1686,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } switch (event.getEventType()) { - case JOIN: - case FORK: - case CLONE: - case REPLAY: - return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime()); - default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); - lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); - return submission; - } + case JOIN: + case FORK: + case CLONE: + case REPLAY: + return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime()); + default: { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); + return submission; + } } } catch (final IOException ioe) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); 
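The thread running through the PersistentProvenanceRepository hunks above is that indexing is no longer driven by the removed repository-level indexingAction field; instead, mergeJournals() builds a fresh IndexingAction and indexes each event inline as the journals are merged into a single event file. A minimal sketch of that merge-time flow, assembled from the code in this diff (error handling and the sorted record-to-reader map are elided, so treat it as an illustration rather than the full method body):

    // Sketch of the merge-time indexing loop in mergeJournals() -- simplified.
    final IndexingAction indexingAction = new IndexingAction(this); // created per merge; no IndexConfiguration argument anymore
    final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
    final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
    try {
        long maxId = 0L;
        while (!recordToReaderMap.isEmpty()) {
            // take the record with the lowest event ID across all journal readers
            final StandardProvenanceEventRecord record = recordToReaderMap.entrySet().iterator().next().getKey();
            writer.writeRecord(record, record.getEventId());                     // append to the merged event file
            final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); // TOC block the record landed in
            indexingAction.index(record, indexWriter, blockIndex);               // index the event against that block
            maxId = record.getEventId();
            // advance the reader that supplied this record and re-queue its next record (elided)
        }
        indexWriter.commit();
        indexConfig.setMaxIdIndexed(maxId);
    } finally {
        indexManager.returnIndexWriter(indexingDirectory, indexWriter);          // always hand the shared writer back
    }

The borrow/return pairing around IndexManager appears to be the point of the cleanup: rollover threads share one Lucene IndexWriter per index directory rather than each opening its own, and the finally block guarantees the writer is returned even when a merge fails.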
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index 3951591..d0d147c 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -34,7 +34,7 @@ public class RepositoryConfiguration { private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int journalCount = 16; private int compressionBlockBytes = 1024 * 1024; - + private List<SearchableField> searchableFields = new ArrayList<>(); private List<SearchableField> searchableAttributes = new ArrayList<>(); private boolean compress = true; @@ -50,19 +50,19 @@ public class RepositoryConfiguration { return allowRollover; } - + public int getCompressionBlockBytes() { - return compressionBlockBytes; - } + return compressionBlockBytes; + } - public void setCompressionBlockBytes(int compressionBlockBytes) { - this.compressionBlockBytes = compressionBlockBytes; - } + public void setCompressionBlockBytes(int compressionBlockBytes) { + this.compressionBlockBytes = compressionBlockBytes; + } - /** + /** * Specifies where the repository will store data * - * @return + * @return the directories where provenance files will be stored */ public List<File> getStorageDirectories() { return Collections.unmodifiableList(storageDirectories); @@ -71,18 +71,15 @@ public class RepositoryConfiguration { /** * Specifies where the repository should store data * - * @param storageDirectory + * @param storageDirectory the directory to store provenance files */ public void addStorageDirectory(final File storageDirectory) { this.storageDirectories.add(storageDirectory); } /** - * Returns the minimum amount of time that a given record will stay in the - * repository - * - * @param timeUnit - * @return + * @param timeUnit the desired time unit + * @return the max amount of time that a given record will stay in the repository */ public long getMaxRecordLife(final TimeUnit timeUnit) { return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS); @@ -91,8 +88,8 @@ public class RepositoryConfiguration { /** * Specifies how long a record should stay in the repository * - * @param maxRecordLife - * @param timeUnit + * @param maxRecordLife the max amount of time to keep a record in the repo + * @param timeUnit the period of time used by maxRecordLife */ public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) { this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit); @@ -101,7 +98,7 @@ public class RepositoryConfiguration { /** * Returns the maximum amount of data to store in the repository (in bytes) * - * @return + * @return the maximum amount of disk space to use for the prov repo */ public long getMaxStorageCapacity() { return 
storageCapacity; @@ -109,107 +106,91 @@ public class RepositoryConfiguration { /** * Sets the maximum amount of data to store in the repository (in bytes) - * @param maxStorageCapacity + * + * @param maxStorageCapacity the maximum amount of disk space to use for the prov repo */ public void setMaxStorageCapacity(final long maxStorageCapacity) { this.storageCapacity = maxStorageCapacity; } /** - * Returns the maximum amount of time to write to a single event file - * - * @param timeUnit - * @return + * @param timeUnit the desired time unit for the returned value + * @return the maximum amount of time that the repo will write to a single event file */ public long getMaxEventFileLife(final TimeUnit timeUnit) { return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS); } /** - * Sets the maximum amount of time to write to a single event file - * - * @param maxEventFileTime - * @param timeUnit + * @param maxEventFileTime the max amount of time to write to a single event file + * @param timeUnit the units for the value supplied by maxEventFileTime */ public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) { this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit); } /** - * Returns the maximum number of bytes (pre-compression) that will be + * @return the maximum number of bytes (pre-compression) that will be * written to a single event file before the file is rolled over - * - * @return */ public long getMaxEventFileCapacity() { return eventFileBytes; } /** - * Sets the maximum number of bytes (pre-compression) that will be written + * @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written * to a single event file before the file is rolled over - * - * @param maxEventFileBytes */ public void setMaxEventFileCapacity(final long maxEventFileBytes) { this.eventFileBytes = maxEventFileBytes; } /** - * Returns the fields that can be indexed - * - * @return + * @return the fields that should be indexed */ public List<SearchableField> getSearchableFields() { return Collections.unmodifiableList(searchableFields); } /** - * Sets the fields to index - * - * @param searchableFields + * @param searchableFields the fields to index */ public void setSearchableFields(final List<SearchableField> searchableFields) { this.searchableFields = new ArrayList<>(searchableFields); } /** - * Returns the FlowFile attributes that can be indexed - * - * @return + * @return the FlowFile attributes that should be indexed */ public List<SearchableField> getSearchableAttributes() { return Collections.unmodifiableList(searchableAttributes); } /** - * Sets the FlowFile attributes to index - * - * @param searchableAttributes + * @param searchableAttributes the FlowFile attributes to index */ public void setSearchableAttributes(final List<SearchableField> searchableAttributes) { this.searchableAttributes = new ArrayList<>(searchableAttributes); } /** - * Indicates whether or not event files will be compressed when they are + * @return whether or not event files will be compressed when they are * rolled over - * - * @return */ public boolean isCompressOnRollover() { return compress; } /** - * Specifies whether or not to compress event files on rollover - * - * @param compress + * @param compress if true, the data will be compressed when rolled over */ public void setCompressOnRollover(final boolean compress) { this.compress = compress; } + /** + * @return the number of threads to use to query the repo + */ public int 
getQueryThreadPoolSize() { return queryThreadPoolSize; } @@ -246,27 +227,23 @@ public class RepositoryConfiguration { * </li> * </ol> * - * @param bytes + * @param bytes the number of bytes to write to an index before beginning a new shard */ public void setDesiredIndexSize(final long bytes) { this.desiredIndexBytes = bytes; } /** - * Returns the desired size of each index shard. See the - * {@Link #setDesiredIndexSize} method for an explanation of why we choose + * @return the desired size of each index shard. See the + * {@link #setDesiredIndexSize} method for an explanation of why we choose * to shard the index. - * - * @return */ public long getDesiredIndexSize() { return desiredIndexBytes; } /** - * Sets the number of Journal files to use when persisting records. - * - * @param numJournals + * @param numJournals the number of Journal files to use when persisting records. */ public void setJournalCount(final int numJournals) { if (numJournals < 1) { @@ -277,19 +254,14 @@ public class RepositoryConfiguration { } /** - * Returns the number of Journal files that will be used when persisting - * records. - * - * @return + * @return the number of Journal files that will be used when persisting records. */ public int getJournalCount() { return journalCount; } /** - * Specifies whether or not the Repository should sync all updates to disk. - * - * @return + * @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk */ public boolean isAlwaysSync() { return alwaysSync; } @@ -301,7 +273,7 @@ public class RepositoryConfiguration { * persisted across restarts, even if there is a power failure or a sudden * Operating System crash, but it can be very expensive. * - * @param alwaysSync + * @param alwaysSync whether or not to perform an 'fsync' for all updates to disk */ public void setAlwaysSync(boolean alwaysSync) { this.alwaysSync = alwaysSync; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java index 9bbf195..ca0d5ed 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java @@ -39,40 +39,40 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StandardRecordReader implements RecordReader { - private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class); - - private final ByteCountingInputStream rawInputStream; + private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class); + + private final ByteCountingInputStream rawInputStream; private final String filename; private final int serializationVersion; private final boolean compressed; private final TocReader tocReader; private final int headerLength; - + private DataInputStream dis; private 
ByteCountingInputStream byteCountingIn; public StandardRecordReader(final InputStream in, final String filename) throws IOException { - this(in, filename, null); + this(in, filename, null); } - + public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException { - logger.trace("Creating RecordReader for {}", filename); - - rawInputStream = new ByteCountingInputStream(in); + logger.trace("Creating RecordReader for {}", filename); + + rawInputStream = new ByteCountingInputStream(in); final InputStream limitedStream; if ( tocReader == null ) { - limitedStream = rawInputStream; + limitedStream = rawInputStream; } else { - final long offset1 = tocReader.getBlockOffset(1); - if ( offset1 < 0 ) { - limitedStream = rawInputStream; - } else { - limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed()); - } - } - - final InputStream readableStream; + final long offset1 = tocReader.getBlockOffset(1); + if ( offset1 < 0 ) { + limitedStream = rawInputStream; + } else { + limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed()); + } + } + + final InputStream readableStream; if (filename.endsWith(".gz")) { readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream)); compressed = true; @@ -83,11 +83,11 @@ public class StandardRecordReader implements RecordReader { byteCountingIn = new ByteCountingInputStream(readableStream); dis = new DataInputStream(byteCountingIn); - + final String repoClassName = dis.readUTF(); final int serializationVersion = dis.readInt(); - headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer. - + headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer. 
+ if (serializationVersion < 1 || serializationVersion > 8) { throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8"); } @@ -99,52 +99,52 @@ public class StandardRecordReader implements RecordReader { @Override public void skipToBlock(final int blockIndex) throws IOException { - if ( tocReader == null ) { - throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log"); - } - - if ( blockIndex < 0 ) { - throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative"); - } - - if ( blockIndex == getBlockIndex() ) { - return; - } - - final long offset = tocReader.getBlockOffset(blockIndex); - if ( offset < 0 ) { - throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename); - } - - final long curOffset = rawInputStream.getBytesConsumed(); - - final long bytesToSkip = offset - curOffset; - if ( bytesToSkip >= 0 ) { - try { - StreamUtils.skip(rawInputStream, bytesToSkip); - logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip); - } catch (final IOException e) { - throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e); - } - - resetStreamForNextBlock(); - } + if ( tocReader == null ) { + throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log"); + } + + if ( blockIndex < 0 ) { + throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative"); + } + + if ( blockIndex == getBlockIndex() ) { + return; + } + + final long offset = tocReader.getBlockOffset(blockIndex); + if ( offset < 0 ) { + throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename); + } + + final long curOffset = rawInputStream.getBytesConsumed(); + + final long bytesToSkip = offset - curOffset; + if ( bytesToSkip >= 0 ) { + try { + StreamUtils.skip(rawInputStream, bytesToSkip); + logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip); + } catch (final IOException e) { + throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e); + } + + resetStreamForNextBlock(); + } } - + private void resetStreamForNextBlock() throws IOException { - final InputStream limitedStream; + final InputStream limitedStream; if ( tocReader == null ) { - limitedStream = rawInputStream; + limitedStream = rawInputStream; } else { - final long offset = tocReader.getBlockOffset(1 + getBlockIndex()); - if ( offset < 0 ) { - limitedStream = rawInputStream; - } else { - limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed()); - } - } - - final InputStream readableStream; + final long offset = tocReader.getBlockOffset(1 + getBlockIndex()); + if ( offset < 0 ) { + limitedStream = rawInputStream; + } else { + limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed()); + } + } + + final InputStream readableStream; if (compressed) { readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream)); } else { @@ -154,32 +154,32 @@ public class StandardRecordReader implements 
RecordReader { byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed()); dis = new DataInputStream(byteCountingIn); } - - + + @Override public TocReader getTocReader() { - return tocReader; + return tocReader; } - + @Override public boolean isBlockIndexAvailable() { - return tocReader != null; + return tocReader != null; } - + @Override public int getBlockIndex() { - if ( tocReader == null ) { - throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename); - } - - return tocReader.getBlockIndex(rawInputStream.getBytesConsumed()); + if ( tocReader == null ) { + throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename); + } + + return tocReader.getBlockIndex(rawInputStream.getBytesConsumed()); } - + @Override public long getBytesConsumed() { - return byteCountingIn.getBytesConsumed(); + return byteCountingIn.getBytesConsumed(); } - + private StandardProvenanceEventRecord readPreVersion6Record() throws IOException { final long startOffset = byteCountingIn.getBytesConsumed(); @@ -374,17 +374,17 @@ public class StandardRecordReader implements RecordReader { } private String readUUID(final DataInputStream in) throws IOException { - if ( serializationVersion < 8 ) { - final long msb = in.readLong(); - final long lsb = in.readLong(); - return new UUID(msb, lsb).toString(); - } else { - // before version 8, we serialized UUID's as two longs in order to - // write less data. However, in version 8 we changed to just writing - // out the string because it's extremely expensive to call UUID.fromString. - // In the end, since we generally compress, the savings in minimal anyway. - return in.readUTF(); - } + if ( serializationVersion < 8 ) { + final long msb = in.readLong(); + final long lsb = in.readLong(); + return new UUID(msb, lsb).toString(); + } else { + // before version 8, we serialized UUIDs as two longs in order to + // write less data. However, in version 8 we changed to just writing + // out the string because it's extremely expensive to call UUID.fromString. + // In the end, since we generally compress, the savings is minimal anyway. + return in.readUTF(); + } } private String readNullableString(final DataInputStream in) throws IOException { @@ -416,53 +416,53 @@ public class StandardRecordReader implements RecordReader { byteCountingIn.mark(1); int nextByte = byteCountingIn.read(); byteCountingIn.reset(); - + if ( nextByte < 0 ) { - try { - resetStreamForNextBlock(); - } catch (final EOFException eof) { - return false; - } - + try { + resetStreamForNextBlock(); + } catch (final EOFException eof) { + return false; + } + byteCountingIn.mark(1); nextByte = byteCountingIn.read(); byteCountingIn.reset(); } - + return (nextByte >= 0); } - + @Override public long getMaxEventId() throws IOException { - if ( tocReader != null ) { - final long lastBlockOffset = tocReader.getLastBlockOffset(); - skipToBlock(tocReader.getBlockIndex(lastBlockOffset)); - } - - ProvenanceEventRecord record; - ProvenanceEventRecord lastRecord = null; - try { - while ((record = nextRecord()) != null) { - lastRecord = record; - } - } catch (final EOFException eof) { - // This can happen if we stop NIFi while the record is being written. - // This is OK, we just ignore this record. The session will not have been - // committed, so we can just process the FlowFile again. - } - - return (lastRecord == null) ?
-1L : lastRecord.getEventId(); + if ( tocReader != null ) { + final long lastBlockOffset = tocReader.getLastBlockOffset(); + skipToBlock(tocReader.getBlockIndex(lastBlockOffset)); + } + + ProvenanceEventRecord record; + ProvenanceEventRecord lastRecord = null; + try { + while ((record = nextRecord()) != null) { + lastRecord = record; + } + } catch (final EOFException eof) { + // This can happen if we stop NiFi while the record is being written. + // This is OK, we just ignore this record. The session will not have been + // committed, so we can just process the FlowFile again. + } + + return (lastRecord == null) ? -1L : lastRecord.getEventId(); } @Override public void close() throws IOException { - logger.trace("Closing Record Reader for {}", filename); - + logger.trace("Closing Record Reader for {}", filename); + dis.close(); rawInputStream.close(); - + if ( tocReader != null ) { - tocReader.close(); + tocReader.close(); } } @@ -473,9 +473,9 @@ public class StandardRecordReader implements RecordReader { @Override public void skipTo(final long position) throws IOException { - // we are subtracting headerLength from the number of bytes consumed because we used to - // consider the offset of the first record "0" - now we consider it whatever position it - // it really is in the stream. + // we are subtracting headerLength from the number of bytes consumed because we used to + // consider the offset of the first record "0" - now we consider it whatever position + // it really is in the stream. final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength; if (currentPosition == position) { return; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index dbb2c48..3095f13 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -36,15 +36,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StandardRecordWriter implements RecordWriter { - private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); - + private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); + private final File file; private final FileOutputStream fos; private final ByteCountingOutputStream rawOutStream; private final TocWriter tocWriter; private final boolean compressed; private final int uncompressedBlockSize; - + private DataOutputStream out; private ByteCountingOutputStream byteCountingOut; private long lastBlockOffset = 0L; @@ -52,21 +52,21 @@ public class StandardRecordWriter implements RecordWriter { private final Lock lock = new ReentrantLock(); - + public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int
uncompressedBlockSize) throws IOException { - logger.trace("Creating Record Writer for {}", file.getName()); - + logger.trace("Creating Record Writer for {}", file.getName()); + this.file = file; this.compressed = compressed; this.fos = new FileOutputStream(file); rawOutStream = new ByteCountingOutputStream(fos); this.uncompressedBlockSize = uncompressedBlockSize; - + this.tocWriter = writer; } static void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - out.writeUTF(uuid); + out.writeUTF(uuid); } static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { @@ -85,49 +85,49 @@ public class StandardRecordWriter implements RecordWriter { return file; } - @Override + @Override public synchronized void writeHeader() throws IOException { lastBlockOffset = rawOutStream.getBytesWritten(); resetWriteStream(); - + out.writeUTF(PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } - + private void resetWriteStream() throws IOException { - if ( out != null ) { - out.flush(); - } - - final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten(); - - final OutputStream writableStream; - if ( compressed ) { - // because of the way that GZIPOutputStream works, we need to call close() on it in order for it - // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap - // the underlying OutputStream in a NonCloseableOutputStream - if ( out != null ) { - out.close(); - } - - if ( tocWriter != null ) { - tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); - } - - writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); - } else { - if ( tocWriter != null ) { - tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); - } - - writableStream = new BufferedOutputStream(rawOutStream, 65536); - } - + if ( out != null ) { + out.flush(); + } + + final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten(); + + final OutputStream writableStream; + if ( compressed ) { + // because of the way that GZIPOutputStream works, we need to call close() on it in order for it + // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap + // the underlying OutputStream in a NonCloseableOutputStream + if ( out != null ) { + out.close(); + } + + if ( tocWriter != null ) { + tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + } + + writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); + } else { + if ( tocWriter != null ) { + tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + } + + writableStream = new BufferedOutputStream(rawOutStream, 65536); + } + this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset); this.out = new DataOutputStream(byteCountingOut); } - + @Override public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException { @@ -136,16 +136,16 @@ public class StandardRecordWriter implements RecordWriter { // add a new block to the TOC if needed. 
if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) { - lastBlockOffset = startBytes; - - if ( compressed ) { - // because of the way that GZIPOutputStream works, we need to call close() on it in order for it - // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap - // the underlying OutputStream in a NonCloseableOutputStream - resetWriteStream(); - } + lastBlockOffset = startBytes; + + if ( compressed ) { + // because of the way that GZIPOutputStream works, we need to call close() on it in order for it + // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap + // the underlying OutputStream in a NonCloseableOutputStream + resetWriteStream(); + } } - + out.writeLong(recordIdentifier); out.writeUTF(record.getEventType().name()); out.writeLong(record.getEventTime()); @@ -175,7 +175,7 @@ public class StandardRecordWriter implements RecordWriter { writeLongNullableString(out, entry.getValue()); } - // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. + // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) { out.writeBoolean(true); out.writeUTF(record.getContentClaimContainer()); @@ -261,24 +261,24 @@ public class StandardRecordWriter implements RecordWriter { @Override public synchronized void close() throws IOException { - logger.trace("Closing Record Writer for {}", file.getName()); - + logger.trace("Closing Record Writer for {}", file.getName()); + lock(); try { - try { - out.flush(); - out.close(); - } finally { - rawOutStream.close(); - - if ( tocWriter != null ) { - tocWriter.close(); - } - } + try { + out.flush(); + out.close(); + } finally { + rawOutStream.close(); + + if ( tocWriter != null ) { + tocWriter.close(); + } + } } finally { unlock(); } - + } @Override @@ -308,14 +308,14 @@ public class StandardRecordWriter implements RecordWriter { @Override public void sync() throws IOException { - if ( tocWriter != null ) { - tocWriter.sync(); - } - fos.getFD().sync(); + if ( tocWriter != null ) { + tocWriter.sync(); + } + fos.getFD().sync(); } - + @Override public TocWriter getTocWriter() { - return tocWriter; + return tocWriter; } }
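----------------------------------------------------------------------
A note on the compression pattern in StandardRecordWriter above: each TOC block is written as a self-contained GZIP member, so resetWriteStream() must close the GZIPOutputStream to force out its trailer bytes while keeping the underlying file stream open; that is what the NonCloseableOutputStream wrapper is for, and the byte offset of each new member is handed to the TocWriter. The sketch below illustrates the same idea in isolation. It is a minimal approximation, not the repository's code: the NonCloseableOutputStream here is a local stand-in rather than NiFi's utility class, and BlockCompressionSketch and its variable names are invented for the example.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BlockCompressionSketch {

    // close() only flushes, so GZIPOutputStream.close() can write its trailer
    // without closing the shared underlying stream -- the role the
    // NonCloseableOutputStream wrapper plays in StandardRecordWriter.
    static class NonCloseableOutputStream extends FilterOutputStream {
        NonCloseableOutputStream(final OutputStream out) {
            super(out);
        }

        @Override
        public void close() throws IOException {
            out.flush();
        }
    }

    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream file = new ByteArrayOutputStream();

        // Each iteration writes one self-contained GZIP member. The repository
        // records the starting byte offset of each member in its Table of
        // Contents (tocWriter.addBlockOffset(...)) so a reader can seek to it.
        for (final String block : new String[] {"record one\n", "record two\n"}) {
            System.out.println("block starts at offset " + file.size());
            final OutputStream gzip = new GZIPOutputStream(new NonCloseableOutputStream(file));
            gzip.write(block.getBytes(StandardCharsets.UTF_8));
            gzip.close(); // finishes the GZIP member but leaves 'file' open
        }

        // Concatenated GZIP members decompress back to the original data.
        final InputStream in = new GZIPInputStream(new ByteArrayInputStream(file.toByteArray()));
        System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        in.close();
    }
}

Because every block is an independent GZIP member, a reader that knows a block's starting offset from the TOC can seek straight to it and decompress only that block; the final read-back above instead relies on java.util.zip.GZIPInputStream accepting the whole concatenation.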

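----------------------------------------------------------------------
On the reader side, StandardRecordReader.skipToBlock() earlier in this diff pairs with that layout: it looks up the block's byte offset in the TocReader, skips the raw (still-compressed) stream forward to that offset, and then rebuilds the decompressing stream, since a fresh GZIP member starts there. Below is a rough sketch of the same shape, assuming a plain long[] of offsets in place of the TOC; BlockReaderSketch, openBlock, and blockOffsets are hypothetical names, and the real reader also bounds each block with a LimitingInputStream and reuses a single raw stream instead of reopening the file.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class BlockReaderSketch {

    private final String filename;
    private final long[] blockOffsets; // hypothetical stand-in for the Table of Contents

    public BlockReaderSketch(final String filename, final long[] blockOffsets) {
        this.filename = filename;
        this.blockOffsets = blockOffsets;
    }

    // Same shape as StandardRecordReader.skipToBlock(): look up the block's
    // offset, advance the raw (still-compressed) stream to it, then start a
    // fresh GZIPInputStream, because each block is its own GZIP member.
    public InputStream openBlock(final int blockIndex) throws IOException {
        if (blockIndex < 0 || blockIndex >= blockOffsets.length) {
            throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
        }

        final InputStream raw = new FileInputStream(filename);
        try {
            long bytesToSkip = blockOffsets[blockIndex];
            while (bytesToSkip > 0) {
                final long skipped = raw.skip(bytesToSkip);
                if (skipped <= 0) {
                    throw new IOException("Failed to skip to offset " + blockOffsets[blockIndex]
                            + " for block " + blockIndex + " of Provenance Log " + filename);
                }
                bytesToSkip -= skipped;
            }
            return new BufferedInputStream(new GZIPInputStream(raw));
        } catch (final IOException e) {
            raw.close();
            throw e;
        }
    }
}

For example, new BlockReaderSketch("prov-1.log.gz", offsets).openBlock(2) would return a stream positioned at the first record of block 2, with the filename and offsets array supplied by the caller.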