http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index fe89a5e..27f2cbb 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -87,11 +87,13 @@ import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.serialization.RecordWriters; +import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.provenance.toc.TocUtil; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; +import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,6 +240,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); + expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); + 
expirationActions.add(new FileRemovalAction()); + scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); scheduledExecService.scheduleWithFixedDelay(new Runnable() { @Override @@ -253,9 +258,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } }, 1L, 1L, TimeUnit.MINUTES); - - expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); - expirationActions.add(new FileRemovalAction()); } } finally { writeLock.unlock(); @@ -328,7 +330,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i); writers[i] = RecordWriters.newRecordWriter(journalFile, false, false); - writers[i].writeHeader(); + writers[i].writeHeader(initialRecordId); } logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId); @@ -361,6 +363,19 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final Path path : paths) { try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) { + // if this is the first record, try to find out the block index and jump directly to + // the block index. This avoids having to read through a lot of data that we don't care about + // just to get to the first record that we want. 
+ if ( records.isEmpty() ) { + final TocReader tocReader = reader.getTocReader(); + if ( tocReader != null ) { + final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId); + if (blockIndex != null) { + reader.skipToBlock(blockIndex); + } + } + } + StandardProvenanceEventRecord record; while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) { if (record.getEventId() >= firstRecordId) { @@ -699,7 +714,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (bytesWrittenSinceRollover.get() >= configuration.getMaxEventFileCapacity()) { try { rollover(false); - } catch (IOException e) { + } catch (final IOException e) { logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString()); logger.error("", e); eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString()); @@ -810,8 +825,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }; - // If we have too much data, start aging it off - if (bytesUsed > configuration.getMaxStorageCapacity()) { + // If we have too much data (at least 90% of our max capacity), start aging it off + if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) { Collections.sort(sortedByBasename, sortByBasenameComparator); for (final File file : sortedByBasename) { @@ -864,15 +879,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } // Update the Map ID to Path map to not include the removed file - // we use a lock here because otherwise we have a check-then-modify between get/set on the AtomicReference. 
- // But we keep the AtomicReference because in other places we do just a .get() - writeLock.lock(); - try { - logger.debug("Obtained write lock to update ID/Path Map for expiration"); - - final SortedMap<Long, Path> pathMap = idToPathMap.get(); + // We cannot obtain the write lock here because there may be a need for the lock in the rollover method, + // if we have 'backpressure applied'. This would result in a deadlock because the rollover method would be + // waiting for purgeOldEvents, and purgeOldEvents would be waiting for the write lock held by rollover. + boolean updated = false; + while (!updated) { + final SortedMap<Long, Path> existingPathMap = idToPathMap.get(); final SortedMap<Long, Path> newPathMap = new TreeMap<>(new PathMapComparator()); - newPathMap.putAll(pathMap); + newPathMap.putAll(existingPathMap); + final Iterator<Map.Entry<Long, Path>> itr = newPathMap.entrySet().iterator(); while (itr.hasNext()) { final Map.Entry<Long, Path> entry = itr.next(); @@ -883,10 +898,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository itr.remove(); } } - idToPathMap.set(newPathMap); + + updated = idToPathMap.compareAndSet(existingPathMap, newPathMap); logger.debug("After expiration, path map: {}", newPathMap); - } finally { - writeLock.unlock(); } } @@ -946,36 +960,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); } - int journalFileCount = getJournalCount(); - final int journalCountThreshold = configuration.getJournalCount() * 5; - if ( journalFileCount > journalCountThreshold ) { - logger.warn("The rate of the dataflow is exceeding the provenance recording rate.â" - + "Slowing down flow to accomodate. 
Currently, there are {} journal files and " - + "threshold for blocking is {}", journalFileCount, journalCountThreshold); - eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " - + "exceeding the provenance recording rate.âSlowing down flow to accomodate"); - - while (journalFileCount > journalCountThreshold) { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { - } - - logger.debug("Provenance Repository is still behind. Keeping flow slowed down " - + "to accomodate. Currently, there are {} journal files and " - + "threshold for blocking is {}", journalFileCount, journalCountThreshold); - - journalFileCount = getJournalCount(); - } - - logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of " - + "journal files to be rolled over is {}", journalFileCount); - } - - writers = createWriters(configuration, idGenerator.get()); - streamStartTime.set(System.currentTimeMillis()); - recordsWrittenSinceRollover.getAndSet(0); - final long storageDirIdx = storageDirectoryIndex.getAndIncrement(); final List<File> storageDirs = configuration.getStorageDirectories(); final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size())); @@ -1001,21 +985,19 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (fileRolledOver == null) { return; } - File file = fileRolledOver; + final File file = fileRolledOver; // update our map of id to Path - // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a - // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying - // it at one time - writeLock.lock(); - try { - final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), ".")); - SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator()); - 
newIdToPathMap.putAll(idToPathMap.get()); + // We need to make sure that another thread doesn't also update the map at the same time. We cannot + // use the write lock when purging old events, and we want to use the same approach here. + boolean updated = false; + final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), ".")); + while (!updated) { + final SortedMap<Long, Path> existingPathMap = idToPathMap.get(); + final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator()); + newIdToPathMap.putAll(existingPathMap); newIdToPathMap.put(fileFirstEventId, file.toPath()); - idToPathMap.set(newIdToPathMap); - } finally { - writeLock.unlock(); + updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap); } logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); @@ -1045,6 +1027,42 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository streamStartTime.set(System.currentTimeMillis()); bytesWrittenSinceRollover.set(0); + + // We don't want to create new 'writers' until the number of unmerged journals falls below our threshold. So we wait + // here before we repopulate the 'writers' member variable and release the lock. + int journalFileCount = getJournalCount(); + long repoSize = getSize(getLogFiles(), 0L); + final int journalCountThreshold = configuration.getJournalCount() * 5; + final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max capacity + + if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { + logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " + + "Slowing down flow to accomodate. 
Currently, there are {} journal files ({} bytes) and " + + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); + eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " + + "exceeding the provenance recording rate. Slowing down flow to accomodate"); + + while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + + logger.debug("Provenance Repository is still behind. Keeping flow slowed down " + + "to accomodate. Currently, there are {} journal files ({} bytes) and " + + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); + + journalFileCount = getJournalCount(); + repoSize = getSize(getLogFiles(), 0L); + } + + logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of " + + "journal files to be rolled over is {}", journalFileCount); + } + + writers = createWriters(configuration, idGenerator.get()); + streamStartTime.set(System.currentTimeMillis()); + recordsWrittenSinceRollover.getAndSet(0); } } @@ -1231,6 +1249,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }); + long minEventId = 0L; + long earliestTimestamp = System.currentTimeMillis(); for (final RecordReader reader : readers) { StandardProvenanceEventRecord record = null; @@ -1252,17 +1272,33 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository continue; } + if ( record.getEventTime() < earliestTimestamp ) { + earliestTimestamp = record.getEventTime(); + } + + if ( record.getEventId() < minEventId ) { + minEventId = record.getEventId(); + } + recordToReaderMap.put(record, reader); } + // We want to keep track of the last 1000 events in the files so that we can add them to 'ringBuffer'. 
+ // However, we don't want to add them directly to ringBuffer, because once they are added to ringBuffer, they are + // available in query results. As a result, we can have the issue where we've not finished indexing the file + // but we try to create the lineage for events in that file. In order to avoid this, we will add the records + // to a temporary RingBuffer and after we finish merging the records will then copy the data to the + // ringBuffer provided as a method argument. + final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000); + // loop over each entry in the map, persisting the records to the merged file in order, and populating the map // with the next entry from the journal file from which the previous record was written. try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { - writer.writeHeader(); + writer.writeHeader(minEventId); final IndexingAction indexingAction = new IndexingAction(this); - final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile); + final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp); final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory); try { long maxId = 0L; @@ -1278,7 +1314,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexingAction.index(record, indexWriter, blockIndex); maxId = record.getEventId(); - ringBuffer.add(record); + latestRecords.add(record); records++; // Remove this entry from the map @@ -1303,6 +1339,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexManager.returnIndexWriter(indexingDirectory, indexWriter); } } + + // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer. 
+ latestRecords.forEach(new ForEachEvaluator<ProvenanceEventRecord>() { + @Override + public boolean evaluate(final ProvenanceEventRecord event) { + ringBuffer.add(event); + return true; + } + }); } finally { for (final RecordReader reader : readers) { try { @@ -1452,11 +1497,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) { final IndexSearcher searcher = new IndexSearcher(directoryReader); - TopDocs topDocs = searcher.search(luceneQuery, 10000000); + final TopDocs topDocs = searcher.search(luceneQuery, 10000000); logger.info("For {}, Top Docs has {} hits; reading Lucene results", indexDirectory, topDocs.scoreDocs.length); if (topDocs.totalHits > 0) { - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { final int docId = scoreDoc.doc; final Document d = directoryReader.document(docId); localScoreDocs.add(d); @@ -1649,16 +1694,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } switch (event.getEventType()) { - case CLONE: - case FORK: - case JOIN: - case REPLAY: - return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); - default: - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); - lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); - return submission; + case CLONE: + case FORK: + case JOIN: + case REPLAY: + return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE); + default: + final 
AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); + return submission; } } catch (final IOException ioe) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); @@ -1686,17 +1731,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } switch (event.getEventType()) { - case JOIN: - case FORK: - case CLONE: - case REPLAY: - return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime()); - default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); - lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); - return submission; - } + case JOIN: + case FORK: + case CLONE: + case REPLAY: + return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime()); + default: { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); + return submission; + } } } catch (final 
IOException ioe) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); @@ -1880,7 +1925,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } try { - final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexDir, null, flowFileUuids); + final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids); final StandardLineageResult result = submission.getResult(); result.update(matchingRecords);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index 3095f13..50caee1 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -86,16 +86,22 @@ public class StandardRecordWriter implements RecordWriter { } @Override - public synchronized void writeHeader() throws IOException { + public synchronized void writeHeader(final long firstEventId) throws IOException { lastBlockOffset = rawOutStream.getBytesWritten(); - resetWriteStream(); + resetWriteStream(firstEventId); out.writeUTF(PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } - private void resetWriteStream() throws IOException { + + /** + * Resets the streams to prepare for a new block + * @param eventId the first id that will be written to the new block + * @throws IOException if unable to flush/close the current streams properly + */ + private void resetWriteStream(final long eventId) throws IOException { if ( out != null ) { out.flush(); } @@ -112,13 +118,13 @@ public class StandardRecordWriter implements RecordWriter { } if ( tocWriter != null ) { - 
tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId); } writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); } else { if ( tocWriter != null ) { - tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId); } writableStream = new BufferedOutputStream(rawOutStream, 65536); @@ -130,7 +136,7 @@ public class StandardRecordWriter implements RecordWriter { @Override - public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException { + public synchronized long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException { final ProvenanceEventType recordType = record.getEventType(); final long startBytes = byteCountingOut.getBytesWritten(); @@ -142,7 +148,7 @@ public class StandardRecordWriter implements RecordWriter { // because of the way that GZIPOutputStream works, we need to call close() on it in order for it // to write its trailing bytes. 
But we don't want to close the underlying OutputStream, so we wrap // the underlying OutputStream in a NonCloseableOutputStream - resetWriteStream(); + resetWriteStream(recordIdentifier); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 3f75c00..502068b 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -19,26 +19,23 @@ package org.apache.nifi.provenance.lucene; import static java.util.Objects.requireNonNull; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.provenance.PersistentProvenanceRepository; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.SearchableFields; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; import 
org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.PersistentProvenanceRepository; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +45,7 @@ public class LineageQuery { public static final int MAX_LINEAGE_UUIDS = 100; private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); - public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, + public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); @@ -58,52 +55,62 @@ public class LineageQuery { throw new IllegalArgumentException("Must specify either Lineage Identifier or FlowFile UUIDs to compute lineage"); } - try (final Directory fsDir = FSDirectory.open(indexDirectory); - final IndexReader indexReader = DirectoryReader.open(fsDir)) { - - final IndexSearcher searcher = new IndexSearcher(indexReader); - - // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as - // "SHOULD" clauses and then setting the minimum required to 1. 
- final BooleanQuery flowFileIdQuery; - if (flowFileUuids == null || flowFileUuids.isEmpty()) { - flowFileIdQuery = null; - } else { - flowFileIdQuery = new BooleanQuery(); - for (final String flowFileUuid : flowFileUuids) { - flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD); + final IndexSearcher searcher; + try { + searcher = indexManager.borrowIndexSearcher(indexDirectory); + try { + // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as + // "SHOULD" clauses and then setting the minimum required to 1. + final BooleanQuery flowFileIdQuery; + if (flowFileUuids == null || flowFileUuids.isEmpty()) { + flowFileIdQuery = null; + } else { + flowFileIdQuery = new BooleanQuery(); + for (final String flowFileUuid : flowFileUuids) { + flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD); + } + flowFileIdQuery.setMinimumNumberShouldMatch(1); } - flowFileIdQuery.setMinimumNumberShouldMatch(1); - } - - BooleanQuery query; - if (lineageIdentifier == null) { - query = flowFileIdQuery; - } else { - final BooleanQuery lineageIdQuery = new BooleanQuery(); - lineageIdQuery.add(new TermQuery(new Term(SearchableFields.LineageIdentifier.getSearchableFieldName(), lineageIdentifier)), Occur.MUST); - if (flowFileIdQuery == null) { - query = lineageIdQuery; + BooleanQuery query; + if (lineageIdentifier == null) { + query = flowFileIdQuery; } else { - query = new BooleanQuery(); - query.add(flowFileIdQuery, Occur.SHOULD); - query.add(lineageIdQuery, Occur.SHOULD); - query.setMinimumNumberShouldMatch(1); + final BooleanQuery lineageIdQuery = new BooleanQuery(); + lineageIdQuery.add(new TermQuery(new Term(SearchableFields.LineageIdentifier.getSearchableFieldName(), lineageIdentifier)), Occur.MUST); + + if (flowFileIdQuery == null) { + query = lineageIdQuery; + } else { + query = new 
BooleanQuery(); + query.add(flowFileIdQuery, Occur.SHOULD); + query.add(lineageIdQuery, Occur.SHOULD); + query.setMinimumNumberShouldMatch(1); + } } - } - final long searchStart = System.nanoTime(); - final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS); - final long searchEnd = System.nanoTime(); + final long searchStart = System.nanoTime(); + final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS); + final long searchEnd = System.nanoTime(); + + final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); + final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); + final long readDocsEnd = System.nanoTime(); + logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis", + indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); - final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); - final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); - final long readDocsEnd = System.nanoTime(); - logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", - TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); + return recs; + } finally { + indexManager.returnIndexSearcher(indexDirectory, searcher); + } + } catch (final FileNotFoundException fnfe) { + // nothing has been indexed yet, or the data has already aged off + logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe); + if ( logger.isDebugEnabled() ) { + logger.warn("", fnfe); + } - return recs; + return 
Collections.emptySet(); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index d89fd6f..7c9bcc0 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java @@ -28,9 +28,10 @@ public interface RecordWriter extends Closeable { /** * Writes header information to the underlying stream * + * @param firstEventId the ID of the first provenance event that will be written to the stream * @throws IOException if unable to write header information to the underlying stream */ - void writeHeader() throws IOException; + void writeHeader(long firstEventId) throws IOException; /** * Writes the given record out to the underlying stream http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java index 7c13a2a..61f86e7 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java @@ -37,6 +37,7 @@ import java.io.IOException; public class StandardTocReader implements TocReader { private final boolean compressed; private final long[] offsets; + private final long[] firstEventIds; public StandardTocReader(final File file) throws IOException { try (final FileInputStream fis = new FileInputStream(file); @@ -60,11 +61,32 @@ public class StandardTocReader implements TocReader { throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag); } - final int numBlocks = (int) ((file.length() - 2) / 8); + final int blockInfoBytes; + switch (version) { + case 1: + blockInfoBytes = 8; + break; + case 2: + default: + blockInfoBytes = 16; + break; + } + + final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes); offsets = new long[numBlocks]; + if ( version > 1 ) { + firstEventIds = new long[numBlocks]; + } else { + firstEventIds = new long[0]; + } + for (int i=0; i < numBlocks; i++) { offsets[i] = dis.readLong(); + + if ( version > 1 ) { + firstEventIds[i] = dis.readLong(); + } } } } @@ -98,11 +120,36 @@ public class StandardTocReader implements TocReader { public int getBlockIndex(final long blockOffset) { for (int i=0; i < offsets.length; i++) { if ( offsets[i] > 
blockOffset ) { - return i-1; + // if the offset is less than the offset of our first block, + // just return 0 to indicate the first block. Otherwise, + // return i-1 because i represents the first block whose offset is + // greater than 'blockOffset'. + return (i == 0) ? 0 : i-1; } } + // None of the blocks have an offset greater than the provided offset. + // Therefore, if the event is present, it must be in the last block. return offsets.length - 1; } + @Override + public Integer getBlockIndexForEventId(final long eventId) { + // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC), + // or if the event ID is less than the first Event ID in this TOC, then the Event ID + // is unknown -- return null. + if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) { + return null; + } + + for (int i=1; i < firstEventIds.length; i++) { + if ( firstEventIds[i] > eventId ) { + return i-1; + } + } + + // None of the blocks start with an Event ID greater than the provided ID. + // Therefore, if the event is present, it must be in the last block. 
+ return firstEventIds.length - 1; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java index 10de459..afa5d13 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; public class StandardTocWriter implements TocWriter { private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class); - public static final byte VERSION = 1; + public static final byte VERSION = 2; private final File file; private final FileOutputStream fos; @@ -75,10 +75,11 @@ public class StandardTocWriter implements TocWriter { } @Override - public void addBlockOffset(final long offset) throws IOException { + public void addBlockOffset(final long offset, final long firstEventId) throws IOException { final BufferedOutputStream bos = new BufferedOutputStream(fos); final DataOutputStream dos = new DataOutputStream(bos); dos.writeLong(offset); + dos.writeLong(firstEventId); dos.flush(); index++; logger.debug("Adding block {} at offset {}", index, offset); 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java index 97e2838..f7ddd59 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java @@ -59,4 +59,13 @@ public interface TocReader extends Closeable { * @return the index of the block that contains the given offset */ int getBlockIndex(long blockOffset); + + /** + * Returns the block index where the given event ID should be found + * + * @param eventId the ID of the provenance event of interest + * @return the block index where the given event ID should be found, or <code>null</code> if + * the block index is not known + */ + Integer getBlockIndexForEventId(long eventId); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java index 38f910f..90faea1 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java @@ -21,16 +21,19 @@ import java.io.File; import java.io.IOException; /** - * Writes a .toc file + * Writes a Table-of-Contents (.toc) file */ public interface TocWriter extends Closeable { /** * Adds the given block offset as the next Block Offset in the Table of Contents + * * @param offset the byte offset at which the block begins + * @param firstEventId the ID of the first Provenance Event that will be in the block + * * @throws IOException if unable to persist the block index */ - void addBlockOffset(long offset) throws IOException; + void addBlockOffset(long offset, long firstEventId) throws IOException; /** * @return the index of the current Block http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index 136f244..f242642 100644 --- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -67,7 +67,7 @@ public class TestStandardRecordReaderWriter { final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024); - writer.writeHeader(); + writer.writeHeader(1L); writer.writeRecord(createEvent(), 1L); writer.close(); @@ -77,7 +77,7 @@ public class TestStandardRecordReaderWriter { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); assertEquals("nifi://unit-test", recovered.getTransitUri()); @@ -95,7 +95,7 @@ public class TestStandardRecordReaderWriter { final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); - writer.writeHeader(); + writer.writeHeader(1L); writer.writeRecord(createEvent(), 1L); writer.close(); @@ -105,7 +105,7 @@ public class TestStandardRecordReaderWriter { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); assertEquals("nifi://unit-test", recovered.getTransitUri()); @@ -124,7 +124,7 @@ public class 
TestStandardRecordReaderWriter { // new record each 1 MB of uncompressed data final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024); - writer.writeHeader(); + writer.writeHeader(1L); for (int i=0; i < 10; i++) { writer.writeRecord(createEvent(), i); } @@ -143,7 +143,7 @@ public class TestStandardRecordReaderWriter { reader.skipToBlock(0); } - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); assertEquals("nifi://unit-test", recovered.getTransitUri()); } @@ -163,7 +163,7 @@ public class TestStandardRecordReaderWriter { // new block each 10 bytes final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); - writer.writeHeader(); + writer.writeHeader(1L); for (int i=0; i < 10; i++) { writer.writeRecord(createEvent(), i); } @@ -174,7 +174,7 @@ public class TestStandardRecordReaderWriter { try (final FileInputStream fis = new FileInputStream(journalFile); final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { for (int i=0; i < 10; i++) { - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); System.out.println(recovered); assertNotNull(recovered); assertEquals((long) i, recovered.getEventId()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java index 87400a0..9a5f424 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java @@ -64,15 +64,42 @@ public class TestStandardTocReader { @Test - public void testGetBlockIndex() throws IOException { + public void testGetBlockIndexV1() throws IOException { final File file = new File("target/" + UUID.randomUUID().toString()); try (final OutputStream out = new FileOutputStream(file); final DataOutputStream dos = new DataOutputStream(out)) { + out.write(1); out.write(0); + + for (int i=0; i < 1024; i++) { + dos.writeLong(i * 1024L); + } + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertFalse(reader.isCompressed()); + + for (int i=0; i < 1024; i++) { + assertEquals(i * 1024, reader.getBlockOffset(i)); + } + } + } finally { + file.delete(); + } + } + + @Test + public void testGetBlockIndexV2() throws IOException { + final File file = new File("target/" + UUID.randomUUID().toString()); + try (final OutputStream out = new FileOutputStream(file); + final DataOutputStream dos = new DataOutputStream(out)) { + out.write(2); out.write(0); for (int i=0; i < 1024; i++) { dos.writeLong(i * 1024L); + dos.writeLong(0L); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml new file mode 100644 index 
0000000..aeab88c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-solr-bundle</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>nifi-solr-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-solr-processors</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..36ba155 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,321 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +This product bundles 'Bouncycastle JDK Prov 16' which is available under +the MIT license. + + Copyright (c) 2000 - 2013 The Legion of the Bouncy Castle Inc. 
(http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + +This product bundles 'JCraft Jsch' which is available under a 3-Clause BSD +License. + + Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + + 3. The names of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. 
+ + THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT, + INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT, + INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +This product bundles 'JCraft Jzlib' which is available under a 3-Clause BSD License. + + Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + + 3. The names of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT, + INC. 
OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT, + INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +This product bundles 'asm' which is available under an MIT style license. +For details see http://asm.ow2.org/asmdex-license.html + + Copyright (c) 2012 France Télécom + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..54d29ae --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,60 @@ +nifi-solr-nar +Copyright 2014-2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2012 The Apache Software Foundation + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpClient + Copyright 1999-2014 The Apache Software Foundation + + Apache HttpCore + Copyright 2005-2014 The Apache Software Foundation + + Apache HttpMime + Copyright 1999-2013 The Apache Software Foundation + + This project contains annotations derived from JCIP-ANNOTATIONS + Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache ZooKeeper + The following NOTICE information applies: + Apache ZooKeeper + Copyright 2009-2012 The Apache Software Foundation + + (ASLv2) Woodstox Core ASL + The following NOTICE information applies: + This product currently only contains code developed by authors + of specific components, as identified by the source code files. + + Since product implements StAX API, it has dependencies to StAX API + classes. + + +****************** +Berkeley Software Distribution License +****************** + +The following binary components are provided under the Berkeley Software Distribution License. See project link for details. 
+ + (BSD) StAX 2 API (org.codehaus.woodstox:stax2-api:jar:3.1.4 - http://docs.codehaus.org/display/WSTX/StAX2/) http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml new file mode 100644 index 0000000..df8e451 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-solr-bundle</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>nifi-solr-processors</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + <version>${solr.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <!-- test dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>1.1.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + <version>${solr.version}</version> + <scope>test</scope> + </dependency> + <!-- These Lucene deps should be brought in through solr-core, but there + appears to be an issue with 5.0.0 that still references some 4.10.3 poms --> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + <version>${solr.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + 
<artifactId>lucene-analyzers-common</artifactId> + <version>${solr.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-queryparser</artifactId> + <version>${solr.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/solr/solr.xml</exclude> + <exclude>src/test/resources/testCollection/core.properties</exclude> + <exclude>src/test/resources/testCollection/conf/_rest_managed.json</exclude> + <exclude>src/test/resources/testCollection/conf/protowords.txt</exclude> + <exclude>src/test/resources/testCollection/conf/schema.xml</exclude> + <exclude>src/test/resources/testCollection/conf/solrconfig.xml</exclude> + <exclude>src/test/resources/testCollection/conf/synonyms.txt</exclude> + <exclude>src/test/resources/testCollection/conf/lang/stopwords_en.txt</exclude> + <exclude>src/test/resources/testdata/test-csv-multiple-docs.csv</exclude> + <exclude>src/test/resources/testdata/test-custom-json-single-doc.json</exclude> + <exclude>src/test/resources/testdata/test-solr-json-multiple-docs.json</exclude> + <exclude>src/test/resources/testdata/test-xml-multiple-docs.xml</exclude> + <exclude>src/test/resources/log4j.properties</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project>
