http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java index 8944cec..7c13a2a 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java @@ -24,9 +24,9 @@ import java.io.IOException; /** * Standard implementation of TocReader. - * + * * Expects .toc file to be in the following format; - * + * * byte 0: version * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed * byte 2-9: long: offset of block 0 @@ -37,21 +37,21 @@ import java.io.IOException; public class StandardTocReader implements TocReader { private final boolean compressed; private final long[] offsets; - + public StandardTocReader(final File file) throws IOException { try (final FileInputStream fis = new FileInputStream(file); - final DataInputStream dis = new DataInputStream(fis)) { - + final DataInputStream dis = new DataInputStream(fis)) { + final int version = dis.read(); if ( version < 0 ) { throw new EOFException(); } - + final int compressionFlag = dis.read(); if ( compressionFlag < 0 ) { throw new EOFException(); } - + if ( compressionFlag == 0 ) { compressed = false; } else if ( compressionFlag == 1 ) { @@ -59,21 +59,21 @@ public class StandardTocReader implements TocReader { } else { throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag); } - + final int numBlocks = (int) ((file.length() - 2) / 8); offsets = new long[numBlocks]; - + for (int i=0; i < numBlocks; i++) { offsets[i] = dis.readLong(); } } } - + @Override public boolean isCompressed() { return compressed; } - + @Override public long getBlockOffset(final int blockIndex) { if ( blockIndex >= offsets.length ) { @@ -89,20 +89,20 @@ public class StandardTocReader implements TocReader { } return offsets[offsets.length - 1]; } - + @Override public void close() throws IOException { } - @Override - public int getBlockIndex(final long blockOffset) { - for (int i=0; i < offsets.length; i++) { - if ( offsets[i] > blockOffset ) { - return i-1; - } - } - - return offsets.length - 1; - } + @Override + public int getBlockIndex(final long blockOffset) { + for (int i=0; i < offsets.length; i++) { + if ( offsets[i] > blockOffset ) { + return i-1; + } + } + + return offsets.length - 1; + } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java index 488f225..10de459 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java @@ -19,7 +19,6 @@ package org.apache.nifi.provenance.toc; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; @@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory; /** * Standard implementation of {@link TocWriter}. - * + * * Format of .toc file: * byte 0: version * byte 1: compressed: 0 -> not compressed, 1 -> compressed @@ -39,27 +38,27 @@ import org.slf4j.LoggerFactory; * byte (N*8+2)-(N*8+9): long: offset of block N */ public class StandardTocWriter implements TocWriter { - private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class); - + private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class); + public static final byte VERSION = 1; - + private final File file; private final FileOutputStream fos; private final boolean alwaysSync; private int index = -1; - + /** * Creates a StandardTocWriter that writes to the given file. * @param file the file to write to * @param compressionFlag whether or not the journal is compressed - * @throws FileNotFoundException + * @throws IOException if unable to write header info to the specified file */ public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException { final File tocDir = file.getParentFile(); if ( !tocDir.exists() ) { - Files.createDirectories(tocDir.toPath()); + Files.createDirectories(tocDir.toPath()); } - + this.file = file; fos = new FileOutputStream(file); this.alwaysSync = alwaysSync; @@ -69,12 +68,12 @@ public class StandardTocWriter implements TocWriter { header[1] = (byte) (compressionFlag ? 1 : 0); fos.write(header); fos.flush(); - + if ( alwaysSync ) { sync(); } } - + @Override public void addBlockOffset(final long offset) throws IOException { final BufferedOutputStream bos = new BufferedOutputStream(fos); @@ -83,17 +82,17 @@ public class StandardTocWriter implements TocWriter { dos.flush(); index++; logger.debug("Adding block {} at offset {}", index, offset); - + if ( alwaysSync ) { sync(); } } - + @Override public void sync() throws IOException { - fos.getFD().sync(); + fos.getFD().sync(); } - + @Override public int getCurrentBlockIndex() { return index; @@ -104,15 +103,15 @@ public class StandardTocWriter implements TocWriter { if (alwaysSync) { fos.getFD().sync(); } - + fos.close(); } - + @Override public File getFile() { return file; } - + @Override public String toString() { return "TOC Writer for " + file; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java index 7c197be..97e2838 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java @@ -32,27 +32,31 @@ public interface TocReader extends Closeable { /** * Indicates whether or not the corresponding Journal file is compressed - * @return + * @return <code>true</code> if the event file is compressed */ boolean isCompressed(); /** * Returns the byte offset into the Journal File for the Block with the given index. - * @param blockIndex - * @return + * + * @param blockIndex the block index to get the byte offset for + * @return the byte offset for the given block index, or <code>-1</code> if the given block index + * does not exist */ long getBlockOffset(int blockIndex); - + /** * Returns the byte offset into the Journal File of the last Block in the given index - * @return + * @return the byte offset into the Journal File of the last Block in the given index */ long getLastBlockOffset(); - + /** * Returns the index of the block that contains the given offset - * @param blockOffset - * @return + * + * @param blockOffset the byte offset for which the block index is desired + * + * @return the index of the block that contains the given offset */ int getBlockIndex(long blockOffset); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java index c30ac98..3fa7d67 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java @@ -22,16 +22,19 @@ import org.apache.nifi.provenance.lucene.LuceneUtil; public class TocUtil { - /** - * Returns the file that should be used as the Table of Contents for the given Journal File - * @param journalFile - * @return - */ - public static File getTocFile(final File journalFile) { - final File tocDir = new File(journalFile.getParentFile(), "toc"); - final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); - final File tocFile = new File(tocDir, basename + ".toc"); - return tocFile; - } - + /** + * Returns the file that should be used as the Table of Contents for the given Journal File. + * Note, if no TOC exists for the given Journal File, a File will still be returned but the file + * will not actually exist. + * + * @param journalFile the journal file for which to get the Table of Contents + * @return the file that represents the Table of Contents for the specified journal file. + */ + public static File getTocFile(final File journalFile) { + final File tocDir = new File(journalFile.getParentFile(), "toc"); + final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); + final File tocFile = new File(tocDir, basename + ".toc"); + return tocFile; + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java index c678053..38f910f 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java @@ -27,26 +27,24 @@ public interface TocWriter extends Closeable { /** * Adds the given block offset as the next Block Offset in the Table of Contents - * @param offset - * @throws IOException + * @param offset the byte offset at which the block begins + * @throws IOException if unable to persist the block index */ void addBlockOffset(long offset) throws IOException; - + /** - * Returns the index of the current Block - * @return + * @return the index of the current Block */ int getCurrentBlockIndex(); - + /** - * Returns the file that is currently being written to - * @return + * @return the file that is currently being written to */ File getFile(); /** * Synchronizes the data with the underlying storage device - * @throws IOException + * @throws IOException if unable to synchronize the data with the underlying storage device */ void sync() throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 5541ab5..7d97bcd 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -75,7 +75,7 @@ public class TestPersistentProvenanceRepository { private PersistentProvenanceRepository repo; private RepositoryConfiguration config; - + public static final int DEFAULT_ROLLOVER_MILLIS = 2000; private RepositoryConfiguration createConfiguration() { @@ -89,9 +89,9 @@ public class TestPersistentProvenanceRepository { @BeforeClass public static void setLogLevel() { - System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); } - + @Before public void printTestName() { System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************"); @@ -105,33 +105,33 @@ public class TestPersistentProvenanceRepository { } catch (final IOException ioe) { } } - + // Delete all of the storage files. We do this in order to clean up the tons of files that // we create but also to ensure that we have closed all of the file handles. If we leave any // streams open, for instance, this will throw an IOException, causing our unit test to fail. for ( final File storageDir : config.getStorageDirectories() ) { - int i; - for (i=0; i < 3; i++) { - try { - FileUtils.deleteFile(storageDir, true); - break; - } catch (final IOException ioe) { - // if there is a virus scanner, etc. running in the background we may not be able to - // delete the file. Wait a sec and try again. - if ( i == 2 ) { - throw ioe; - } else { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { - } - } - } - } + int i; + for (i=0; i < 3; i++) { + try { + FileUtils.deleteFile(storageDir, true); + break; + } catch (final IOException ioe) { + // if there is a virus scanner, etc. running in the background we may not be able to + // delete the file. Wait a sec and try again. + if ( i == 2 ) { + throw ioe; + } else { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + } + } + } } } - + private EventReporter getEventReporter() { return new EventReporter() { @@ -241,7 +241,7 @@ public class TestPersistentProvenanceRepository { } Thread.sleep(1000L); - + repo.close(); Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.) @@ -431,7 +431,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); final Query query = new Query(UUID.randomUUID().toString()); -// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*")); + // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); @@ -905,14 +905,14 @@ public class TestPersistentProvenanceRepository { secondRepo.initialize(getEventReporter()); try { - final ProvenanceEventRecord event11 = builder.build(); - secondRepo.registerEvent(event11); - secondRepo.waitForRollover(); - final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L); - assertNotNull(event11Retrieved); - assertEquals(10, event11Retrieved.getEventId()); + final ProvenanceEventRecord event11 = builder.build(); + secondRepo.registerEvent(event11); + secondRepo.waitForRollover(); + final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L); + assertNotNull(event11Retrieved); + assertEquals(10, event11Retrieved.getEventId()); } finally { - secondRepo.close(); + secondRepo.close(); } } @@ -983,26 +983,26 @@ public class TestPersistentProvenanceRepository { storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter); assertEquals(0, storageDirFiles.length); } - - + + @Test public void testBackPressure() throws IOException, InterruptedException { final RepositoryConfiguration config = createConfiguration(); - config.setMaxEventFileCapacity(1L); // force rollover on each record. + config.setMaxEventFileCapacity(1L); // force rollover on each record. config.setJournalCount(1); - + final AtomicInteger journalCountRef = new AtomicInteger(0); - - repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { - @Override - protected int getJournalCount() { - return journalCountRef.get(); - } - }; + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + protected int getJournalCount() { + return journalCountRef.get(); + } + }; repo.initialize(getEventReporter()); - final Map<String, String> attributes = new HashMap<>(); - final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + final Map<String, String> attributes = new HashMap<>(); + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); builder.setEventTime(System.currentTimeMillis()); builder.setEventType(ProvenanceEventType.RECEIVE); builder.setTransitUri("nifi://unit-test"); @@ -1023,31 +1023,31 @@ public class TestPersistentProvenanceRepository { final AtomicLong threadNanos = new AtomicLong(0L); final Thread t = new Thread(new Runnable() { - @Override - public void run() { - final long start = System.nanoTime(); - builder.fromFlowFile(createFlowFile(13, 3000L, attributes)); - attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13); - repo.registerEvent(builder.build()); - threadNanos.set(System.nanoTime() - start); - } + @Override + public void run() { + final long start = System.nanoTime(); + builder.fromFlowFile(createFlowFile(13, 3000L, attributes)); + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13); + repo.registerEvent(builder.build()); + threadNanos.set(System.nanoTime() - start); + } }); t.start(); Thread.sleep(1500L); - + journalCountRef.set(1); t.join(); - + final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get()); - assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact - + assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact + builder.fromFlowFile(createFlowFile(15, 3000L, attributes)); attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15); repo.registerEvent(builder.build()); } - - + + // TODO: test EOF on merge // TODO: Test journal with no records http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index 6f85b94..136f244 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -40,15 +40,15 @@ import org.junit.Test; public class TestStandardRecordReaderWriter { @BeforeClass public static void setLogLevel() { - System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); } - private ProvenanceEventRecord createEvent() { - final Map<String, String> attributes = new HashMap<>(); - attributes.put("filename", "1.txt"); + private ProvenanceEventRecord createEvent() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); attributes.put("uuid", UUID.randomUUID().toString()); - final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); builder.setEventTime(System.currentTimeMillis()); builder.setEventType(ProvenanceEventType.RECEIVE); builder.setTransitUri("nifi://unit-test"); @@ -58,132 +58,132 @@ public class TestStandardRecordReaderWriter { final ProvenanceEventRecord record = builder.build(); return record; - } - - @Test - public void testSimpleWriteWithToc() throws IOException { + } + + @Test + public void testSimpleWriteWithToc() throws IOException { final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite"); final File tocFile = TocUtil.getTocFile(journalFile); final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024); - + writer.writeHeader(); writer.writeRecord(createEvent(), 1L); writer.close(); final TocReader tocReader = new StandardTocReader(tocFile); - + try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { - assertEquals(0, reader.getBlockIndex()); - reader.skipToBlock(0); - StandardProvenanceEventRecord recovered = reader.nextRecord(); - assertNotNull(recovered); - - assertEquals("nifi://unit-test", recovered.getTransitUri()); - assertNull(reader.nextRecord()); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + assertEquals(0, reader.getBlockIndex()); + reader.skipToBlock(0); + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + + assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertNull(reader.nextRecord()); } - + FileUtils.deleteFile(journalFile.getParentFile(), true); - } - - - @Test - public void testSingleRecordCompressed() throws IOException { + } + + + @Test + public void testSingleRecordCompressed() throws IOException { final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); final File tocFile = TocUtil.getTocFile(journalFile); final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); - + writer.writeHeader(); writer.writeRecord(createEvent(), 1L); writer.close(); final TocReader tocReader = new StandardTocReader(tocFile); - + try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { - assertEquals(0, reader.getBlockIndex()); - reader.skipToBlock(0); - StandardProvenanceEventRecord recovered = reader.nextRecord(); - assertNotNull(recovered); - - assertEquals("nifi://unit-test", recovered.getTransitUri()); - assertNull(reader.nextRecord()); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + assertEquals(0, reader.getBlockIndex()); + reader.skipToBlock(0); + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + + assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertNull(reader.nextRecord()); } - + FileUtils.deleteFile(journalFile.getParentFile(), true); - } - - - @Test - public void testMultipleRecordsSameBlockCompressed() throws IOException { + } + + + @Test + public void testMultipleRecordsSameBlockCompressed() throws IOException { final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); final File tocFile = TocUtil.getTocFile(journalFile); final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); // new record each 1 MB of uncompressed data final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024); - + writer.writeHeader(); for (int i=0; i < 10; i++) { - writer.writeRecord(createEvent(), i); + writer.writeRecord(createEvent(), i); } writer.close(); final TocReader tocReader = new StandardTocReader(tocFile); - + try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { - for (int i=0; i < 10; i++) { - assertEquals(0, reader.getBlockIndex()); - - // call skipToBlock half the time to ensure that we can; avoid calling it - // the other half of the time to ensure that it's okay. - if (i <= 5) { - reader.skipToBlock(0); - } - - StandardProvenanceEventRecord recovered = reader.nextRecord(); - assertNotNull(recovered); - assertEquals("nifi://unit-test", recovered.getTransitUri()); - } - - assertNull(reader.nextRecord()); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + for (int i=0; i < 10; i++) { + assertEquals(0, reader.getBlockIndex()); + + // call skipToBlock half the time to ensure that we can; avoid calling it + // the other half of the time to ensure that it's okay. + if (i <= 5) { + reader.skipToBlock(0); + } + + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + assertEquals("nifi://unit-test", recovered.getTransitUri()); + } + + assertNull(reader.nextRecord()); } - + FileUtils.deleteFile(journalFile.getParentFile(), true); - } - - - @Test - public void testMultipleRecordsMultipleBlocksCompressed() throws IOException { + } + + + @Test + public void testMultipleRecordsMultipleBlocksCompressed() throws IOException { final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); final File tocFile = TocUtil.getTocFile(journalFile); final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); // new block each 10 bytes final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); - + writer.writeHeader(); for (int i=0; i < 10; i++) { - writer.writeRecord(createEvent(), i); + writer.writeRecord(createEvent(), i); } writer.close(); final TocReader tocReader = new StandardTocReader(tocFile); - + try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { - for (int i=0; i < 10; i++) { - StandardProvenanceEventRecord recovered = reader.nextRecord(); - System.out.println(recovered); - assertNotNull(recovered); - assertEquals((long) i, recovered.getEventId()); - assertEquals("nifi://unit-test", recovered.getTransitUri()); - } - - assertNull(reader.nextRecord()); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + for (int i=0; i < 10; i++) { + StandardProvenanceEventRecord recovered = reader.nextRecord(); + System.out.println(recovered); + assertNotNull(recovered); + assertEquals((long) i, recovered.getEventId()); + assertEquals("nifi://unit-test", recovered.getTransitUri()); + } + + assertNull(reader.nextRecord()); } - + FileUtils.deleteFile(journalFile.getParentFile(), true); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java index 7459fe8..eb0f736 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -24,7 +24,7 @@ import java.util.Set; import org.apache.nifi.flowfile.FlowFile; public class TestUtil { - public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) { + public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) { final Map<String, String> attrCopy = new HashMap<>(attributes); return new FlowFile() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java index 30326e7..87400a0 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java @@ -38,7 +38,7 @@ public class TestStandardTocReader { out.write(0); out.write(0); } - + try { try(final StandardTocReader reader = new StandardTocReader(file)) { assertFalse(reader.isCompressed()); @@ -46,13 +46,13 @@ public class TestStandardTocReader { } finally { file.delete(); } - - + + try (final OutputStream out = new FileOutputStream(file)) { out.write(0); out.write(1); } - + try { try(final StandardTocReader reader = new StandardTocReader(file)) { assertTrue(reader.isCompressed()); @@ -61,25 +61,25 @@ public class TestStandardTocReader { file.delete(); } } - - + + @Test public void testGetBlockIndex() throws IOException { final File file = new File("target/" + UUID.randomUUID().toString()); try (final OutputStream out = new FileOutputStream(file); - final DataOutputStream dos = new DataOutputStream(out)) { + final DataOutputStream dos = new DataOutputStream(out)) { out.write(0); out.write(0); - + for (int i=0; i < 1024; i++) { dos.writeLong(i * 1024L); } } - + try { try(final StandardTocReader reader = new StandardTocReader(file)) { assertFalse(reader.isCompressed()); - + for (int i=0; i < 1024; i++) { assertEquals(i * 1024, reader.getBlockOffset(i)); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java index 70f55a2..aebe0d5 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java @@ -31,12 +31,12 @@ public class TestStandardTocWriter { final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc"); try { assertTrue( tocFile.createNewFile() ); - + try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) { } } finally { FileUtils.deleteFile(tocFile, false); } } - + }
