Repository: incubator-nifi Updated Branches: refs/heads/develop 4baf48ae9 -> 20f11b1a7
NIFI-556: Refactored TOC's to include min event id for each block Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/20f11b1a Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/20f11b1a Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/20f11b1a Branch: refs/heads/develop Commit: 20f11b1a7764aabb6656488955ef0b4e5649356e Parents: 4baf48a Author: Mark Payne <[email protected]> Authored: Wed Apr 29 13:22:09 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Apr 29 13:22:09 2015 -0400 ---------------------------------------------------------------------- .../PersistentProvenanceRepository.java | 46 ++++++++++++++++-- .../nifi/provenance/StandardRecordWriter.java | 20 +++++--- .../provenance/serialization/RecordWriter.java | 3 +- .../nifi/provenance/toc/StandardTocReader.java | 51 +++++++++++++++++++- .../nifi/provenance/toc/StandardTocWriter.java | 5 +- .../apache/nifi/provenance/toc/TocReader.java | 9 ++++ .../apache/nifi/provenance/toc/TocWriter.java | 7 ++- .../TestStandardRecordReaderWriter.java | 16 +++--- .../provenance/toc/TestStandardTocReader.java | 29 ++++++++++- 9 files changed, 159 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 3bdd38f..6e05535 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -87,11 +87,13 @@ import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.serialization.RecordWriters; +import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.provenance.toc.TocUtil; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; +import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -328,7 +330,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i); writers[i] = RecordWriters.newRecordWriter(journalFile, false, false); - writers[i].writeHeader(); + writers[i].writeHeader(initialRecordId); } logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId); @@ -361,6 +363,19 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final Path path : paths) { try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) { + // if this is the first record, try to find out the block index and jump directly to + // the block index. This avoids having to read through a lot of data that we don't care about + // just to get to the first record that we want. + if ( records.isEmpty() ) { + final TocReader tocReader = reader.getTocReader(); + if ( tocReader != null ) { + final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId); + if (blockIndex != null) { + reader.skipToBlock(blockIndex); + } + } + } + StandardProvenanceEventRecord record; while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) { if (record.getEventId() >= firstRecordId) { @@ -1231,6 +1246,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }); + long minEventId = 0L; long earliestTimestamp = System.currentTimeMillis(); for (final RecordReader reader : readers) { StandardProvenanceEventRecord record = null; @@ -1256,13 +1272,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if ( record.getEventTime() < earliestTimestamp ) { earliestTimestamp = record.getEventTime(); } + + if ( record.getEventId() < minEventId ) { + minEventId = record.getEventId(); + } + recordToReaderMap.put(record, reader); } + // We want to keep track of the last 1000 events in the files so that we can add them to 'ringBuffer'. + // However, we don't want to add them directly to ringBuffer, because once they are added to ringBuffer, they are + // available in query results. As a result, we can have the issue where we've not finished indexing the file + // but we try to create the lineage for events in that file. In order to avoid this, we will add the records + // to a temporary RingBuffer and after we finish merging the records will then copy the data to the + // ringBuffer provided as a method argument. + final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000); + // loop over each entry in the map, persisting the records to the merged file in order, and populating the map // with the next entry from the journal file from which the previous record was written. try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { - writer.writeHeader(); + writer.writeHeader(minEventId); final IndexingAction indexingAction = new IndexingAction(this); @@ -1282,7 +1311,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexingAction.index(record, indexWriter, blockIndex); maxId = record.getEventId(); - ringBuffer.add(record); + latestRecords.add(record); records++; // Remove this entry from the map @@ -1307,6 +1336,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexManager.returnIndexWriter(indexingDirectory, indexWriter); } } + + // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer. + latestRecords.forEach(new ForEachEvaluator<ProvenanceEventRecord>() { + @Override + public boolean evaluate(final ProvenanceEventRecord event) { + ringBuffer.add(event); + return true; + } + }); } finally { for (final RecordReader reader : readers) { try { @@ -1694,7 +1732,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository case FORK: case CLONE: case REPLAY: - return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime()); + return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime()); default: { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index 3095f13..50caee1 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -86,16 +86,22 @@ public class StandardRecordWriter implements RecordWriter { } @Override - public synchronized void writeHeader() throws IOException { + public synchronized void writeHeader(final long firstEventId) throws IOException { lastBlockOffset = rawOutStream.getBytesWritten(); - resetWriteStream(); + resetWriteStream(firstEventId); out.writeUTF(PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } - private void resetWriteStream() throws IOException { + + /** + * Resets the streams to prepare for a new block + * @param eventId the first id that will be written to the new block + * @throws IOException if unable to flush/close the current streams properly + */ + private void resetWriteStream(final long eventId) throws IOException { if ( out != null ) { out.flush(); } @@ -112,13 +118,13 @@ public class StandardRecordWriter implements RecordWriter { } if ( tocWriter != null ) { - tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId); } writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); } else { if ( tocWriter != null ) { - tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); + tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId); } writableStream = new BufferedOutputStream(rawOutStream, 65536); @@ -130,7 +136,7 @@ public class StandardRecordWriter implements RecordWriter { @Override - public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException { + public synchronized long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException { final ProvenanceEventType recordType = record.getEventType(); final long startBytes = byteCountingOut.getBytesWritten(); @@ -142,7 +148,7 @@ public class StandardRecordWriter implements RecordWriter { // because of the way that GZIPOutputStream works, we need to call close() on it in order for it // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap // the underlying OutputStream in a NonCloseableOutputStream - resetWriteStream(); + resetWriteStream(recordIdentifier); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index d89fd6f..7c9bcc0 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java @@ -28,9 +28,10 @@ public interface RecordWriter extends Closeable { /** * Writes header information to the underlying stream * + * @param firstEventId the ID of the first provenance event that will be written to the stream * @throws IOException if unable to write header information to the underlying stream */ - void writeHeader() throws IOException; + void writeHeader(long firstEventId) throws IOException; /** * Writes the given record out to the underlying stream http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java index 7c13a2a..61f86e7 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java @@ -37,6 +37,7 @@ import java.io.IOException; public class StandardTocReader implements TocReader { private final boolean compressed; private final long[] offsets; + private final long[] firstEventIds; public StandardTocReader(final File file) throws IOException { try (final FileInputStream fis = new FileInputStream(file); @@ -60,11 +61,32 @@ public class StandardTocReader implements TocReader { throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag); } - final int numBlocks = (int) ((file.length() - 2) / 8); + final int blockInfoBytes; + switch (version) { + case 1: + blockInfoBytes = 8; + break; + case 2: + default: + blockInfoBytes = 16; + break; + } + + final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes); offsets = new long[numBlocks]; + if ( version > 1 ) { + firstEventIds = new long[numBlocks]; + } else { + firstEventIds = new long[0]; + } + for (int i=0; i < numBlocks; i++) { offsets[i] = dis.readLong(); + + if ( version > 1 ) { + firstEventIds[i] = dis.readLong(); + } } } } @@ -98,11 +120,36 @@ public class StandardTocReader implements TocReader { public int getBlockIndex(final long blockOffset) { for (int i=0; i < offsets.length; i++) { if ( offsets[i] > blockOffset ) { - return i-1; + // if the offset is less than the offset of our first block, + // just return 0 to indicate the first block. Otherwise, + // return i-1 because i represents the first block whose offset is + // greater than 'blockOffset'. + return (i == 0) ? 0 : i-1; } } + // None of the blocks have an offset greater than the provided offset. + // Therefore, if the event is present, it must be in the last block. return offsets.length - 1; } + @Override + public Integer getBlockIndexForEventId(final long eventId) { + // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC), + // or if the event ID is less than the first Event ID in this TOC, then the Event ID + // is unknown -- return null. + if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) { + return null; + } + + for (int i=1; i < firstEventIds.length; i++) { + if ( firstEventIds[i] > eventId ) { + return i-1; + } + } + + // None of the blocks start with an Event ID greater than the provided ID. + // Therefore, if the event is present, it must be in the last block. + return firstEventIds.length - 1; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java index 10de459..afa5d13 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; public class StandardTocWriter implements TocWriter { private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class); - public static final byte VERSION = 1; + public static final byte VERSION = 2; private final File file; private final FileOutputStream fos; @@ -75,10 +75,11 @@ public class StandardTocWriter implements TocWriter { } @Override - public void addBlockOffset(final long offset) throws IOException { + public void addBlockOffset(final long offset, final long firstEventId) throws IOException { final BufferedOutputStream bos = new BufferedOutputStream(fos); final DataOutputStream dos = new DataOutputStream(bos); dos.writeLong(offset); + dos.writeLong(firstEventId); dos.flush(); index++; logger.debug("Adding block {} at offset {}", index, offset); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java index 97e2838..f7ddd59 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java @@ -59,4 +59,13 @@ public interface TocReader extends Closeable { * @return the index of the block that contains the given offset */ int getBlockIndex(long blockOffset); + + /** + * Returns the block index where the given event ID should be found + * + * @param eventId the ID of the provenance event of interest + * @return the block index where the given event ID should be found, or <code>null</code> if + * the block index is not known + */ + Integer getBlockIndexForEventId(long eventId); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java index 38f910f..90faea1 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java @@ -21,16 +21,19 @@ import java.io.File; import java.io.IOException; /** - * Writes a .toc file + * Writes a Table-of-Contents (.toc) file */ public interface TocWriter extends Closeable { /** * Adds the given block offset as the next Block Offset in the Table of Contents + * * @param offset the byte offset at which the block begins + * @param firstEventId the ID of the first Provenance Event that will be in the block + * * @throws IOException if unable to persist the block index */ - void addBlockOffset(long offset) throws IOException; + void addBlockOffset(long offset, long firstEventId) throws IOException; /** * @return the index of the current Block http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index 136f244..f242642 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -67,7 +67,7 @@ public class TestStandardRecordReaderWriter { final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024); - writer.writeHeader(); + writer.writeHeader(1L); writer.writeRecord(createEvent(), 1L); writer.close(); @@ -77,7 +77,7 @@ public class TestStandardRecordReaderWriter { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); assertEquals("nifi://unit-test", recovered.getTransitUri()); @@ -95,7 +95,7 @@ public class TestStandardRecordReaderWriter { final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); - writer.writeHeader(); + writer.writeHeader(1L); writer.writeRecord(createEvent(), 1L); writer.close(); @@ -105,7 +105,7 @@ public class TestStandardRecordReaderWriter { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); assertEquals("nifi://unit-test", recovered.getTransitUri()); @@ -124,7 +124,7 @@ public class TestStandardRecordReaderWriter { // new record each 1 MB of uncompressed data final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024); - writer.writeHeader(); + writer.writeHeader(1L); for (int i=0; i < 10; i++) { writer.writeRecord(createEvent(), i); } @@ -143,7 +143,7 @@ public class TestStandardRecordReaderWriter { reader.skipToBlock(0); } - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); assertEquals("nifi://unit-test", recovered.getTransitUri()); } @@ -163,7 +163,7 @@ public class TestStandardRecordReaderWriter { // new block each 10 bytes final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); - writer.writeHeader(); + writer.writeHeader(1L); for (int i=0; i < 10; i++) { writer.writeRecord(createEvent(), i); } @@ -174,7 +174,7 @@ public class TestStandardRecordReaderWriter { try (final FileInputStream fis = new FileInputStream(journalFile); final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { for (int i=0; i < 10; i++) { - StandardProvenanceEventRecord recovered = reader.nextRecord(); + final StandardProvenanceEventRecord recovered = reader.nextRecord(); System.out.println(recovered); assertNotNull(recovered); assertEquals((long) i, recovered.getEventId()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20f11b1a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java index 87400a0..9a5f424 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java @@ -64,15 +64,42 @@ public class TestStandardTocReader { @Test - public void testGetBlockIndex() throws IOException { + public void testGetBlockIndexV1() throws IOException { final File file = new File("target/" + UUID.randomUUID().toString()); try (final OutputStream out = new FileOutputStream(file); final DataOutputStream dos = new DataOutputStream(out)) { + out.write(1); out.write(0); + + for (int i=0; i < 1024; i++) { + dos.writeLong(i * 1024L); + } + } + + try { + try(final StandardTocReader reader = new StandardTocReader(file)) { + assertFalse(reader.isCompressed()); + + for (int i=0; i < 1024; i++) { + assertEquals(i * 1024, reader.getBlockOffset(i)); + } + } + } finally { + file.delete(); + } + } + + @Test + public void testGetBlockIndexV2() throws IOException { + final File file = new File("target/" + UUID.randomUUID().toString()); + try (final OutputStream out = new FileOutputStream(file); + final DataOutputStream dos = new DataOutputStream(out)) { + out.write(2); out.write(0); for (int i=0; i < 1024; i++) { dos.writeLong(i * 1024L); + dos.writeLong(0L); } }
