http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java new file mode 100644 index 0000000..3943504 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lucene; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndexManager implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(IndexManager.class); + + private final Lock lock = new ReentrantLock(); + private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); + private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>(); + + + public void removeIndex(final File indexDirectory) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.info("Removing index {}", indexDirectory); + + lock.lock(); + try { + final IndexWriterCount count = writerCounts.remove(absoluteFile); + if ( count != null ) { + try { + count.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) { + for ( final ActiveIndexSearcher searcher : searcherList ) { + try { + 
searcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher {} for {} due to {}", + searcher.getSearcher(), absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } finally { + lock.unlock(); + } + } + + public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Borrowing index writer for {}", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final List<Closeable> closeables = new ArrayList<>(); + final Directory directory = FSDirectory.open(indexingDirectory); + closeables.add(directory); + + try { + final Analyzer analyzer = new StandardAnalyzer(); + closeables.add(analyzer); + + final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); + config.setWriteLockTimeout(300000L); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); + logger.debug("Providing new index writer for {}", indexingDirectory); + } catch (final IOException ioe) { + for ( final Closeable closeable : closeables ) { + try { + closeable.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); + } + } + + throw ioe; + } + + writerCounts.put(absoluteFile, writerCount); + } else { + logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + } + + return writerCount.getWriter(); + } finally { + lock.unlock(); + } + } + + public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount count = writerCounts.remove(absoluteFile); + + try { + if ( count == null ) { + logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " + + "This could potentially lead to a resource leak", writer, indexingDirectory); + writer.close(); + } else if ( count.getCount() <= 1 ) { + // we are finished with this writer. + logger.debug("Closing Index Writer for {}", indexingDirectory); + count.close(); + } else { + // decrement the count. + logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); + } + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } finally { + lock.unlock(); + } + } + + + public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { + final File absoluteFile = indexDir.getAbsoluteFile(); + logger.debug("Borrowing index searcher for {}", indexDir); + + lock.lock(); + try { + // check if we already have a reader cached. 
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + currentlyCached = new ArrayList<>(); + activeSearchers.put(absoluteFile, currentlyCached); + } else { + // keep track of any searchers that have been closed so that we can remove them + // from our cache later. + final Set<ActiveIndexSearcher> expired = new HashSet<>(); + + try { + for ( final ActiveIndexSearcher searcher : currentlyCached ) { + if ( searcher.isCache() ) { + final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); + if ( refCount <= 0 ) { + // if refCount == 0, then the reader has been closed, so we need to discard the searcher + logger.debug("Reference count for cached Index Searcher for {} is currently {}; " + + "removing cached searcher", absoluteFile, refCount); + expired.add(searcher); + continue; + } + + logger.debug("Providing previously cached index searcher for {}", indexDir); + return searcher.getSearcher(); + } + } + } finally { + // if we have any expired index searchers, we need to close them and remove them + // from the cache so that we don't try to use them again later. + for ( final ActiveIndexSearcher searcher : expired ) { + try { + searcher.close(); + } catch (final Exception e) { + logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); + } + + currentlyCached.remove(searcher); + } + } + } + + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final Directory directory = FSDirectory.open(absoluteFile); + logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); + + try { + final DirectoryReader directoryReader = DirectoryReader.open(directory); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we want to cache the searcher that we create, since it's just a reader. + final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); + currentlyCached.add(cached); + + return cached.getSearcher(); + } catch (final IOException e) { + try { + directory.close(); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } + + throw e; + } + } else { + logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " + + "counter to {}", indexDir, writerCount.getCount() + 1); + + // increment the writer count to ensure that it's kept open. + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + + // create a new Index Searcher from the writer so that we don't have an issue with trying + // to read from a directory that's locked. If we get the "no segments* file found" with + // Lucene, this indicates that an IndexWriter already has the directory open. + final IndexWriter writer = writerCount.getWriter(); + final DirectoryReader directoryReader = DirectoryReader.open(writer, false); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we don't want to cache this searcher because it's based on a writer, so we want to get + // new values the next time that we search. 
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); + + currentlyCached.add(activeSearcher); + return activeSearcher.getSearcher(); + } + } finally { + lock.unlock(); + } + } + + + public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); + + lock.lock(); + try { + // check if we already have a reader cached. + List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " + + "result in a resource leak", indexDirectory); + return; + } + + final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator(); + while (itr.hasNext()) { + final ActiveIndexSearcher activeSearcher = itr.next(); + if ( activeSearcher.getSearcher().equals(searcher) ) { + if ( activeSearcher.isCache() ) { + // the searcher is cached. Just leave it open. + logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); + return; + } else { + // searcher is not cached. It was created from a writer, and we want + // the newest updates the next time that we get a searcher, so we will + // go ahead and close this one out. + itr.remove(); + + // decrement the writer count because we incremented it when creating the searcher + final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount != null ) { + if ( writerCount.getCount() <= 1 ) { + try { + logger.debug("Index searcher for {} is not cached. Writer count is " + + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); + + writerCount.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } else { + logger.debug("Index searcher for {} is not cached. Writer count is decremented " + + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); + + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), + writerCount.getCount() - 1)); + } + } + + try { + logger.debug("Closing Index Searcher for {}", indexDirectory); + activeSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + logger.debug("Closing Index Manager"); + + lock.lock(); + try { + IOException ioe = null; + + for ( final IndexWriterCount count : writerCounts.values() ) { + try { + count.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) { + for (final ActiveIndexSearcher searcher : searcherList) { + try { + searcher.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } finally { + lock.unlock(); + } + } + + + private static void close(final Closeable... 
closeables) throws IOException { + IOException ioe = null; + for ( final Closeable closeable : closeables ) { + if ( closeable == null ) { + continue; + } + + try { + closeable.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } + + + private static class ActiveIndexSearcher implements Closeable { + private final IndexSearcher searcher; + private final DirectoryReader directoryReader; + private final Directory directory; + private final boolean cache; + + public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, + Directory directory, final boolean cache) { + this.searcher = searcher; + this.directoryReader = directoryReader; + this.directory = directory; + this.cache = cache; + } + + public boolean isCache() { + return cache; + } + + public IndexSearcher getSearcher() { + return searcher; + } + + @Override + public void close() throws IOException { + IndexManager.close(directoryReader, directory); + } + } + + + private static class IndexWriterCount implements Closeable { + private final IndexWriter writer; + private final Analyzer analyzer; + private final Directory directory; + private final int count; + + public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { + this.writer = writer; + this.analyzer = analyzer; + this.directory = directory; + this.count = count; + } + + public Analyzer getAnalyzer() { + return analyzer; + } + + public Directory getDirectory() { + return directory; + } + + public IndexWriter getWriter() { + return writer; + } + + public int getCount() { + return count; + } + + @Override + public void close() throws IOException { + IndexManager.close(writer, analyzer, directory); + } + } + +}
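
The IndexManager above reference-counts a single IndexWriter per index directory and caches IndexSearchers, so its contract is that every borrowIndexWriter/borrowIndexSearcher call must be paired with the matching return call in a finally block; otherwise the count never reaches zero and the underlying Lucene Directory, Analyzer, and reader are never closed. A minimal caller sketch of that pattern (illustrative only; the class name and directory path here are hypothetical, not part of this commit):

import java.io.File;
import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.nifi.provenance.lucene.IndexManager;

public class IndexManagerUsageSketch {
    public static void main(final String[] args) throws IOException {
        final IndexManager manager = new IndexManager();
        final File indexDir = new File("provenance_repository/index-001"); // hypothetical path

        // Borrow a writer: the same IndexWriter instance is shared across
        // borrowers for this directory, and its reference count is incremented.
        final IndexWriter writer = manager.borrowIndexWriter(indexDir);
        try {
            // add documents with the shared writer here
        } finally {
            // Decrements the count; the writer is closed only when it reaches zero.
            manager.returnIndexWriter(indexDir, writer);
        }

        // Borrow a searcher: if a writer is currently open for the directory, the
        // searcher is built from the writer (pinning it open) rather than from the
        // raw directory, so it is not cached and is closed when returned.
        final IndexSearcher searcher = manager.borrowIndexSearcher(indexDir);
        try {
            // run queries against the shared searcher here
        } finally {
            manager.returnIndexSearcher(indexDir, searcher);
        }

        manager.close();
    }
}
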
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index e2854c3..dcb6e08 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -17,31 +17,33 @@ package org.apache.nifi.provenance.lucene; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import java.util.Date; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.StandardQueryResult; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.PersistentProvenanceRepository; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.StandardQueryResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IndexSearch { - + private final Logger logger = LoggerFactory.getLogger(IndexSearch.class); private final PersistentProvenanceRepository repository; private final File indexDirectory; + private final IndexManager indexManager; - public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory) { + public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) { this.repository = repo; this.indexDirectory = indexDirectory; + this.indexManager = indexManager; } public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException { @@ -55,30 +57,57 @@ public class IndexSearch { final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1); final Set<ProvenanceEventRecord> matchingRecords; - try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) { - final IndexSearcher searcher = new IndexSearcher(directoryReader); - - if (provenanceQuery.getEndDate() == null) { - provenanceQuery.setEndDate(new Date()); - } - final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery); + if (provenanceQuery.getEndDate() == null) { + provenanceQuery.setEndDate(new Date()); + } + final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery); - TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final long start = System.nanoTime(); + IndexSearcher searcher = null; + try 
{ + searcher = indexManager.borrowIndexSearcher(indexDirectory); + final long searchStartNanos = System.nanoTime(); + final long openSearcherNanos = searchStartNanos - start; + + final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults()); + final long finishSearch = System.nanoTime(); + final long searchNanos = finishSearch - searchStartNanos; + + logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, + TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos)); + if (topDocs.totalHits == 0) { sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0); return sqr; } final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); - matchingRecords = docsReader.read(topDocs, directoryReader, repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); - + matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); + + final long readRecordsNanos = System.nanoTime() - finishSearch; + logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this); + sqr.update(matchingRecords, topDocs.totalHits); return sqr; - } catch (final IndexNotFoundException e) { - // nothing has been indexed yet. + } catch (final FileNotFoundException e) { + // nothing has been indexed yet, or the data has already aged off + logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0); return sqr; + } finally { + if ( searcher != null ) { + indexManager.returnIndexSearcher(indexDirectory, searcher); + } } } + + @Override + public String toString() { + return "IndexSearcher[" + indexDirectory + "]"; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java index 214267a..5e87913 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java @@ -24,27 +24,27 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.provenance.IndexConfiguration; -import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.ProvenanceEventType; -import org.apache.nifi.provenance.SearchableFields; -import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.provenance.rollover.RolloverAction; -import 
org.apache.nifi.provenance.search.SearchableField; -import org.apache.nifi.provenance.serialization.RecordReader; -import org.apache.nifi.provenance.serialization.RecordReaders; - import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.IntField; import org.apache.lucene.document.LongField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.IndexConfiguration; +import org.apache.nifi.provenance.PersistentProvenanceRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.rollover.RolloverAction; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.serialization.RecordReaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,15 +72,93 @@ public class IndexingAction implements RolloverAction { doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store)); } + + public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException { + final Map<String, String> attributes = record.getAttributes(); + + final Document doc = new Document(); + addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO); + addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO); + addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO); + addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO); + addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO); + addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO); + addField(doc, SearchableFields.Details, record.getDetails(), Store.NO); + addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO); + addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO); + addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO); + addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO); + + if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) { + addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO); + } + + for (final SearchableField searchableField : attributeSearchableFields) { + addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); + } + + final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), "."); + + // Index the fields that we always index (unless there's nothing else to index at all) + if (!doc.getFields().isEmpty()) { + doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO)); + doc.add(new 
LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO)); + doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO)); + doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES)); + + if ( blockIndex == null ) { + doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES)); + } else { + doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES)); + doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES)); + } + + for (final String lineageIdentifier : record.getLineageIdentifiers()) { + addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); + } + + // If the event is a FORK, CLONE, REPLAY, or JOIN, add the FlowFileUUID for all child/parent UUIDs. + if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) { + for (final String uuid : record.getChildUuids()) { + if (!uuid.equals(record.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + } + } + } else if (record.getEventType() == ProvenanceEventType.JOIN) { + for (final String uuid : record.getParentUuids()) { + if (!uuid.equals(record.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + } + } + } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) { + // If we get a RECEIVE event with a Source System FlowFile Identifier, we add another Document that shows the UUID + // that the Source System uses to refer to the data. + final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier(); + final String sourceFlowFileUUID; + final int lastColon = sourceIdentifier.lastIndexOf(":"); + if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) { + sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1); + } else { + sourceFlowFileUUID = null; + } + + if (sourceFlowFileUUID != null) { + addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO); + } + } + + indexWriter.addDocument(doc); + } + } + @Override - @SuppressWarnings("deprecation") public File execute(final File fileRolledOver) throws IOException { final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver); int indexCount = 0; long maxId = -1L; try (final Directory directory = FSDirectory.open(indexingDirectory); - final Analyzer analyzer = new StandardAnalyzer(LuceneUtil.LUCENE_VERSION)) { + final Analyzer analyzer = new StandardAnalyzer()) { final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); config.setWriteLockTimeout(300000L); @@ -89,6 +167,13 @@ public class IndexingAction implements RolloverAction { final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) { StandardProvenanceEventRecord record; while (true) { + final Integer blockIndex; + if ( reader.isBlockIndexAvailable() ) { + blockIndex = reader.getBlockIndex(); + } else { + blockIndex = null; + } + try { record = reader.nextRecord(); } catch (final EOFException eof) { @@ -104,76 +189,8 @@ public class IndexingAction implements RolloverAction { maxId = record.getEventId(); - final Map<String, String> attributes = record.getAttributes(); - - final Document doc = new Document(); - addField(doc, SearchableFields.FlowFileUUID,
record.getFlowFileUuid(), Store.NO); - addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO); - addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO); - addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO); - addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO); - addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO); - addField(doc, SearchableFields.Details, record.getDetails(), Store.NO); - addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO); - addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO); - addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO); - addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO); - - if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) { - addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO); - } - - for (final SearchableField searchableField : attributeSearchableFields) { - addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); - } - - final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), "."); - - // Index the fields that we always index (unless there's nothing else to index at all) - if (!doc.getFields().isEmpty()) { - doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO)); - doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO)); - doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO)); - doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES)); - doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES)); - - for (final String lineageIdentifier : record.getLineageIdentifiers()) { - addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); - } - - // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. - if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) { - for (final String uuid : record.getChildUuids()) { - if (!uuid.equals(record.getFlowFileUuid())) { - addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); - } - } - } else if (record.getEventType() == ProvenanceEventType.JOIN) { - for (final String uuid : record.getParentUuids()) { - if (!uuid.equals(record.getFlowFileUuid())) { - addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); - } - } - } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) { - // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID - // that the Source System uses to refer to the data. 
- final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier(); - final String sourceFlowFileUUID; - final int lastColon = sourceIdentifier.lastIndexOf(":"); - if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) { - sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1); - } else { - sourceFlowFileUUID = null; - } - - if (sourceFlowFileUUID != null) { - addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO); - } - } - - indexWriter.addDocument(doc); - indexCount++; - } + index(record, indexWriter, blockIndex); + indexCount++; } indexWriter.commit(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java index a7076d5..59dc10b 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java @@ -27,8 +27,8 @@ import java.util.List; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.search.SearchTerm; - import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanClause.Occur; @@ -78,7 +78,16 @@ public class LuceneUtil { final String searchString = baseName + "."; for (final Path path : allProvenanceLogs) { if (path.toFile().getName().startsWith(searchString)) { - matchingFiles.add(path.toFile()); + final File file = path.toFile(); + if ( file.exists() ) { + matchingFiles.add(file); + } else { + final File dir = file.getParentFile(); + final File gzFile = new File(dir, file.getName() + ".gz"); + if ( gzFile.exists() ) { + matchingFiles.add(gzFile); + } + } } } @@ -132,6 +141,19 @@ public class LuceneUtil { return filenameComp; } + final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX); + final IndexableField fileOffset2 = o2.getField(FieldNames.BLOCK_INDEX); + if ( fileOffset1 != null && fileOffset2 != null ) { + final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue()); + if ( blockIndexResult != 0 ) { + return blockIndexResult; + } + + final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + return Long.compare(eventId1, eventId2); + } + final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue(); return Long.compare(offset1, offset2);
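
The comparator change above establishes the order in which matched provenance documents are read back from the journals: by storage filename first, then by compression block index with the stored event ID breaking ties within a block, falling back to the raw storage byte offset for documents indexed before block indexing existed. A standalone sketch of that ordering, using a hypothetical RecordKey value object in place of the Lucene Document fields the real code compares:

import java.util.Comparator;

// Hypothetical stand-in for the fields read from each indexed Lucene Document:
// storage filename, optional block index, event ID, and storage byte offset.
final class RecordKey {
    final String storageFilename;
    final Integer blockIndex;      // null when the document predates block indexing
    final long eventId;            // used to break ties within a block
    final long storageFileOffset;  // fallback for legacy documents

    RecordKey(final String storageFilename, final Integer blockIndex,
            final long eventId, final long storageFileOffset) {
        this.storageFilename = storageFilename;
        this.blockIndex = blockIndex;
        this.eventId = eventId;
        this.storageFileOffset = storageFileOffset;
    }
}

public class DocumentOrderSketch {
    // Mirrors the sort order in LuceneUtil: filename, then block index,
    // then event ID; legacy documents fall back to byte offset.
    static final Comparator<RecordKey> READ_ORDER = (o1, o2) -> {
        final int filenameComp = o1.storageFilename.compareTo(o2.storageFilename);
        if (filenameComp != 0) {
            return filenameComp;
        }

        if (o1.blockIndex != null && o2.blockIndex != null) {
            final int blockComp = Integer.compare(o1.blockIndex, o2.blockIndex);
            if (blockComp != 0) {
                return blockComp;
            }
            return Long.compare(o1.eventId, o2.eventId);
        }

        return Long.compare(o1.storageFileOffset, o2.storageFileOffset);
    };
}
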
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java index 862bc2b..8bdc88a 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java @@ -20,12 +20,79 @@ import java.io.Closeable; import java.io.IOException; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.toc.TocReader; public interface RecordReader extends Closeable { + /** + * Returns the next record in the reader, or <code>null</code> if there is no more data available. + * @return + * @throws IOException + */ StandardProvenanceEventRecord nextRecord() throws IOException; + /** + * Skips the specified number of bytes + * @param bytesToSkip + * @throws IOException + */ void skip(long bytesToSkip) throws IOException; + /** + * Skips to the specified byte offset in the underlying stream. + * @param position + * @throws IOException if the underlying stream throws IOException, or if the reader has already + * passed the specified byte offset + */ void skipTo(long position) throws IOException; + + /** + * Skips to the specified compression block + * + * @param blockIndex + * @throws IOException if the underlying stream throws IOException, or if the reader has already + * read past the specified compression block index + * @throws IllegalStateException if the RecordReader does not have a Table of Contents associated with it + */ + void skipToBlock(int blockIndex) throws IOException; + + /** + * Returns the block index that the Reader is currently reading from. + * Note that the block index is incremented at the beginning of the {@link #nextRecord()} + * method, so when {@link #nextRecord()} is called repeatedly, this method returns the block + * from which the previous record was read, not the block from which the next record will be read. + * @return + */ + int getBlockIndex(); + + /** + * Returns <code>true</code> if the compression block index is available. It will be available + * if and only if the reader is created with a Table of Contents + * + * @return + */ + boolean isBlockIndexAvailable(); + + /** + * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists, + * <code>null</code> otherwise + * @return + */ + TocReader getTocReader(); + + /** + * Returns the number of bytes that have been consumed from the stream (read or skipped). + * @return + */ + long getBytesConsumed(); + + /** + * Returns the ID of the last event in this record reader, or -1 if the reader has no records or + * has already read through all records.
Note: This method will consume the stream until the end, + * so no more records will be available on this reader after calling this method. + * + * @return + * @throws IOException + */ + long getMaxEventId() throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index 8f06995..dff281c 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.provenance.serialization; -import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -24,82 +23,90 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; import java.util.Collection; -import java.util.zip.GZIPInputStream; -import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.provenance.StandardRecordReader; import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.toc.StandardTocReader; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.provenance.toc.TocUtil; public class RecordReaders { public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException { final File originalFile = file; - - if (!file.exists()) { - if (provenanceLogFiles == null) { - throw new FileNotFoundException(file.toString()); - } - - final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + "."; - for (final Path path : provenanceLogFiles) { - if (path.toFile().getName().startsWith(baseName)) { - file = path.toFile(); - break; - } - } - } - InputStream fis = null; - if ( file.exists() ) { - try { - fis = new FileInputStream(file); - } catch (final FileNotFoundException fnfe) { - fis = null; - } - } - - openStream: while ( fis == null ) { - final File dir = file.getParentFile(); - final String baseName = LuceneUtil.substringBefore(file.getName(), "."); - - // depending on which rollover actions have occurred, we could have 3 possibilities for the - // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz" - // because most often we are compressing on rollover and most often we have already finished - // compressing by the time that we are querying the data. 
- for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) { - file = new File(dir, baseName + extension); - if ( file.exists() ) { - try { - fis = new FileInputStream(file); - break openStream; - } catch (final FileNotFoundException fnfe) { - // file was modified by a RolloverAction after we verified that it exists but before we could - // create an InputStream for it. Start over. - fis = null; - continue openStream; - } - } - } - - break; - } - if ( fis == null ) { - throw new FileNotFoundException("Unable to locate file " + originalFile); - } - final InputStream readableStream; - if (file.getName().endsWith(".gz")) { - readableStream = new BufferedInputStream(new GZIPInputStream(fis)); - } else { - readableStream = new BufferedInputStream(fis); + try { + if (!file.exists()) { + if (provenanceLogFiles != null) { + final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + "."; + for (final Path path : provenanceLogFiles) { + if (path.toFile().getName().startsWith(baseName)) { + file = path.toFile(); + break; + } + } + } + } + + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + } catch (final FileNotFoundException fnfe) { + fis = null; + } + } + + String filename = file.getName(); + openStream: while ( fis == null ) { + final File dir = file.getParentFile(); + final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + + // depending on which rollover actions have occurred, we could have two possibilities for the + // filename that we need. The majority of the time, we will use the extension ".prov.gz" + // because most often we are compressing on rollover and most often we have already finished + // compressing by the time that we are querying the data. + for ( final String extension : new String[] {".prov.gz", ".prov"} ) { + file = new File(dir, baseName + extension); + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + filename = baseName + extension; + break openStream; + } catch (final FileNotFoundException fnfe) { + // file was modified by a RolloverAction after we verified that it exists but before we could + // create an InputStream for it. Start over.
+ fis = null; + continue openStream; + } + } + } + + break; + } + + if ( fis == null ) { + throw new FileNotFoundException("Unable to locate file " + originalFile); + } + + final File tocFile = TocUtil.getTocFile(file); + if ( tocFile.exists() ) { + final TocReader tocReader = new StandardTocReader(tocFile); + return new StandardRecordReader(fis, filename, tocReader); + } else { + return new StandardRecordReader(fis, filename); + } + } catch (final IOException ioe) { + if ( fis != null ) { + try { + fis.close(); + } catch (final IOException inner) { + ioe.addSuppressed(inner); + } + } + + throw ioe; } - - final DataInputStream dis = new DataInputStream(readableStream); - @SuppressWarnings("unused") - final String repoClassName = dis.readUTF(); - final int serializationVersion = dis.readInt(); - - return new StandardRecordReader(dis, serializationVersion, file.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index de98ab9..58f4dc2 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.toc.TocWriter; public interface RecordWriter extends Closeable { @@ -82,4 +83,9 @@ public interface RecordWriter extends Closeable { */ void sync() throws IOException; + /** + * Returns the TOC Writer that is being used to write the Table of Contents for this journal + * @return + */ + TocWriter getTocWriter(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java index 15349de..47b7c7e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java @@ -20,11 +20,20 @@ import java.io.File; import java.io.IOException; import org.apache.nifi.provenance.StandardRecordWriter; 
+import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; public class RecordWriters { + private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB - public static RecordWriter newRecordWriter(final File file) throws IOException { - return new StandardRecordWriter(file); + public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException { + return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE); + } + + public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException { + final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java new file mode 100644 index 0000000..8944cec --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.toc; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +/** + * Standard implementation of TocReader. + * + * Expects .toc file to be in the following format; + * + * byte 0: version + * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed + * byte 2-9: long: offset of block 0 + * byte 10-17: long: offset of block 1 + * ... 
+ * byte (N*8+2)-(N*8+9): long: offset of block N + */ +public class StandardTocReader implements TocReader { + private final boolean compressed; + private final long[] offsets; + + public StandardTocReader(final File file) throws IOException { + try (final FileInputStream fis = new FileInputStream(file); + final DataInputStream dis = new DataInputStream(fis)) { + + final int version = dis.read(); + if ( version < 0 ) { + throw new EOFException(); + } + + final int compressionFlag = dis.read(); + if ( compressionFlag < 0 ) { + throw new EOFException(); + } + + if ( compressionFlag == 0 ) { + compressed = false; + } else if ( compressionFlag == 1 ) { + compressed = true; + } else { + throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag); + } + + final int numBlocks = (int) ((file.length() - 2) / 8); + offsets = new long[numBlocks]; + + for (int i=0; i < numBlocks; i++) { + offsets[i] = dis.readLong(); + } + } + } + + @Override + public boolean isCompressed() { + return compressed; + } + + @Override + public long getBlockOffset(final int blockIndex) { + if ( blockIndex >= offsets.length ) { + return -1L; + } + return offsets[blockIndex]; + } + + @Override + public long getLastBlockOffset() { + if ( offsets.length == 0 ) { + return 0L; + } + return offsets[offsets.length - 1]; + } + + @Override + public void close() throws IOException { + } + + @Override + public int getBlockIndex(final long blockOffset) { + for (int i=0; i < offsets.length; i++) { + if ( offsets[i] > blockOffset ) { + return i-1; + } + } + + return offsets.length - 1; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java new file mode 100644 index 0000000..488f225 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.toc; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Standard implementation of {@link TocWriter}. + * + * Format of .toc file: + * byte 0: version + * byte 1: compressed: 0 -> not compressed, 1 -> compressed + * byte 2-9: long: offset of block 0 + * byte 10-17: long: offset of block 1 + * ... + * byte (N*8+2)-(N*8+9): long: offset of block N + */ +public class StandardTocWriter implements TocWriter { + private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class); + + public static final byte VERSION = 1; + + private final File file; + private final FileOutputStream fos; + private final boolean alwaysSync; + private int index = -1; + + /** + * Creates a StandardTocWriter that writes to the given file. + * @param file the file to write to + * @param compressionFlag whether or not the journal is compressed + * @throws FileNotFoundException + */ + public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException { + final File tocDir = file.getParentFile(); + if ( !tocDir.exists() ) { + Files.createDirectories(tocDir.toPath()); + } + + this.file = file; + fos = new FileOutputStream(file); + this.alwaysSync = alwaysSync; + + final byte[] header = new byte[2]; + header[0] = VERSION; + header[1] = (byte) (compressionFlag ? 1 : 0); + fos.write(header); + fos.flush(); + + if ( alwaysSync ) { + sync(); + } + } + + @Override + public void addBlockOffset(final long offset) throws IOException { + final BufferedOutputStream bos = new BufferedOutputStream(fos); + final DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(offset); + dos.flush(); + index++; + logger.debug("Adding block {} at offset {}", index, offset); + + if ( alwaysSync ) { + sync(); + } + } + + @Override + public void sync() throws IOException { + fos.getFD().sync(); + } + + @Override + public int getCurrentBlockIndex() { + return index; + } + + @Override + public void close() throws IOException { + if (alwaysSync) { + fos.getFD().sync(); + } + + fos.close(); + } + + @Override + public File getFile() { + return file; + } + + @Override + public String toString() { + return "TOC Writer for " + file; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java new file mode 100644 index 0000000..7c197be --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
new file mode 100644
index 0000000..7c197be
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We use a Table of Contents
+ * to map a Block Index to the offset into the Journal file where that Block begins. This allows us to
+ * persist a Block Index for each event and compress the Journal later: we get good compression by
+ * compressing a large batch of events at once, and we can still look up an event in an uncompressed
+ * Journal via the Table of Contents, or in a compressed Journal by simply rewriting the TOC as the
+ * data is compressed.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+    /**
+     * Indicates whether or not the corresponding Journal file is compressed
+     * @return true if the Journal is compressed, false otherwise
+     */
+    boolean isCompressed();
+
+    /**
+     * Returns the byte offset into the Journal File for the Block with the given index.
+     * @param blockIndex the index of the Block
+     * @return the byte offset of the Block, or -1 if no Block exists with the given index
+     */
+    long getBlockOffset(int blockIndex);
+
+    /**
+     * Returns the byte offset into the Journal File of the last Block in the Table of Contents
+     * @return the byte offset of the last Block, or 0 if the Table of Contents is empty
+     */
+    long getLastBlockOffset();
+
+    /**
+     * Returns the index of the Block that contains the given byte offset
+     * @param blockOffset the byte offset into the Journal File
+     * @return the index of the Block that contains the given offset
+     */
+    int getBlockIndex(long blockOffset);
+}
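
The expected lookup pattern against this interface is: resolve an event's Block Index (persisted at indexing time) to a byte offset, then seek the journal to that offset and scan forward. A hedged sketch, with the caller responsible for closing the returned file (BlockSeekDemo is illustrative, not part of this patch):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    import org.apache.nifi.provenance.toc.StandardTocReader;
    import org.apache.nifi.provenance.toc.TocReader;

    public class BlockSeekDemo {
        // Opens the journal positioned at the start of the given block,
        // ready for a short sequential scan to the desired event.
        public static RandomAccessFile seekToBlock(final File journal, final File toc, final int blockIndex) throws IOException {
            try (final TocReader tocReader = new StandardTocReader(toc)) {
                final long offset = tocReader.getBlockOffset(blockIndex);
                if (offset < 0) {
                    throw new IOException("TOC has no block with index " + blockIndex);
                }
                final RandomAccessFile raf = new RandomAccessFile(journal, "r");
                raf.seek(offset);
                return raf;
            }
        }
    }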

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
new file mode 100644
index 0000000..c30ac98
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.File;
+
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+
+public class TocUtil {
+
+    /**
+     * Returns the file that should be used as the Table of Contents for the given Journal File
+     * @param journalFile the Journal File whose Table of Contents is desired
+     * @return the .toc file that corresponds to the given Journal File
+     */
+    public static File getTocFile(final File journalFile) {
+        final File tocDir = new File(journalFile.getParentFile(), "toc");
+        final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+        final File tocFile = new File(tocDir, basename + ".toc");
+        return tocFile;
+    }
+
+}
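
For example, a journal named 16810042.prov.gz (the name is illustrative) maps to a .toc file named after everything before the first '.', placed in a 'toc' subdirectory alongside the journal:

    import java.io.File;

    import org.apache.nifi.provenance.toc.TocUtil;

    public class TocUtilDemo {
        public static void main(final String[] args) {
            // The compression suffix does not affect the TOC file name,
            // so a journal keeps the same TOC before and after compression.
            final File journal = new File("/repo/journals/16810042.prov.gz");
            System.out.println(TocUtil.getTocFile(journal));
            // prints: /repo/journals/toc/16810042.toc
        }
    }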

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
new file mode 100644
index 0000000..c678053
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc file
+ */
+public interface TocWriter extends Closeable {
+
+    /**
+     * Adds the given block offset as the next Block Offset in the Table of Contents
+     * @param offset the byte offset at which the next Block begins
+     * @throws IOException if unable to write the offset to the .toc file
+     */
+    void addBlockOffset(long offset) throws IOException;
+
+    /**
+     * Returns the index of the current Block
+     * @return the index of the Block currently being written
+     */
+    int getCurrentBlockIndex();
+
+    /**
+     * Returns the file that is currently being written to
+     * @return the .toc file being written
+     */
+    File getFile();
+
+    /**
+     * Synchronizes the data with the underlying storage device
+     * @throws IOException if unable to sync with the storage device
+     */
+    void sync() throws IOException;
+}
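
The TocReader javadoc above mentions rewriting the TOC while a journal is compressed. One way that could work, sketched under the assumptions that blocks are contiguous, that block 0 starts at offset 0, and that each block becomes its own GZIP member so it can be decompressed independently after a seek (CompressWithTocSketch is hypothetical, not the compression code in this patch):

    import java.io.EOFException;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    import org.apache.nifi.provenance.toc.StandardTocReader;
    import org.apache.nifi.provenance.toc.StandardTocWriter;
    import org.apache.nifi.provenance.toc.TocReader;
    import org.apache.nifi.provenance.toc.TocWriter;

    public class CompressWithTocSketch {
        public static void compress(final File journal, final File oldToc, final File newToc) throws IOException {
            try (final TocReader oldReader = new StandardTocReader(oldToc);
                 final TocWriter newWriter = new StandardTocWriter(newToc, true, false);
                 final FileInputStream in = new FileInputStream(journal);
                 final FileOutputStream out = new FileOutputStream(new File(journal.getPath() + ".gz"))) {

                final byte[] buffer = new byte[8192];
                final long journalLength = journal.length();

                for (int block = 0; ; block++) {
                    final long start = oldReader.getBlockOffset(block);
                    if (start < 0) {
                        break; // past the last block
                    }
                    final long next = oldReader.getBlockOffset(block + 1);
                    final long end = (next < 0) ? journalLength : next;

                    // The block's new offset is wherever the compressed file currently ends.
                    newWriter.addBlockOffset(out.getChannel().position());

                    final GZIPOutputStream gzip = new GZIPOutputStream(out);
                    long remaining = end - start;
                    while (remaining > 0) {
                        final int len = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                        if (len < 0) {
                            throw new EOFException("Journal is shorter than its TOC indicates");
                        }
                        gzip.write(buffer, 0, len);
                        remaining -= len;
                    }
                    // End this GZIP member without closing the shared output stream.
                    gzip.finish();
                }
            }
        }
    }

Because the block boundaries are preserved, an event's persisted Block Index stays valid: only the offsets recorded in the TOC change.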

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5be208b..25a363f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -25,14 +26,14 @@
 import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -45,7 +46,6 @@
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -59,8 +59,10 @@
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -72,87 +74,47 @@ public class TestPersistentProvenanceRepository {
 
     public TestName name = new TestName();
 
     private PersistentProvenanceRepository repo;
+    private RepositoryConfiguration config;
 
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
 
     private RepositoryConfiguration createConfiguration() {
-        final RepositoryConfiguration config = new RepositoryConfiguration();
+        config = new RepositoryConfiguration();
         config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
-        config.setCompressOnRollover(false);
+        config.setCompressOnRollover(true);
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
+        config.setCompressionBlockBytes(100);
         return config;
     }
 
+    @BeforeClass
+    public static void setLogLevel() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+
     @Before
     public void printTestName() {
         System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
     }
 
     @After
-    public void closeRepo() {
+    public void closeRepo() throws IOException {
         if (repo != null) {
             try {
                 repo.close();
             } catch (final IOException ioe) {
             }
         }
+
+        // Delete all of the storage files. We do this in order to clean up the tons of files that
+        // we create but also to ensure that we have closed all of the file handles. If we leave any
+        // streams open, for instance, this will throw an IOException, causing our unit test to fail.
+        for ( final File storageDir : config.getStorageDirectories() ) {
+            FileUtils.deleteFile(storageDir, true);
+        }
     }
 
-    private FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
-        final Map<String, String> attrCopy = new HashMap<>(attributes);
-
-        return new FlowFile() {
-            @Override
-            public long getId() {
-                return id;
-            }
-
-            @Override
-            public long getEntryDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Set<String> getLineageIdentifiers() {
-                return new HashSet<String>();
-            }
-
-            @Override
-            public long getLineageStartDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Long getLastQueueDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public boolean isPenalized() {
-                return false;
-            }
-
-            @Override
-            public String getAttribute(final String s) {
-                return attrCopy.get(s);
-            }
-
-            @Override
-            public long getSize() {
-                return fileSize;
-            }
-
-            @Override
-            public Map<String, String> getAttributes() {
-                return attrCopy;
-            }
-
-            @Override
-            public int compareTo(final FlowFile o) {
-                return 0;
-            }
-        };
-    }
+
+
     private EventReporter getEventReporter() {
         return new EventReporter() {
@@ -261,6 +223,8 @@
             repo.registerEvent(record);
         }
 
+        Thread.sleep(1000L);
+
         repo.close();
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
@@ -417,10 +381,10 @@
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
-        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L * 10);
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
-        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setMaxEventFileCapacity(1024L * 1024L * 10);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
@@ -923,12 +887,16 @@
         final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         secondRepo.initialize(getEventReporter());
 
-        final ProvenanceEventRecord event11 = builder.build();
-        secondRepo.registerEvent(event11);
-        secondRepo.waitForRollover();
-        final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
-        assertNotNull(event11Retrieved);
-        assertEquals(10, event11Retrieved.getEventId());
+        try {
+            final ProvenanceEventRecord event11 = builder.build();
+            secondRepo.registerEvent(event11);
+            secondRepo.waitForRollover();
+            final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+            assertNotNull(event11Retrieved);
+            assertEquals(10, event11Retrieved.getEventId());
+        } finally {
+            secondRepo.close();
+        }
     }
 
     @Test
@@ -998,6 +966,73 @@
         storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
         assertEquals(0, storageDirFiles.length);
     }
+
+
+    @Test
+    public void testBackPressure() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileCapacity(1L); // force rollover on each record.
+        config.setJournalCount(1);
+
+        final AtomicInteger journalCountRef = new AtomicInteger(0);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected int getJournalCount() {
+                return journalCountRef.get();
+            }
+        };
+        repo.initialize(getEventReporter());
+
+        final Map<String, String> attributes = new HashMap<>();
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", UUID.randomUUID().toString());
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        // ensure that we can register the events.
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        // set number of journals to 6 so that we will block.
+        journalCountRef.set(6);
+
+        final AtomicLong threadNanos = new AtomicLong(0L);
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                final long start = System.nanoTime();
+                builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+                attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+                repo.registerEvent(builder.build());
+                threadNanos.set(System.nanoTime() - start);
+            }
+        });
+        t.start();
+
+        Thread.sleep(1500L);
+
+        journalCountRef.set(1);
+        t.join();
+
+        final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
+        assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
+
+        builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
+        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
+        repo.registerEvent(builder.build());
+    }
+
+
+    // TODO: test EOF on merge
+    // TODO: Test journal with no records
 
     @Test
     public void testTextualQuery() throws InterruptedException, IOException, ParseException {
