http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
----------------------------------------------------------------------

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
index 8c266d1..0ffa5e6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
@@ -25,9 +25,9 @@ public interface ExpirationAction {
      * Performs some action against the given File and returns the new File that
      * contains the modified version
      *
-     * @param expiredFile
-     * @return
-     * @throws IOException
+     * @param expiredFile the file that was expired
+     * @return the new file after the file has been renamed, or the expiredFile if the file was not renamed
+     * @throws IOException if there was an IO problem
      */
    File execute(File expiredFile) throws IOException;
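The contract documented above is small enough to show end to end. Below is a minimal sketch of an ExpirationAction implementation, assuming a hypothetical MoveToArchiveAction that relocates expired journal files into an archive directory; the class name and the archive directory are illustrative only and are not part of this commit:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.nifi.provenance.expiration.ExpirationAction;

/**
 * Hypothetical ExpirationAction that moves an expired journal file into an
 * archive directory. Per the contract above, it returns the new File when
 * the file was moved, and throws IOException if the move fails.
 */
public class MoveToArchiveAction implements ExpirationAction {
    private final File archiveDirectory;

    public MoveToArchiveAction(final File archiveDirectory) {
        this.archiveDirectory = archiveDirectory;
    }

    @Override
    public File execute(final File expiredFile) throws IOException {
        final Path target = new File(archiveDirectory, expiredFile.getName()).toPath();
        // Files.move throws IOException on failure, satisfying the @throws contract above
        Files.move(expiredFile.toPath(), target, StandardCopyOption.REPLACE_EXISTING);
        return target.toFile();
    }
}

Per the Javadoc above, execute returns the relocated file on success, so callers can continue tracking the journal under its new path.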
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 7db04aa..70bf36e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -49,9 +49,9 @@
         long numDeleted = 0;
         long maxEventId = -1L;
         try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
-            maxEventId = reader.getMaxEventId();
+            maxEventId = reader.getMaxEventId();
         } catch (final IOException ioe) {
-            logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
+            logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
         }
 
         // remove the records from the index
@@ -68,19 +68,19 @@
                 deleteDir = (docsLeft <= 0);
                 logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
             } finally {
-                indexManager.returnIndexWriter(indexingDirectory, writer);
+                indexManager.returnIndexWriter(indexingDirectory, writer);
             }
 
             // we've confirmed that all documents have been removed. Delete the index directory.
             if (deleteDir) {
-                indexManager.removeIndex(indexingDirectory);
+                indexManager.removeIndex(indexingDirectory);
                 indexConfiguration.removeIndexDirectory(indexingDirectory);
-
+
                 deleteDirectory(indexingDirectory);
                 logger.info("Removed empty index directory {}", indexingDirectory);
             }
         }
-
+
         // Update the minimum index to 1 more than the max Event ID in this file.
         if (maxEventId > -1L) {
             indexConfiguration.setMinIdIndexed(maxEventId + 1L);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 5a77f42..98137fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -45,12 +45,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DocsReader {
-    private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
-
+    private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
     public DocsReader(final List<File> storageDirectories) {
     }
 
-    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
+            final AtomicInteger retrievalCount, final int maxResults) throws IOException {
         if (retrievalCount.get() >= maxResults) {
             return Collections.emptySet();
         }
@@ -73,42 +74,42 @@ public class DocsReader {
         return read(docs, allProvenanceLogFiles);
     }
-
+
     private long getByteOffset(final Document d, final RecordReader reader) {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
         if ( blockField != null ) {
-            final int blockIndex = blockField.numericValue().intValue();
-            final TocReader tocReader = reader.getTocReader();
-            return tocReader.getBlockOffset(blockIndex);
+            final int blockIndex = blockField.numericValue().intValue();
+            final TocReader tocReader = reader.getTocReader();
+            return tocReader.getBlockOffset(blockIndex);
         }
-
-        return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+
+        return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
     }
-
-
+
+
     private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
-        IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-        if ( blockField == null ) {
-            reader.skipTo(getByteOffset(d, reader));
-        } else {
-            reader.skipToBlock(blockField.numericValue().intValue());
-        }
-
+        IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField == null ) {
+            reader.skipTo(getByteOffset(d, reader));
+        } else {
+            reader.skipToBlock(blockField.numericValue().intValue());
+        }
+
         StandardProvenanceEventRecord record;
         while ( (record = reader.nextRecord()) != null) {
-            IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
-            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
-                break;
-            }
+            IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+                break;
+            }
         }
-
+
         if ( record == null ) {
-            throw new IOException("Failed to find Provenance Event " + d);
+            throw new IOException("Failed to find Provenance Event " + d);
         } else {
-            return record;
+            return record;
         }
     }
-
+
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
@@ -119,23 +120,23 @@
         final long start = System.nanoTime();
         int logFileCount = 0;
-
+
         final Set<String> storageFilesToSkip = new HashSet<>();
-
+
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
                 if ( storageFilesToSkip.contains(storageFilename) ) {
-                    continue;
+                    continue;
                 }
-
+
                 try {
                     if (reader != null && storageFilename.equals(lastStorageFilename)) {
-                        matchingRecords.add(getRecord(d, reader));
+                        matchingRecords.add(getRecord(d, reader));
                     } else {
-                        logger.debug("Opening log file {}", storageFilename);
-
-                        logFileCount++;
+                        logger.debug("Opening log file {}", storageFilename);
+
+                        logFileCount++;
                         if (reader != null) {
                             reader.close();
                         }
@@ -143,20 +144,20 @@
                         List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             logger.warn("Could not find Provenance Log File with basename {} in the "
-                                + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+                                    + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
                             storageFilesToSkip.add(storageFilename);
                             continue;
                         }
 
                         if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
-                                storageFilename + " in the Provenance Repository");
+                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+                                    storageFilename + " in the Provenance Repository");
                         }
 
                         for (final File file : potentialFiles) {
                             try {
-                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
-                                matchingRecords.add(getRecord(d, reader));
+                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+                                matchingRecords.add(getRecord(d, reader));
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------

diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3943504..9c3ec31 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@
-41,65 +41,65 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class IndexManager implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(IndexManager.class); - - private final Lock lock = new ReentrantLock(); - private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); - private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>(); - - - public void removeIndex(final File indexDirectory) { - final File absoluteFile = indexDirectory.getAbsoluteFile(); - logger.info("Removing index {}", indexDirectory); - - lock.lock(); - try { - final IndexWriterCount count = writerCounts.remove(absoluteFile); - if ( count != null ) { - try { - count.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } - - for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) { - for ( final ActiveIndexSearcher searcher : searcherList ) { - try { - searcher.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Searcher {} for {} due to {}", - searcher.getSearcher(), absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } - } - } finally { - lock.unlock(); - } - } - - public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.debug("Borrowing index writer for {}", indexingDirectory); - - lock.lock(); - try { - IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount == null ) { - final List<Closeable> closeables = new ArrayList<>(); + private static final Logger logger = LoggerFactory.getLogger(IndexManager.class); + + private final Lock lock = new ReentrantLock(); + private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); + private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>(); + + + public void removeIndex(final File indexDirectory) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.info("Removing index {}", indexDirectory); + + lock.lock(); + try { + final IndexWriterCount count = writerCounts.remove(absoluteFile); + if ( count != null ) { + try { + count.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) { + for ( final ActiveIndexSearcher searcher : searcherList ) { + try { + searcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher {} for {} due to {}", + searcher.getSearcher(), absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } finally { + lock.unlock(); + } + } + + public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Borrowing index writer for {}", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final List<Closeable> closeables = new ArrayList<>(); final Directory directory = FSDirectory.open(indexingDirectory); closeables.add(directory); - + try { - final 
Analyzer analyzer = new StandardAnalyzer(); - closeables.add(analyzer); - + final Analyzer analyzer = new StandardAnalyzer(); + closeables.add(analyzer); + final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); config.setWriteLockTimeout(300000L); @@ -107,361 +107,361 @@ public class IndexManager implements Closeable { writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); logger.debug("Providing new index writer for {}", indexingDirectory); } catch (final IOException ioe) { - for ( final Closeable closeable : closeables ) { - try { - closeable.close(); - } catch (final IOException ioe2) { - ioe.addSuppressed(ioe2); - } - } - - throw ioe; + for ( final Closeable closeable : closeables ) { + try { + closeable.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); + } + } + + throw ioe; } - + writerCounts.put(absoluteFile, writerCount); - } else { - logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); - } - - return writerCount.getWriter(); - } finally { - lock.unlock(); - } - } - - public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); - - lock.lock(); - try { - IndexWriterCount count = writerCounts.remove(absoluteFile); - - try { - if ( count == null ) { - logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " - + "This could potentially lead to a resource leak", writer, indexingDirectory); - writer.close(); - } else if ( count.getCount() <= 1 ) { - // we are finished with this writer. - logger.debug("Closing Index Writer for {}", indexingDirectory); - count.close(); - } else { - // decrement the count. - logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); - writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); - } - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } finally { - lock.unlock(); - } - } - - - public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { - final File absoluteFile = indexDir.getAbsoluteFile(); - logger.debug("Borrowing index searcher for {}", indexDir); - - lock.lock(); - try { - // check if we already have a reader cached. - List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); - if ( currentlyCached == null ) { - currentlyCached = new ArrayList<>(); - activeSearchers.put(absoluteFile, currentlyCached); - } else { - // keep track of any searchers that have been closed so that we can remove them - // from our cache later. 
- final Set<ActiveIndexSearcher> expired = new HashSet<>(); - - try { - for ( final ActiveIndexSearcher searcher : currentlyCached ) { - if ( searcher.isCache() ) { - final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); - if ( refCount <= 0 ) { - // if refCount == 0, then the reader has been closed, so we need to discard the searcher - logger.debug("Reference count for cached Index Searcher for {} is currently {}; " - + "removing cached searcher", absoluteFile, refCount); - expired.add(searcher); - continue; - } - - logger.debug("Providing previously cached index searcher for {}", indexDir); - return searcher.getSearcher(); - } - } - } finally { - // if we have any expired index searchers, we need to close them and remove them - // from the cache so that we don't try to use them again later. - for ( final ActiveIndexSearcher searcher : expired ) { - try { - searcher.close(); - } catch (final Exception e) { - logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); - } - - currentlyCached.remove(searcher); - } - } - } - - IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount == null ) { - final Directory directory = FSDirectory.open(absoluteFile); - logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); - - try { - final DirectoryReader directoryReader = DirectoryReader.open(directory); - final IndexSearcher searcher = new IndexSearcher(directoryReader); - - // we want to cache the searcher that we create, since it's just a reader. - final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); - currentlyCached.add(cached); - - return cached.getSearcher(); - } catch (final IOException e) { - try { - directory.close(); - } catch (final IOException ioe) { - e.addSuppressed(ioe); - } - - throw e; - } - } else { - logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " - + "counter to {}", indexDir, writerCount.getCount() + 1); - - // increment the writer count to ensure that it's kept open. - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); - - // create a new Index Searcher from the writer so that we don't have an issue with trying - // to read from a directory that's locked. If we get the "no segments* file found" with - // Lucene, this indicates that an IndexWriter already has the directory open. - final IndexWriter writer = writerCount.getWriter(); - final DirectoryReader directoryReader = DirectoryReader.open(writer, false); - final IndexSearcher searcher = new IndexSearcher(directoryReader); - - // we don't want to cache this searcher because it's based on a writer, so we want to get - // new values the next time that we search. - final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); - - currentlyCached.add(activeSearcher); - return activeSearcher.getSearcher(); - } - } finally { - lock.unlock(); - } - } - - - public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { - final File absoluteFile = indexDirectory.getAbsoluteFile(); - logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); - - lock.lock(); - try { - // check if we already have a reader cached. 
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); - if ( currentlyCached == null ) { - logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " - + "result in a resource leak", indexDirectory); - return; - } - - final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator(); - while (itr.hasNext()) { - final ActiveIndexSearcher activeSearcher = itr.next(); - if ( activeSearcher.getSearcher().equals(searcher) ) { - if ( activeSearcher.isCache() ) { - // the searcher is cached. Just leave it open. - logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); - return; - } else { - // searcher is not cached. It was created from a writer, and we want - // the newest updates the next time that we get a searcher, so we will - // go ahead and close this one out. - itr.remove(); - - // decrement the writer count because we incremented it when creating the searcher - final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount != null ) { - if ( writerCount.getCount() <= 1 ) { - try { - logger.debug("Index searcher for {} is not cached. Writer count is " - + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); - - writerCount.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } else { - logger.debug("Index searcher for {} is not cached. Writer count is decremented " - + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); - - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), - writerCount.getCount() - 1)); - } - } - - try { - logger.debug("Closing Index Searcher for {}", indexDirectory); - activeSearcher.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } - } - } - } finally { - lock.unlock(); - } - } - - @Override - public void close() throws IOException { - logger.debug("Closing Index Manager"); - - lock.lock(); - try { - IOException ioe = null; - - for ( final IndexWriterCount count : writerCounts.values() ) { - try { - count.close(); - } catch (final IOException e) { - if ( ioe == null ) { - ioe = e; - } else { - ioe.addSuppressed(e); - } - } - } - - for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) { - for (final ActiveIndexSearcher searcher : searcherList) { - try { - searcher.close(); - } catch (final IOException e) { - if ( ioe == null ) { - ioe = e; - } else { - ioe.addSuppressed(e); - } - } - } - } - - if ( ioe != null ) { - throw ioe; - } - } finally { - lock.unlock(); - } - } - - - private static void close(final Closeable... 
closeables) throws IOException { - IOException ioe = null; - for ( final Closeable closeable : closeables ) { - if ( closeable == null ) { - continue; - } - - try { - closeable.close(); - } catch (final IOException e) { - if ( ioe == null ) { - ioe = e; - } else { - ioe.addSuppressed(e); - } - } - } - - if ( ioe != null ) { - throw ioe; - } - } - - - private static class ActiveIndexSearcher implements Closeable { - private final IndexSearcher searcher; - private final DirectoryReader directoryReader; - private final Directory directory; - private final boolean cache; - - public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, - Directory directory, final boolean cache) { - this.searcher = searcher; - this.directoryReader = directoryReader; - this.directory = directory; - this.cache = cache; - } - - public boolean isCache() { - return cache; - } - - public IndexSearcher getSearcher() { - return searcher; - } - - @Override - public void close() throws IOException { - IndexManager.close(directoryReader, directory); - } - } - - - private static class IndexWriterCount implements Closeable { - private final IndexWriter writer; - private final Analyzer analyzer; - private final Directory directory; - private final int count; - - public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { - this.writer = writer; - this.analyzer = analyzer; - this.directory = directory; - this.count = count; - } - - public Analyzer getAnalyzer() { - return analyzer; - } - - public Directory getDirectory() { - return directory; - } - - public IndexWriter getWriter() { - return writer; - } - - public int getCount() { - return count; - } - - @Override - public void close() throws IOException { - IndexManager.close(writer, analyzer, directory); - } - } + } else { + logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + } + + return writerCount.getWriter(); + } finally { + lock.unlock(); + } + } + + public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount count = writerCounts.remove(absoluteFile); + + try { + if ( count == null ) { + logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " + + "This could potentially lead to a resource leak", writer, indexingDirectory); + writer.close(); + } else if ( count.getCount() <= 1 ) { + // we are finished with this writer. + logger.debug("Closing Index Writer for {}", indexingDirectory); + count.close(); + } else { + // decrement the count. 
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); + } + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } finally { + lock.unlock(); + } + } + + + public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { + final File absoluteFile = indexDir.getAbsoluteFile(); + logger.debug("Borrowing index searcher for {}", indexDir); + + lock.lock(); + try { + // check if we already have a reader cached. + List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + currentlyCached = new ArrayList<>(); + activeSearchers.put(absoluteFile, currentlyCached); + } else { + // keep track of any searchers that have been closed so that we can remove them + // from our cache later. + final Set<ActiveIndexSearcher> expired = new HashSet<>(); + + try { + for ( final ActiveIndexSearcher searcher : currentlyCached ) { + if ( searcher.isCache() ) { + final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); + if ( refCount <= 0 ) { + // if refCount == 0, then the reader has been closed, so we need to discard the searcher + logger.debug("Reference count for cached Index Searcher for {} is currently {}; " + + "removing cached searcher", absoluteFile, refCount); + expired.add(searcher); + continue; + } + + logger.debug("Providing previously cached index searcher for {}", indexDir); + return searcher.getSearcher(); + } + } + } finally { + // if we have any expired index searchers, we need to close them and remove them + // from the cache so that we don't try to use them again later. + for ( final ActiveIndexSearcher searcher : expired ) { + try { + searcher.close(); + } catch (final Exception e) { + logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); + } + + currentlyCached.remove(searcher); + } + } + } + + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final Directory directory = FSDirectory.open(absoluteFile); + logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); + + try { + final DirectoryReader directoryReader = DirectoryReader.open(directory); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we want to cache the searcher that we create, since it's just a reader. + final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); + currentlyCached.add(cached); + + return cached.getSearcher(); + } catch (final IOException e) { + try { + directory.close(); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } + + throw e; + } + } else { + logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " + + "counter to {}", indexDir, writerCount.getCount() + 1); + + // increment the writer count to ensure that it's kept open. + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + + // create a new Index Searcher from the writer so that we don't have an issue with trying + // to read from a directory that's locked. 
If we get the "no segments* file found" with + // Lucene, this indicates that an IndexWriter already has the directory open. + final IndexWriter writer = writerCount.getWriter(); + final DirectoryReader directoryReader = DirectoryReader.open(writer, false); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we don't want to cache this searcher because it's based on a writer, so we want to get + // new values the next time that we search. + final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); + + currentlyCached.add(activeSearcher); + return activeSearcher.getSearcher(); + } + } finally { + lock.unlock(); + } + } + + + public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); + + lock.lock(); + try { + // check if we already have a reader cached. + List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " + + "result in a resource leak", indexDirectory); + return; + } + + final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator(); + while (itr.hasNext()) { + final ActiveIndexSearcher activeSearcher = itr.next(); + if ( activeSearcher.getSearcher().equals(searcher) ) { + if ( activeSearcher.isCache() ) { + // the searcher is cached. Just leave it open. + logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); + return; + } else { + // searcher is not cached. It was created from a writer, and we want + // the newest updates the next time that we get a searcher, so we will + // go ahead and close this one out. + itr.remove(); + + // decrement the writer count because we incremented it when creating the searcher + final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount != null ) { + if ( writerCount.getCount() <= 1 ) { + try { + logger.debug("Index searcher for {} is not cached. Writer count is " + + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); + + writerCount.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } else { + logger.debug("Index searcher for {} is not cached. 
Writer count is decremented " + + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); + + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), + writerCount.getCount() - 1)); + } + } + + try { + logger.debug("Closing Index Searcher for {}", indexDirectory); + activeSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + logger.debug("Closing Index Manager"); + + lock.lock(); + try { + IOException ioe = null; + + for ( final IndexWriterCount count : writerCounts.values() ) { + try { + count.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) { + for (final ActiveIndexSearcher searcher : searcherList) { + try { + searcher.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } finally { + lock.unlock(); + } + } + + + private static void close(final Closeable... closeables) throws IOException { + IOException ioe = null; + for ( final Closeable closeable : closeables ) { + if ( closeable == null ) { + continue; + } + + try { + closeable.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } + + + private static class ActiveIndexSearcher implements Closeable { + private final IndexSearcher searcher; + private final DirectoryReader directoryReader; + private final Directory directory; + private final boolean cache; + + public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, + Directory directory, final boolean cache) { + this.searcher = searcher; + this.directoryReader = directoryReader; + this.directory = directory; + this.cache = cache; + } + + public boolean isCache() { + return cache; + } + + public IndexSearcher getSearcher() { + return searcher; + } + + @Override + public void close() throws IOException { + IndexManager.close(directoryReader, directory); + } + } + + + private static class IndexWriterCount implements Closeable { + private final IndexWriter writer; + private final Analyzer analyzer; + private final Directory directory; + private final int count; + + public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { + this.writer = writer; + this.analyzer = analyzer; + this.directory = directory; + this.count = count; + } + + public Analyzer getAnalyzer() { + return analyzer; + } + + public Directory getDirectory() { + return directory; + } + + public IndexWriter getWriter() { + return writer; + } + + public int getCount() { + return count; + } + + @Override + public void close() throws IOException { + IndexManager.close(writer, analyzer, directory); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java ---------------------------------------------------------------------- diff 
--git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index dcb6e08..53869f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IndexSearch {
-    private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
+    private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
     private final IndexManager indexManager;
@@ -65,17 +65,17 @@
         final long start = System.nanoTime();
         IndexSearcher searcher = null;
         try {
-            searcher = indexManager.borrowIndexSearcher(indexDirectory);
+            searcher = indexManager.borrowIndexSearcher(indexDirectory);
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
-
+
             final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
-
-            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
-                TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
-
+
+            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+                    TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+
             if (topDocs.totalHits == 0) {
                 sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
                 return sqr;
@@ -83,31 +83,31 @@
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
             matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+
             final long readRecordsNanos = System.nanoTime() - finishSearch;
             logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
-
+
             sqr.update(matchingRecords, topDocs.totalHits);
             return sqr;
         } catch (final FileNotFoundException e) {
             // nothing has been indexed yet, or the data has already aged off
-            logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
-            if ( logger.isDebugEnabled() ) {
-                logger.warn("", e);
-            }
-
+            logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+            if ( logger.isDebugEnabled() ) {
+                logger.warn("", e);
+            }
+
             sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
             return sqr;
         } finally {
-            if ( searcher != null ) {
-                indexManager.returnIndexSearcher(indexDirectory, searcher);
-            }
+            if ( searcher != null ) {
+                indexManager.returnIndexSearcher(indexDirectory, searcher);
+            }
         }
     }
-
+
     @Override
     public String toString() {
-        return "IndexSearcher[" + indexDirectory + "]";
+        return "IndexSearcher[" + indexDirectory + "]";
     }
 }
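Taken together, IndexManager and IndexSearch above implement a borrow/return protocol: IndexSearch borrows a searcher, runs the Lucene query, and returns the searcher in a finally block, while IndexManager reference-counts the IndexWriter for each index directory so the writer is closed only when its last borrower returns it. The sketch below reduces that reference-counting discipline to plain Java, independent of Lucene; all names here (RefCountingManager, Factory, giveBack) are illustrative and not from the commit:

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal reference-counted pool, mirroring the role of IndexManager's writerCounts map.
public class RefCountingManager<T extends Closeable> {
    private final Lock lock = new ReentrantLock();
    private final Map<File, Counted<T>> open = new HashMap<>();

    public interface Factory<T> {
        T create(File dir) throws IOException;
    }

    public T borrow(final File dir, final Factory<T> factory) throws IOException {
        lock.lock();
        try {
            Counted<T> counted = open.get(dir);
            if (counted == null) {
                counted = new Counted<>(factory.create(dir));   // first borrower opens the resource
                open.put(dir, counted);
            }
            counted.count++;                                    // one more outstanding borrower
            return counted.resource;
        } finally {
            lock.unlock();
        }
    }

    public void giveBack(final File dir) throws IOException {
        lock.lock();
        try {
            final Counted<T> counted = open.get(dir);
            if (counted != null && --counted.count <= 0) {
                open.remove(dir);
                counted.resource.close();                       // last borrower closes it
            }
        } finally {
            lock.unlock();
        }
    }

    private static final class Counted<T> {
        final T resource;
        int count;

        Counted(final T resource) {
            this.resource = resource;
        }
    }
}

Callers would follow the same shape as IndexSearch.search(): borrow in a try block and give back in the finally block, so an exception during the search cannot leak the count.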
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java index 5e87913..46be391 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java @@ -16,50 +16,30 @@ */ package org.apache.nifi.provenance.lucene; -import java.io.EOFException; -import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.IntField; import org.apache.lucene.document.LongField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.provenance.IndexConfiguration; import org.apache.nifi.provenance.PersistentProvenanceRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.provenance.rollover.RolloverAction; import org.apache.nifi.provenance.search.SearchableField; -import org.apache.nifi.provenance.serialization.RecordReader; -import org.apache.nifi.provenance.serialization.RecordReaders; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class IndexingAction implements RolloverAction { - - private final PersistentProvenanceRepository repository; +public class IndexingAction { private final Set<SearchableField> nonAttributeSearchableFields; private final Set<SearchableField> attributeSearchableFields; - private final IndexConfiguration indexConfiguration; - private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class); - - public IndexingAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfig) { - repository = repo; - indexConfiguration = indexConfig; + public IndexingAction(final PersistentProvenanceRepository repo) { attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes())); nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields())); } @@ -72,7 +52,7 @@ public class IndexingAction implements RolloverAction { doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store)); } - + public void index(final StandardProvenanceEventRecord record, 
final IndexWriter indexWriter, final Integer blockIndex) throws IOException { final Map<String, String> attributes = record.getAttributes(); @@ -105,14 +85,14 @@ public class IndexingAction implements RolloverAction { doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO)); doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO)); doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES)); - + if ( blockIndex == null ) { - doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES)); + doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES)); } else { - doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES)); - doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES)); + doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES)); + doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES)); } - + for (final String lineageIdentifier : record.getLineageIdentifiers()) { addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); } @@ -150,87 +130,4 @@ public class IndexingAction implements RolloverAction { indexWriter.addDocument(doc); } } - - @Override - public File execute(final File fileRolledOver) throws IOException { - final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver); - int indexCount = 0; - long maxId = -1L; - - try (final Directory directory = FSDirectory.open(indexingDirectory); - final Analyzer analyzer = new StandardAnalyzer()) { - - final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); - config.setWriteLockTimeout(300000L); - - try (final IndexWriter indexWriter = new IndexWriter(directory, config); - final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) { - StandardProvenanceEventRecord record; - while (true) { - final Integer blockIndex; - if ( reader.isBlockIndexAvailable() ) { - blockIndex = reader.getBlockIndex(); - } else { - blockIndex = null; - } - - try { - record = reader.nextRecord(); - } catch (final EOFException eof) { - // system was restarted while writing to the log file. Nothing we can do here, so ignore this record. - // On system restart, the FlowFiles should be back in their "original" queues, so the events will be re-created - // when the data is re-processed - break; - } - - if (record == null) { - break; - } - - maxId = record.getEventId(); - - index(record, indexWriter, blockIndex); - indexCount++; - } - - indexWriter.commit(); - } catch (final EOFException eof) { - // nothing in the file. Move on. - } - } finally { - if (maxId >= -1) { - indexConfiguration.setMaxIdIndexed(maxId); - } - } - - final File newFile = new File(fileRolledOver.getParent(), - LuceneUtil.substringBeforeLast(fileRolledOver.getName(), ".") - + ".indexed." 
- + LuceneUtil.substringAfterLast(fileRolledOver.getName(), ".")); - - boolean renamed = false; - for (int i = 0; i < 10 && !renamed; i++) { - renamed = fileRolledOver.renameTo(newFile); - if (!renamed) { - try { - Thread.sleep(25L); - } catch (final InterruptedException e) { - } - } - } - - if (renamed) { - logger.info("Finished indexing Provenance Log File {} to index {} with {} records indexed and renamed file to {}", - fileRolledOver, indexingDirectory, indexCount, newFile); - return newFile; - } else { - logger.warn("Finished indexing Provenance Log File {} with {} records indexed but failed to rename file to {}; indexed {} records", new Object[]{fileRolledOver, indexCount, newFile, indexCount}); - return fileRolledOver; - } - } - - @Override - public boolean hasBeenPerformed(final File fileRolledOver) { - return fileRolledOver.getName().contains(".indexed."); - } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 54cde15..3f75c00 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -48,7 +48,8 @@ public class LineageQuery { public static final int MAX_LINEAGE_UUIDS = 100; private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); - public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException { + public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, + final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. 
This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); } @@ -99,7 +100,8 @@ public class LineageQuery { final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); final long readDocsEnd = System.nanoTime(); - logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); + logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", + TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); return recs; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java index 59dc10b..c622ea1 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java @@ -78,16 +78,16 @@ public class LuceneUtil { final String searchString = baseName + "."; for (final Path path : allProvenanceLogs) { if (path.toFile().getName().startsWith(searchString)) { - final File file = path.toFile(); - if ( file.exists() ) { - matchingFiles.add(file); - } else { - final File dir = file.getParentFile(); - final File gzFile = new File(dir, file.getName() + ".gz"); - if ( gzFile.exists() ) { - matchingFiles.add(gzFile); - } - } + final File file = path.toFile(); + if ( file.exists() ) { + matchingFiles.add(file); + } else { + final File dir = file.getParentFile(); + final File gzFile = new File(dir, file.getName() + ".gz"); + if ( gzFile.exists() ) { + matchingFiles.add(gzFile); + } + } } } @@ -144,16 +144,16 @@ public class LuceneUtil { final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX); final IndexableField fileOffset2 = o1.getField(FieldNames.BLOCK_INDEX); if ( fileOffset1 != null && fileOffset2 != null ) { - final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue()); - if ( blockIndexResult != 0 ) { - return blockIndexResult; - } - - final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); - final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); - return Long.compare(eventId1, eventId2); + final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue()); + if ( blockIndexResult != 0 ) { + return blockIndexResult; + } + + final long eventId1 = 
o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + return Long.compare(eventId1, eventId2); } - + final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); return Long.compare(offset1, offset2); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java deleted file mode 100644 index d014618..0000000 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.provenance.rollover; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.nifi.stream.io.GZIPOutputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.provenance.lucene.IndexingAction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CompressionAction implements RolloverAction { - - private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class); - - @Override - public File execute(final File fileRolledOver) throws IOException { - final File gzFile = new File(fileRolledOver.getParent(), fileRolledOver.getName() + ".gz"); - try (final FileInputStream in = new FileInputStream(fileRolledOver); - final OutputStream fos = new FileOutputStream(gzFile); - final GZIPOutputStream gzipOut = new GZIPOutputStream(fos, 1)) { - StreamUtils.copy(in, gzipOut); - in.getFD().sync(); - } - - boolean deleted = false; - for (int i = 0; i < 10 && !deleted; i++) { - deleted = fileRolledOver.delete(); - } - - logger.info("Finished compressing Provenance Log File {}", fileRolledOver); - return gzFile; - } - - @Override - public boolean hasBeenPerformed(final File fileRolledOver) { - return fileRolledOver.getName().contains(".gz"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java deleted file mode 100644 index 33401e9..0000000 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.provenance.rollover; - -import java.io.File; -import java.io.IOException; - -public interface RolloverAction { - - /** - * Performs some action against the given File and returns the new File that - * contains the modified version - * - * @param fileRolledOver - * @return - * @throws IOException - */ - File execute(File fileRolledOver) throws IOException; - - boolean hasBeenPerformed(File fileRolledOver); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java index 8bdc88a..91c8222 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java @@ -24,75 +24,80 @@ import org.apache.nifi.provenance.toc.TocReader; public interface RecordReader extends Closeable { - /** - * Returns the next record in the reader, or <code>null</code> if there is no more data available. - * @return - * @throws IOException - */ + /** + * Returns the next record in the reader, or <code>null</code> if there is no more data available. + * @return the next Provenance event in the stream + * @throws IOException if unable to read the next event from the stream + */ StandardProvenanceEventRecord nextRecord() throws IOException; /** * Skips the specified number of bytes - * @param bytesToSkip - * @throws IOException + * @param bytesToSkip the number of bytes to skip ahead + * @throws IOException if unable to skip ahead the specified number of bytes (e.g., the stream does + * not contain this many more bytes) */ void skip(long bytesToSkip) throws IOException; /** * Skips to the specified byte offset in the underlying stream. - * @param position + * @param position the byte offset to skip to * @throws IOException if the underlying stream throws IOException, or if the reader has already * passed the specified byte offset */ void skipTo(long position) throws IOException; - + /** * Skips to the specified compression block - * - * @param blockIndex + * + * @param blockIndex the block index to skip to * @throws IOException if the underlying stream throws IOException, or if the reader has already * read past the specified compression block index * @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it */ void skipToBlock(int blockIndex) throws IOException; - + /** * Returns the block index that the Reader is currently reading from. * Note that the block index is incremented at the beginning of the {@link #nextRecord()} - * method. This means that this method will return the block from which the previous record was read, + * method. 
This means that this method will return the block from which the previous record was read, * if calling {@link #nextRecord()} continually, not the block from which the next record will be read. - * @return + * + * @return the current block index + * @throws IllegalStateException if the reader is reading a provenance event file that does not contain + * a Table of Contents */ int getBlockIndex(); - + /** * Returns <code>true</code> if the compression block index is available. It will be available * if and only if the reader is created with a TableOfContents - * - * @return + * + * @return true if the reader is reading from an event file that has a Table of Contents */ boolean isBlockIndexAvailable(); - + /** * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists, * <code>null</code> otherwise - * @return + * + * @return the TocReader if the underlying event file has a Table of Contents, <code>null</code> otherwise. */ TocReader getTocReader(); - + /** - * Returns the number of bytes that have been consumed from the stream (read or skipped). - * @return + * @return the number of bytes that have been consumed from the stream (read or skipped). */ long getBytesConsumed(); - + /** * Returns the ID of the last event in this record reader, or -1 if the reader has no records or * has already read through all records. Note: This method will consume the stream until the end, * so no more records will be available on this reader after calling this method. - * - * @return - * @throws IOException + * + * @return the ID of the last event in this record reader, or -1 if the reader has no records or + * has already read through all records + * @throws IOException if unable to get the ID of the last event */ long getMaxEventId() throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index dff281c..cab5e6f 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -37,75 +37,75 @@ public class RecordReaders { InputStream fis = null; try { - if (!file.exists()) { - if (provenanceLogFiles != null) { - final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + "."; - for (final Path path : provenanceLogFiles) { - if (path.toFile().getName().startsWith(baseName)) { - file = path.toFile(); - break; - } - } - } - } - - if ( file.exists() ) { - try { - fis = new FileInputStream(file); - } catch (final FileNotFoundException fnfe) { - fis = null; - } - } - - String filename = file.getName(); - openStream: while ( fis == null ) { - final File dir = file.getParentFile(); - final String baseName = LuceneUtil.substringBefore(file.getName(), "."); - - // 
depending on which rollover actions have occurred, we could have 3 possibilities for the - // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz" - // because most often we are compressing on rollover and most often we have already finished - // compressing by the time that we are querying the data. - for ( final String extension : new String[] {".prov.gz", ".prov"} ) { - file = new File(dir, baseName + extension); - if ( file.exists() ) { - try { - fis = new FileInputStream(file); - filename = baseName + extension; - break openStream; - } catch (final FileNotFoundException fnfe) { - // file was modified by a RolloverAction after we verified that it exists but before we could - // create an InputStream for it. Start over. - fis = null; - continue openStream; - } - } - } - - break; - } - - if ( fis == null ) { - throw new FileNotFoundException("Unable to locate file " + originalFile); - } - - final File tocFile = TocUtil.getTocFile(file); - if ( tocFile.exists() ) { - final TocReader tocReader = new StandardTocReader(tocFile); - return new StandardRecordReader(fis, filename, tocReader); - } else { - return new StandardRecordReader(fis, filename); - } + if (!file.exists()) { + if (provenanceLogFiles != null) { + final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + "."; + for (final Path path : provenanceLogFiles) { + if (path.toFile().getName().startsWith(baseName)) { + file = path.toFile(); + break; + } + } + } + } + + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + } catch (final FileNotFoundException fnfe) { + fis = null; + } + } + + String filename = file.getName(); + openStream: while ( fis == null ) { + final File dir = file.getParentFile(); + final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + + // depending on which rollover actions have occurred, we could have two possibilities for the + // filename that we need: ".prov.gz" if the file has already been compressed on rollover (the + // common case, since we usually compress on rollover and have typically finished compressing + // by the time that we are querying the data) or ".prov" if it has not. + for ( final String extension : new String[] {".prov.gz", ".prov"} ) { + file = new File(dir, baseName + extension); + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + filename = baseName + extension; + break openStream; + } catch (final FileNotFoundException fnfe) { + // file was modified by a RolloverAction after we verified that it exists but before we could + // create an InputStream for it. Start over. 
+ fis = null; + continue openStream; + } + } + } + + break; + } + + if ( fis == null ) { + throw new FileNotFoundException("Unable to locate file " + originalFile); + } + + final File tocFile = TocUtil.getTocFile(file); + if ( tocFile.exists() ) { + final TocReader tocReader = new StandardTocReader(tocFile); + return new StandardRecordReader(fis, filename, tocReader); + } else { + return new StandardRecordReader(fis, filename); + } } catch (final IOException ioe) { - if ( fis != null ) { - try { - fis.close(); - } catch (final IOException inner) { - ioe.addSuppressed(inner); - } - } - - throw ioe; + if ( fis != null ) { + try { + fis.close(); + } catch (final IOException inner) { + ioe.addSuppressed(inner); + } + } + + throw ioe; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index 58f4dc2..d89fd6f 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java @@ -28,31 +28,27 @@ public interface RecordWriter extends Closeable { /** * Writes header information to the underlying stream * - * @throws IOException + * @throws IOException if unable to write header information to the underlying stream */ void writeHeader() throws IOException; /** * Writes the given record out to the underlying stream * - * @param record - * @param recordIdentifier + * @param record the record to write + * @param recordIdentifier the new identifier of the record * @return the number of bytes written for the given records - * @throws IOException + * @throws IOException if unable to write the record to the stream */ long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException; /** - * Returns the number of Records that have been written to this RecordWriter - * - * @return + * @return the number of Records that have been written to this RecordWriter */ int getRecordsWritten(); /** - * Returns the file that this RecordWriter is writing to - * - * @return + * @return the file that this RecordWriter is writing to */ File getFile(); @@ -73,19 +69,18 @@ public interface RecordWriter extends Closeable { * not immediately available, returns <code>false</code>; otherwise, obtains * the lock and returns <code>true</code>. * - * @return + * @return <code>true</code> if the lock was obtained, <code>false</code> otherwise. */ boolean tryLock(); /** * Syncs the content written to this writer to disk. 
- * @throws java.io.IOException + * @throws IOException if unable to sync content to disk */ void sync() throws IOException; /** - * Returns the TOC Writer that is being used to write the Table of Contents for this journal - * @return + * @return the TOC Writer that is being used to write the Table of Contents for this journal */ TocWriter getTocWriter(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java index 47b7c7e..cf8f7b4 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java @@ -25,14 +25,14 @@ import org.apache.nifi.provenance.toc.TocUtil; import org.apache.nifi.provenance.toc.TocWriter; public class RecordWriters { - private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB + private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException { - return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE); + return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE); } - + public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException { - final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes); }
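
For reviewers, a minimal writer-side sketch of how the RecordWriters factory above is typically driven. This is illustrative only and not part of the commit: the journal path is a hypothetical assumption, and construction of the event to write is elided.

import java.io.File;

import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriters;

public class WriterSketch {
    public static void main(final String[] args) throws Exception {
        // hypothetical journal location
        final File journal = new File("/tmp/provenance/1.prov");

        // compressed=true, createToc=true: a Table of Contents file is written alongside
        // the journal, using the default 1 MB compression block size shown above
        final RecordWriter writer = RecordWriters.newRecordWriter(journal, true, true);
        try {
            writer.writeHeader();
            // writer.writeRecord(event, 1L); // event construction elided; returns bytes written
            writer.sync(); // flush written content to disk
        } finally {
            writer.close();
        }
    }
}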

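A matching reader-side sketch, assuming the same hypothetical journal. As the RecordReaders diff above shows, the factory resolves the ".prov" versus ".prov.gz" file name and attaches a StandardTocReader when a TOC file exists next to the journal.

import java.io.File;

import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;

public class ReaderSketch {
    public static void main(final String[] args) throws Exception {
        final File journal = new File("/tmp/provenance/1.prov"); // hypothetical path

        // passing null for provenanceLogFiles is permitted; the factory then relies
        // solely on probing the ".prov.gz" and ".prov" extensions
        try (final RecordReader reader = RecordReaders.newRecordReader(journal, null)) {
            if (reader.isBlockIndexAvailable()) {
                reader.skipToBlock(0); // only valid when a Table of Contents is present
            }

            StandardProvenanceEventRecord record;
            while ((record = reader.nextRecord()) != null) {
                System.out.println(record); // nextRecord returns null at end of stream
            }
        }
    }
}

Note that, per the javadoc above, getMaxEventId() consumes the stream to its end, so a reader used to obtain the max event ID cannot subsequently return records.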