http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java new file mode 100644 index 0000000..6f85b94 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance; + +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.nifi.provenance.toc.StandardTocReader; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.util.file.FileUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestStandardRecordReaderWriter { + @BeforeClass + public static void setLogLevel() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); + } + + private ProvenanceEventRecord createEvent() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); + attributes.put("uuid", UUID.randomUUID().toString()); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + final ProvenanceEventRecord record = builder.build(); + + return record; + } + + @Test + public void testSimpleWriteWithToc() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + final StandardRecordWriter writer = new 
StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024); + + writer.writeHeader(); + writer.writeRecord(createEvent(), 1L); + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + assertEquals(0, reader.getBlockIndex()); + reader.skipToBlock(0); + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + + assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + + + @Test + public void testSingleRecordCompressed() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); + + writer.writeHeader(); + writer.writeRecord(createEvent(), 1L); + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + assertEquals(0, reader.getBlockIndex()); + reader.skipToBlock(0); + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + + assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + + + @Test + public void testMultipleRecordsSameBlockCompressed() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + 
final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + // new record each 1 MB of uncompressed data + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024); + + writer.writeHeader(); + for (int i=0; i < 10; i++) { + writer.writeRecord(createEvent(), i); + } + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + for (int i=0; i < 10; i++) { + assertEquals(0, reader.getBlockIndex()); + + // call skipToBlock half the time to ensure that we can; avoid calling it + // the other half of the time to ensure that it's okay. + if (i <= 5) { + reader.skipToBlock(0); + } + + StandardProvenanceEventRecord recovered = reader.nextRecord(); + assertNotNull(recovered); + assertEquals("nifi://unit-test", recovered.getTransitUri()); + } + + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + + + @Test + public void testMultipleRecordsMultipleBlocksCompressed() throws IOException { + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + // new block each 10 bytes + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100); + + writer.writeHeader(); + for (int i=0; i < 10; i++) { + writer.writeRecord(createEvent(), i); + } + writer.close(); + + final TocReader tocReader = new StandardTocReader(tocFile); + + try (final FileInputStream fis = new FileInputStream(journalFile); + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + for (int 
i=0; i < 10; i++) { + StandardProvenanceEventRecord recovered = reader.nextRecord(); + System.out.println(recovered); + assertNotNull(recovered); + assertEquals((long) i, recovered.getEventId()); + assertEquals("nifi://unit-test", recovered.getTransitUri()); + } + + assertNull(reader.nextRecord()); + } + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java new file mode 100644 index 0000000..7459fe8 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; + +public class TestUtil { + public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) { + final Map<String, String> attrCopy = new HashMap<>(attributes); + + return new FlowFile() { + @Override + public long getId() { + return id; + } + + @Override + public long getEntryDate() { + return System.currentTimeMillis(); + } + + @Override + public Set<String> getLineageIdentifiers() { + return new HashSet<String>(); + } + + @Override + public long getLineageStartDate() { + return System.currentTimeMillis(); + } + + @Override + public Long getLastQueueDate() { + return System.currentTimeMillis(); + } + + @Override + public boolean isPenalized() { + return false; + } + + @Override + public String getAttribute(final String s) { + return attrCopy.get(s); + } + + @Override + public long getSize() { + return fileSize; + } + + @Override + public Map<String, String> getAttributes() { + return attrCopy; + } + + @Override + public int compareTo(final FlowFile o) { + return 0; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java new file mode 100644 index 0000000..30326e7 --- /dev/null +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java @@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.toc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

import org.junit.Test;

/**
 * Tests for {@link StandardTocReader} that feed it hand-written TOC files: two leading
 * bytes followed by zero or more 8-byte block offsets.
 *
 * <p>All file creation happens inside the {@code try} whose {@code finally} deletes the
 * file, so a failure while writing the fixture does not leave it behind in {@code target/}.
 */
public class TestStandardTocReader {

    /**
     * Writes (or overwrites) a two-byte TOC file with no block offsets.
     *
     * @param file            file to create or truncate
     * @param compressionFlag second byte of the file; the tests below expect 0 to be reported
     *                        as "not compressed" and 1 as "compressed"
     * @throws IOException if the file cannot be written
     */
    private static void writeTocHeader(final File file, final int compressionFlag) throws IOException {
        try (final OutputStream out = new FileOutputStream(file)) {
            // First byte is 0 in every fixture; presumably a format/version byte - confirm
            // against StandardTocReader.
            out.write(0);
            out.write(compressionFlag);
        }
    }

    /** The reader reports the compression state according to the second byte of the file. */
    @Test
    public void testDetectsCompression() throws IOException {
        final File file = new File("target/" + UUID.randomUUID().toString());
        try {
            writeTocHeader(file, 0);
            try (final StandardTocReader reader = new StandardTocReader(file)) {
                assertFalse(reader.isCompressed());
            }

            // FileOutputStream truncates, so the same path can be reused for the second case.
            writeTocHeader(file, 1);
            try (final StandardTocReader reader = new StandardTocReader(file)) {
                assertTrue(reader.isCompressed());
            }
        } finally {
            file.delete();
        }
    }

    /** Offsets written as consecutive longs are returned by block index, in order. */
    @Test
    public void testGetBlockIndex() throws IOException {
        final File file = new File("target/" + UUID.randomUUID().toString());
        try {
            try (final OutputStream out = new FileOutputStream(file);
                final DataOutputStream dos = new DataOutputStream(out)) {
                // Write everything through the same stream rather than mixing the raw stream
                // and its wrapper.
                dos.write(0);
                dos.write(0);

                for (int i = 0; i < 1024; i++) {
                    dos.writeLong(i * 1024L);
                }
            }

            try (final StandardTocReader reader = new StandardTocReader(file)) {
                assertFalse(reader.isCompressed());

                for (int i = 0; i < 1024; i++) {
                    // Same long arithmetic as used when writing the offsets.
                    assertEquals(i * 1024L, reader.getBlockOffset(i));
                }
            }
        } finally {
            file.delete();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java new file mode 100644 index 0000000..70f55a2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.toc;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

import org.apache.nifi.util.file.FileUtils;
import org.junit.Test;

/**
 * Tests for {@link StandardTocWriter}.
 */
public class TestStandardTocWriter {

    /**
     * Opens a {@link StandardTocWriter} on a TOC file that already exists but is empty,
     * then closes it again. The test passes when neither step throws.
     */
    @Test
    public void testOverwriteEmptyFile() throws IOException {
        final String tocName = UUID.randomUUID().toString() + ".toc";
        final File emptyToc = new File("target/" + tocName);

        try {
            // Pre-create the file so the writer is handed an existing, zero-length TOC.
            final boolean created = emptyToc.createNewFile();
            assertTrue(created);

            try (final StandardTocWriter writer = new StandardTocWriter(emptyToc, false, false)) {
                // Intentionally empty: constructing and closing the writer is the whole test.
            }
        } finally {
            FileUtils.deleteFile(emptyToc, false);
        }
    }

}
