Repository: nifi-minifi Updated Branches: refs/heads/master 66dbda90c -> fb5548198
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java new file mode 100644 index 0000000..118a2a6 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java @@ -0,0 +1,691 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.provenance;

import static org.apache.nifi.provenance.TestUtil.createFlowFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;

import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository.MethodNotSupportedException;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.DataOutputStream;
import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * JUnit tests for {@link MiNiFiPersistentProvenanceRepository}.
 *
 * <p>Every test obtains its configuration from {@link #createConfiguration()}, which points the
 * repository at a fresh, randomly named directory under {@code target/storage}. The
 * {@link #closeRepo()} tear-down closes the repository held in {@link #repo} and deletes those
 * directories again.
 */
public class TestMiNiFiPersistentProvenanceRepository {

    @Rule
    public TestName name = new TestName();

    /** Repository under test; closed by {@link #closeRepo()} when non-null. */
    private MiNiFiPersistentProvenanceRepository repo;

    /** Most recent configuration handed out by {@link #createConfiguration()}. */
    private RepositoryConfiguration config;

    /** Second constructor argument passed to every repository created by these tests. */
    public static final int DEFAULT_ROLLOVER_MILLIS = 2000;

    private EventReporter eventReporter;

    /** Everything reported through {@link #eventReporter} during the current test. */
    private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());

    /**
     * Builds a configuration with a unique storage directory and stores it in {@link #config}
     * so that {@link #closeRepo()} can clean the directory up afterwards.
     *
     * @return the newly created configuration (the same instance that is stored in the field)
     */
    private RepositoryConfiguration createConfiguration() {
        config = new RepositoryConfiguration();
        config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
        config.setCompressOnRollover(true);
        config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
        config.setCompressionBlockBytes(100);
        return config;
    }

    /**
     * Creates a builder pre-populated with the RECEIVE event that most tests register: current
     * time, transit URI {@code nifi://unit-test}, a flow file with id 3 and size 3000 carrying a
     * copy of {@code attributes}, component id {@code 1234} and type {@code dummy processor}.
     *
     * @param attributes flow file attributes; they are copied when the flow file is created, so
     *                   later changes to the map only affect flow files created afterwards
     * @return the configured builder
     */
    private ProvenanceEventBuilder newReceiveEventBuilder(final Map<String, String> attributes) {
        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
        builder.setEventTime(System.currentTimeMillis());
        builder.setEventType(ProvenanceEventType.RECEIVE);
        builder.setTransitUri("nifi://unit-test");
        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
        builder.setComponentId("1234");
        builder.setComponentType("dummy processor");
        return builder;
    }

    @BeforeClass
    public static void setLogLevel() {
        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
    }

    /**
     * Resets the list of reported events and installs an {@link EventReporter} that records
     * every event in {@link #reportedEvents} and echoes it to standard out. (Despite its name
     * this method does not print the test name.)
     */
    @Before
    public void printTestName() {
        reportedEvents.clear();
        eventReporter = new EventReporter() {
            private static final long serialVersionUID = 1L;

            @Override
            public void reportEvent(Severity severity, String category, String message) {
                reportedEvents.add(new ReportedEvent(severity, category, message));
                System.out.println(severity + " : " + category + " : " + message);
            }
        };
    }

    /**
     * Closes the repository (best effort) and deletes every storage directory of the current
     * configuration, retrying up to three times with a one second pause in between.
     *
     * @throws IOException if a storage directory still cannot be deleted on the last attempt,
     *                     or if the thread is interrupted while waiting to retry
     */
    @After
    public void closeRepo() throws IOException {
        if (repo != null) {
            try {
                repo.close();
            } catch (final IOException ignored) {
                // Best effort: a failure to close must not mask the result of the test itself.
                // Any file handle left open is surfaced by the directory deletion below.
            }
        }

        if (config == null) {
            // No configuration was ever created, so there is nothing on disk to clean up.
            return;
        }

        // Delete all of the storage files. We do this in order to clean up the tons of files that
        // we create but also to ensure that we have closed all of the file handles. If we leave any
        // streams open, for instance, this will throw an IOException, causing our unit test to fail.
        for (final File storageDir : config.getStorageDirectories()) {
            for (int attempt = 0; attempt < 3; attempt++) {
                try {
                    FileUtils.deleteFile(storageDir, true);
                    break;
                } catch (final IOException ioe) {
                    // if there is a virus scanner, etc. running in the background we may not be able to
                    // delete the file. Wait a sec and try again.
                    if (attempt == 2) {
                        throw ioe;
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (final InterruptedException ie) {
                        // Preserve the interrupt for the caller and stop retrying.
                        Thread.currentThread().interrupt();
                        throw ioe;
                    }
                }
            }
        }
    }

    private EventReporter getEventReporter() {
        return eventReporter;
    }

    /**
     * Registers ten events, closes the repository, re-opens it on the same configuration and
     * verifies that the max event id and all ten events are available again.
     */
    @Test
    public void testAddAndRecover() throws IOException, InterruptedException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxEventFileCapacity(1L);
        config.setMaxEventFileLife(1, TimeUnit.SECONDS);
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("abc", "xyz");
        attributes.put("xyz", "abc");
        attributes.put("uuid", UUID.randomUUID().toString());

        final ProvenanceEventRecord record = newReceiveEventBuilder(attributes).build();

        for (int i = 0; i < 10; i++) {
            repo.registerEvent(record);
        }

        Assert.assertEquals("Did not establish the correct, Max Event Id", 9, repo.getMaxEventId().intValue());

        Thread.sleep(1000L);

        repo.close();
        Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)

        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());
        final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12);

        Assert.assertEquals("Did not establish the correct, Max Event Id through recovery after reloading", 9, repo.getMaxEventId().intValue());

        assertEquals(10, recoveredRecords.size());
        for (int i = 0; i < 10; i++) {
            final ProvenanceEventRecord recovered = recoveredRecords.get(i);
            assertEquals(i, recovered.getEventId());
            assertEquals("nifi://unit-test", recovered.getTransitUri());
            assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());
            assertEquals(attributes, recovered.getAttributes());
        }
    }

    /**
     * With compress-on-rollover enabled, expects {@code 0.prov.gz} to exist in the storage
     * directory once {@code waitForRollover()} has returned.
     */
    @Test
    public void testCompressOnRollover() throws IOException, InterruptedException, ParseException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
        config.setCompressOnRollover(true);
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final String uuid = "00000000-0000-0000-0000-000000000000";
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("abc", "xyz");
        attributes.put("xyz", "abc");
        attributes.put("filename", "file-" + uuid);
        attributes.put("uuid", uuid);

        final ProvenanceEventBuilder builder = newReceiveEventBuilder(attributes);

        for (int i = 0; i < 10; i++) {
            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
            repo.registerEvent(builder.build());
        }

        repo.waitForRollover();
        final File storageDir = config.getStorageDirectories().get(0);
        final File compressedLogFile = new File(storageDir, "0.prov.gz");
        assertTrue(compressedLogFile.exists());
    }

    /**
     * Submitting a lineage computation is expected to fail with
     * {@link MethodNotSupportedException}.
     */
    @Test(expected = MethodNotSupportedException.class)
    public void testLineageRequestNotSupported() throws IOException, InterruptedException, ParseException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxRecordLife(3, TimeUnit.SECONDS);
        config.setMaxStorageCapacity(1024L * 1024L);
        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
        config.setMaxEventFileCapacity(1024L * 1024L);
        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));

        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final String uuid = "00000000-0000-0000-0000-000000000001";

        repo.submitLineageComputation(uuid);
    }

    /**
     * Registers ten events, closes the repository and opens a second one on the same
     * configuration; the next event registered there must receive id 10.
     */
    @Test
    public void testCorrectProvenanceEventIdOnRestore() throws IOException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxEventFileLife(1, TimeUnit.SECONDS);
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final String uuid = "00000000-0000-0000-0000-000000000000";
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("abc", "xyz");
        attributes.put("xyz", "abc");
        attributes.put("filename", "file-" + uuid);
        attributes.put("uuid", uuid);

        final ProvenanceEventBuilder builder = newReceiveEventBuilder(attributes);

        for (int i = 0; i < 10; i++) {
            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
            // createFlowFile copies the map, so this uuid is only seen by the NEXT flow file.
            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
            repo.registerEvent(builder.build());
        }

        repo.close();

        final MiNiFiPersistentProvenanceRepository secondRepo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        secondRepo.initialize(getEventReporter());

        try {
            final ProvenanceEventRecord event11 = builder.build();
            secondRepo.registerEvent(event11);
            secondRepo.waitForRollover();
            final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
            assertNotNull(event11Retrieved);
            assertEquals(10, event11Retrieved.getEventId());
            Assert.assertEquals(10, secondRepo.getMaxEventId().intValue());
        } finally {
            secondRepo.close();
        }
    }

    /**
     * Here the event file {@code 10.prov.gz} is replaced by a GZIP stream that holds a
     * header-like UTF string and an int but no event records; the ten events outside that
     * file must still be returned.
     */
    @Test
    public void testWithWithEventFileMissingRecord() throws Exception {
        final File eventFile = this.prepCorruptedEventFileTests();

        // try-with-resources guarantees the file handle is released even if a write fails,
        // which matters because closeRepo() deletes the storage directory afterwards.
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)))) {
            out.writeUTF("BlahBlah");
            out.writeInt(4);
        }
        assertTrue(eventFile.exists());

        final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
        assertEquals(10, events.size());
    }

    /**
     * Here the event file {@code 10.prov.gz} is replaced by a GZIP stream into which nothing
     * was written (no header, no records); the ten events outside that file must still be
     * returned.
     */
    @Test
    public void testWithWithEventFileCorrupted() throws Exception {
        final File eventFile = this.prepCorruptedEventFileTests();

        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)))) {
            // Intentionally write nothing: opening and closing the stream is the whole point.
        }

        final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
        assertEquals(10, events.size());
    }

    /**
     * Registers twenty CREATE events, waiting for a rollover after the tenth and again at the
     * end, then deletes {@code 10.prov.gz} from the storage directory.
     *
     * @return the (now deleted) event file, for the caller to re-create with bad content
     */
    private File prepCorruptedEventFileTests() throws Exception {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxStorageCapacity(1024L * 1024L);
        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
        config.setMaxEventFileCapacity(1024L * 1024L);
        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
        config.setDesiredIndexSize(10);

        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final String uuid = UUID.randomUUID().toString();
        for (int i = 0; i < 20; i++) {
            final ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
                    .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
                    .setFlowFileUUID(uuid).build();
            repo.registerEvent(record);
            if (i == 9) {
                repo.waitForRollover();
                Thread.sleep(2000L);
            }
        }
        repo.waitForRollover();
        final File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
        assertTrue(eventFile.delete());
        return eventFile;
    }

    /**
     * Overrides {@code getJournalCount()} so the test controls the reported journal count, then
     * checks that {@code registerEvent} on a second thread takes more than 1200 ms while the
     * count is held at 6 for roughly 1500 ms.
     */
    @Test
    public void testBackPressure() throws IOException, InterruptedException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxEventFileCapacity(1L); // force rollover on each record.
        config.setJournalCount(1);

        final AtomicInteger journalCountRef = new AtomicInteger(0);

        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
            @Override
            protected int getJournalCount() {
                return journalCountRef.get();
            }
        };
        repo.initialize(getEventReporter());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("uuid", UUID.randomUUID().toString());
        final ProvenanceEventBuilder builder = newReceiveEventBuilder(attributes);

        // ensure that we can register the events.
        for (int i = 0; i < 10; i++) {
            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
            repo.registerEvent(builder.build());
        }

        // set number of journals to 6 so that we will block.
        journalCountRef.set(6);

        final AtomicLong threadNanos = new AtomicLong(0L);
        final Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                final long start = System.nanoTime();
                builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
                attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
                repo.registerEvent(builder.build());
                threadNanos.set(System.nanoTime() - start);
            }
        });
        t.start();

        Thread.sleep(1500L);

        journalCountRef.set(1);
        t.join();

        final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
        assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact

        builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
        repo.registerEvent(builder.build());
    }

    /**
     * Registers 10,000 events from a pool of ten threads and, after a rollover, reads every
     * file in the storage directory expecting event ids 0..9999 in sequence.
     */
    @Test
    public void testMergeJournals() throws IOException, InterruptedException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");

        final ProvenanceEventRecord record = newReceiveEventBuilder(attributes).build();

        final ExecutorService exec = Executors.newFixedThreadPool(10);
        try {
            for (int i = 0; i < 10000; i++) {
                exec.submit(new Runnable() {
                    @Override
                    public void run() {
                        repo.registerEvent(record);
                    }
                });
            }
        } finally {
            // Stop accepting work so the pool threads exit once the queued registrations have
            // run; without this the ten worker threads outlive the test.
            exec.shutdown();
        }

        repo.waitForRollover();

        // Make sure no worker is still registering events while the files are being read.
        assertTrue("Event registration did not finish in time", exec.awaitTermination(30, TimeUnit.SECONDS));

        final File storageDir = config.getStorageDirectories().get(0);
        long counter = 0;
        for (final File file : storageDir.listFiles()) {
            if (file.isFile()) {

                try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
                    ProvenanceEventRecord r = null;

                    while ((r = reader.nextRecord()) != null) {
                        assertEquals(counter++, r.getEventId());
                    }
                }
            }
        }

        assertEquals(10000, counter);
    }

    /**
     * With {@code maxAttributeChars} set to 50, a 75 character attribute value must come back
     * truncated to its first 50 characters while a shorter value is returned unchanged.
     */
    @Test
    public void testTruncateAttributes() throws IOException, InterruptedException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxAttributeChars(50);
        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
        repo.initialize(getEventReporter());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");

        final ProvenanceEventRecord record = newReceiveEventBuilder(attributes).build();
        repo.registerEvent(record);
        repo.waitForRollover();

        final ProvenanceEventRecord retrieved = repo.getEvent(0L);
        assertNotNull(retrieved);
        assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
        assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
    }

    /**
     * Makes the second call to {@code createWriters} fail and verifies that the failing
     * rollover surfaces exactly that exception, that a later rollover succeeds, and that
     * nothing was reported to the event reporter.
     */
    @Test
    public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxAttributeChars(50);
        config.setMaxEventFileLife(3, TimeUnit.SECONDS);

        // Create a repo whose second attempt to create writers fails.
        final IOException failure = new IOException("Already created writers once. Unit test causing failure.");
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
            int iterations = 0;

            @Override
            protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
                if (iterations++ == 1) {
                    throw failure;
                } else {
                    return super.createWriters(config, initialRecordId);
                }
            }
        };

        // initialize with our event reporter
        repo.initialize(getEventReporter());

        // create some events in the journal files.
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");

        final ProvenanceEventBuilder builder = newReceiveEventBuilder(attributes);

        for (int i = 0; i < 50; i++) {
            final ProvenanceEventRecord event = builder.build();
            repo.registerEvent(event);
        }

        // Attempt to rollover but fail to create new writers.
        try {
            repo.rolloverWithLock(true);
            Assert.fail("Expected to get IOException when calling rolloverWithLock");
        } catch (final IOException ioe) {
            // Must be the very instance thrown by the override above, not just any IOException.
            Assert.assertSame(failure, ioe);
        }

        // Wait for the first rollover to succeed.
        repo.waitForRollover();

        // This time when we rollover, we should not have a problem rolling over.
        repo.rolloverWithLock(true);

        // Ensure that no errors were reported.
        assertEquals(0, reportedEvents.size());
    }

    /**
     * Spies on the record writers so that {@code writeUUID} throws {@link OutOfMemoryError} on
     * demand, then checks that writes fail first with that error and afterwards with
     * {@link IllegalStateException}, and that a fresh repository on the same configuration can
     * still recover its journal files.
     */
    @Test
    public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException {
        final RepositoryConfiguration config = createConfiguration();
        config.setMaxEventFileLife(3, TimeUnit.MINUTES);
        config.setJournalCount(4);

        // Create a repository that overrides the createWriters() method so that we can return writers that will throw
        // OutOfMemoryError where we want to
        final AtomicBoolean causeOOME = new AtomicBoolean(false);
        repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
            @Override
            protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
                final RecordWriter[] recordWriters = super.createWriters(config, initialRecordId);

                // Spy on each of the writers so that a call to writeUUID throws an OutOfMemoryError if we set the
                // causeOOME flag to true
                final StandardRecordWriter[] spiedWriters = new StandardRecordWriter[recordWriters.length];
                for (int i = 0; i < recordWriters.length; i++) {
                    final StandardRecordWriter writer = (StandardRecordWriter) recordWriters[i];

                    spiedWriters[i] = Mockito.spy(writer);
                    Mockito.doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(final InvocationOnMock invocation) throws Throwable {
                            if (causeOOME.get()) {
                                throw new OutOfMemoryError();
                            } else {
                                writer.writeUUID(invocation.getArgumentAt(0, DataOutputStream.class), invocation.getArgumentAt(1, String.class));
                            }
                            return null;
                        }
                    }).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), Mockito.any(String.class));
                }

                // return the writers that we are spying on
                return spiedWriters;
            }
        };
        repo.initialize(getEventReporter());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");

        final ProvenanceEventBuilder builder = newReceiveEventBuilder(attributes);

        // first make sure that we are able to write to the repo successfully.
        for (int i = 0; i < 4; i++) {
            final ProvenanceEventRecord record = builder.build();
            repo.registerEvent(record);
        }

        // cause OOME to occur
        causeOOME.set(true);

        // write 4 times to make sure that we mark all partitions as dirty
        for (int i = 0; i < 4; i++) {
            final ProvenanceEventRecord record = builder.build();
            try {
                repo.registerEvent(record);
                Assert.fail("Expected OutOfMemoryError but was able to register event");
            } catch (final OutOfMemoryError expected) {
                // this is the failure injected by the spied writers
            }
        }

        // now that all partitions are dirty, ensure that as we keep trying to write, we get an IllegalStateException
        // and that we don't corrupt the repository by writing partial records
        for (int i = 0; i < 8; i++) {
            final ProvenanceEventRecord record = builder.build();
            try {
                repo.registerEvent(record);
                Assert.fail("Expected IllegalStateException but was able to register event");
            } catch (final IllegalStateException expected) {
                // every partition is dirty, so the write must be rejected
            }
        }

        // close repo so that we can create a new one to recover records
        repo.close();

        // make sure we can recover
        final MiNiFiPersistentProvenanceRepository recoveryRepo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
            @Override
            protected Set<File> recoverJournalFiles() throws IOException {
                try {
                    return super.recoverJournalFiles();
                } catch (final IOException ioe) {
                    Assert.fail("Failed to recover properly");
                    return null;
                }
            }
        };

        try {
            recoveryRepo.initialize(getEventReporter());
        } finally {
            recoveryRepo.close();
        }
    }

    /** Immutable record of one call to {@link EventReporter#reportEvent}. */
    private static class ReportedEvent {
        private final Severity severity;
        private final String category;
        private final String message;

        public ReportedEvent(final Severity severity, final String category, final String message) {
            this.severity = severity;
            this.category = category;
            this.message = message;
        }

        public String getCategory() {
            return category;
        }

        public String getMessage() {
            return message;
        }

        public Severity getSeverity() {
            return severity;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java new file mode 100644 index 0000000..26766d6 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.provenance;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;

/**
 * Test helper that manufactures minimal {@link FlowFile} instances.
 */
public class TestUtil {

    /**
     * Creates a flow file stub with the given id and size.
     *
     * @param id         value returned by {@code getId()}
     * @param fileSize   value returned by {@code getSize()}
     * @param attributes attributes to expose; the map is copied once, here, so later changes to
     *                   the argument are not visible through the returned flow file
     * @return a flow file that is never penalized, has no lineage identifiers, reports the
     *         current time for all of its dates and compares as equal to every other flow file
     */
    public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
        return new StubFlowFile(id, fileSize, new HashMap<>(attributes));
    }

    /** Fixed-value {@link FlowFile} backing {@link #createFlowFile(long, long, Map)}. */
    private static final class StubFlowFile implements FlowFile {
        private final long flowFileId;
        private final long size;
        private final Map<String, String> snapshot;

        private StubFlowFile(final long flowFileId, final long size, final Map<String, String> snapshot) {
            this.flowFileId = flowFileId;
            this.size = size;
            this.snapshot = snapshot;
        }

        @Override
        public long getId() {
            return flowFileId;
        }

        @Override
        public long getSize() {
            return size;
        }

        @Override
        public Map<String, String> getAttributes() {
            return snapshot;
        }

        @Override
        public String getAttribute(final String key) {
            return snapshot.get(key);
        }

        @Override
        public boolean isPenalized() {
            return false;
        }

        @Override
        public Set<String> getLineageIdentifiers() {
            // a new, empty set on every call
            return new HashSet<String>();
        }

        // The three date accessors are not fixed at creation time: each call reads the clock.

        @Override
        public long getEntryDate() {
            return System.currentTimeMillis();
        }

        @Override
        public long getLineageStartDate() {
            return System.currentTimeMillis();
        }

        @Override
        public Long getLastQueueDate() {
            return System.currentTimeMillis();
        }

        @Override
        public int compareTo(final FlowFile other) {
            return 0;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml new file mode 100644 index 0000000..1140773 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>minifi-provenance-repository-bundle</artifactId> + <groupId>org.apache.nifi.minifi</groupId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-repository-nar</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-persistent-provenance-repository</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + </dependencies> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/src/main/resources/META-INF/NOTICE b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..9cfe874 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,202 @@ +minifi-provenance-repository-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Lucene + The following NOTICE information applies: + Apache Lucene + Copyright 2014 The Apache Software Foundation + + Includes software from other Apache Software Foundation projects, + including, but not limited to: + - Apache Ant + - Apache Jakarta Regexp + - Apache Commons + - Apache Xerces + + ICU4J, (under analysis/icu) is licensed under an MIT styles license + and Copyright (c) 1995-2008 International Business Machines Corporation and others + + Some data files (under analysis/icu/src/data) are derived from Unicode data such + as the Unicode Character Database. See http://unicode.org/copyright.html for more + details. + + Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is + BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/ + + The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were + automatically generated with the moman/finenight FSA library, created by + Jean-Philippe Barrette-LaPierre. This library is available under an MIT license, + see http://sites.google.com/site/rrettesite/moman and + http://bitbucket.org/jpbarrette/moman/overview/ + + The class org.apache.lucene.util.WeakIdentityMap was derived from + the Apache CXF project and is Apache License 2.0. + + The Google Code Prettify is Apache License 2.0. + See http://code.google.com/p/google-code-prettify/ + + JUnit (junit-4.10) is licensed under the Common Public License v. 
1.0 + See http://junit.sourceforge.net/cpl-v10.html + + This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin + g Package (jaspell): http://jaspell.sourceforge.net/ + License: The BSD License (http://www.opensource.org/licenses/bsd-license.php) + + The snowball stemmers in + analysis/common/src/java/net/sf/snowball + were developed by Martin Porter and Richard Boulton. + The snowball stopword lists in + analysis/common/src/resources/org/apache/lucene/analysis/snowball + were developed by Martin Porter and Richard Boulton. + The full snowball package is available from + http://snowball.tartarus.org/ + + The KStem stemmer in + analysis/common/src/org/apache/lucene/analysis/en + was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst) + under the BSD-license. + + The Arabic,Persian,Romanian,Bulgarian, and Hindi analyzers (common) come with a default + stopword list that is BSD-licensed created by Jacques Savoy. These files reside in: + analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt + See http://members.unine.ch/jacques.savoy/clef/index.html. + + The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers + (common) are based on BSD-licensed reference implementations created by Jacques Savoy and + Ljiljana Dolamic. 
These files reside in: + analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java + + The Stempel analyzer (stempel) includes BSD-licensed software developed + by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil, + and Edmond Nolan. + + The Polish analyzer (stempel) comes with a default + stopword list that is BSD-licensed created by the Carrot2 project. The file resides + in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt. + See http://project.carrot2.org/license.html. + + The SmartChineseAnalyzer source code (smartcn) was + provided by Xiaoping Gao and copyright 2009 by www.imdict.net. + + WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/) + is derived from Unicode data such as the Unicode Character Database. + See http://unicode.org/copyright.html for more details. + + The Morfologik analyzer (morfologik) includes BSD-licensed software + developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/). 
+ + Morfologik uses data from Polish ispell/myspell dictionary + (http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia) + LGPL and Creative Commons ShareAlike. + + Morfologik includes data from BSD-licensed dictionary of Polish (SGJP) + (http://sgjp.pl/morfeusz/) + + Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original + source code for this can be found at http://www.eclipse.org/jetty/downloads.php + + =========================================================================== + Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration + =========================================================================== + + This software includes a binary and/or source version of data from + + mecab-ipadic-2.7.0-20070801 + + which can be obtained from + + http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz + + or + + http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz + + =========================================================================== + mecab-ipadic-2.7.0-20070801 Notice + =========================================================================== + + Nara Institute of Science and Technology (NAIST), + the copyright holders, disclaims all warranties with regard to this + software, including all implied warranties of merchantability and + fitness, in no event shall NAIST be liable for + any special, indirect or consequential damages or any damages + whatsoever resulting from loss of use, data or profits, whether in an + action of contract, negligence or other tortuous action, arising out + of or in connection with the use or performance of this software. + + A large portion of the dictionary entries + originate from ICOT Free Software. The following conditions for ICOT + Free Software applies to the current dictionary as well. 
+ + Each User may also freely distribute the Program, whether in its + original form or modified, to any third party or parties, PROVIDED + that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear + on, or be attached to, the Program, which is distributed substantially + in the same form as set out herein and that such intended + distribution, if actually made, will neither violate or otherwise + contravene any of the laws and regulations of the countries having + jurisdiction over the User or the intended distribution itself. + + NO WARRANTY + + The program was produced on an experimental basis in the course of the + research and development conducted during the project and is provided + to users as so produced on an experimental basis. Accordingly, the + program is provided without any warranty whatsoever, whether express, + implied, statutory or otherwise. The term "warranty" used herein + includes, but is not limited to, any warranty of the quality, + performance, merchantability and fitness for a particular purpose of + the program and the nonexistence of any infringement or violation of + any right of any third party. + + Each user of the program will agree and understand, and be deemed to + have agreed and understood, that there is no warranty whatsoever for + the program and, accordingly, the entire risk arising from or + otherwise connected with the program is assumed by the user. 
+ + Therefore, neither ICOT, the copyright holder, or any other + organization that participated in or was otherwise related to the + development of the program and their respective officials, directors, + officers and other employees shall be held liable for any and all + damages, including, without limitation, general, special, incidental + and consequential damages, arising out of or otherwise in connection + with the use or inability to use the program or any product, material + or result produced or otherwise obtained by using the program, + regardless of whether they have been advised of, or otherwise had + knowledge of, the possibility of such damages at any time during the + project or thereafter. Each user will be deemed to have agreed to the + foregoing by his or her commencement of use of the program. The term + "use" as used herein includes, but is not limited to, the use, + modification, copying and distribution of the program and the + production of secondary products from the program. + + In the case where the program, whether in its original form or + modified, was distributed or delivered to or received by a user from + any person, organization or entity other than ICOT, unless it makes or + grants independently of ICOT any specific warranty to the user in + writing, such person, organization or entity, will also be exempted + from and not be held liable to the user for any such damages as noted + above as far as the program is concerned. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/minifi-nar-bundles/minifi-provenance-repository-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/pom.xml b/minifi-nar-bundles/minifi-provenance-repository-bundle/pom.xml new file mode 100644 index 0000000..117110c --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-nar-bundles</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>minifi-provenance-repository-bundle</artifactId> + <packaging>pom</packaging> + + <modules> + <module>minifi-provenance-repository-nar</module> + <module>minifi-persistent-provenance-repository</module> + </modules> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/minifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/pom.xml b/minifi-nar-bundles/pom.xml index 79a8414..84d0aef 100644 --- a/minifi-nar-bundles/pom.xml +++ b/minifi-nar-bundles/pom.xml @@ -25,6 +25,7 @@ <packaging>pom</packaging> <modules> <module>minifi-framework-bundle</module> + <module>minifi-provenance-repository-bundle</module> <module>minifi-provenance-reporting-bundle</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/fb554819/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b765c7f..f07b6d1 100644 --- a/pom.xml +++ b/pom.xml @@ -138,6 +138,30 @@ limitations under the License. 
<version>0.0.1-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-repository-nar</artifactId> + <version>0.0.1-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-reporting-nar</artifactId> + <version>0.0.1-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-resources</artifactId> + <classifier>resources</classifier> + <type>zip</type> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-runtime</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.3</version>
