NIFI-2890 Provenance Repository Corruption (0.x)
* Corrected handling of corrupt journal file records, which previously 
prevented instance startup and caused loss of records from corrupt files.  
Specifically, exception handling was expanded so that failures on records 
after the first are handled the same way as failures on the first record.
* Adjusted log messages to reflect that all of the journal, or the remainder 
of it, will be skipped, not just the current record.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7d23bd7a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7d23bd7a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7d23bd7a

Branch: refs/heads/support/nifi-0.7.x
Commit: 7d23bd7ac6af228a3e723d18253063231c919cf9
Parents: 1c7e123
Author: Joe Skora <[email protected]>
Authored: Tue Feb 7 21:02:19 2017 +0000
Committer: Mike Moser <[email protected]>
Committed: Wed Feb 15 20:45:47 2017 +0000

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         |  27 +-
 .../TestPersistentProvenanceRepository.java     | 332 +++++++++++++++++--
 2 files changed, 324 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7d23bd7a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 8b971b5..4cec8aa 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -1564,16 +1564,19 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 try {
                     record = reader.nextRecord();
                 } catch (final EOFException eof) {
+                    // record will be null and reader can no longer be used
                 } catch (final Exception e) {
-                    logger.warn("Failed to generate Provenance Event Record 
from Journal due to " + e + "; it's possible that the record wasn't "
-                            + "completely written to the file. This record 
will be skipped.");
+                    logger.warn("Failed to generate Provenance Event Record 
from Journal due to " + e + "; it's "
+                            + "possible that the record wasn't completely 
written to the file. This journal will be "
+                            + "skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
 
                     if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + 
e +
-                            "; it's possible that hte record wasn't completely 
written to the file. This record will be skipped.");
+                        eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to read Provenance Event "
+                                + "Record from Journal due to " + e + "; it's 
possible that the record wasn't "
+                                + "completely written to the file. This 
journal will be skipped.");
                     }
                 }
 
@@ -1710,6 +1713,22 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                             try {
                                 nextRecord = reader.nextRecord();
                             } catch (final EOFException eof) {
+                                // record will be null and reader can no 
longer be used
+                            } catch (final Exception e) {
+                                logger.warn("Failed to generate Provenance 
Event Record from Journal due to " + e
+                                        + "; it's possible that the record 
wasn't completely written to the file. "
+                                        + "The remainder of this journal will 
be skipped.");
+                                if (logger.isDebugEnabled()) {
+                                    logger.warn("", e);
+                                }
+
+                                if (eventReporter != null) {
+                                    
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read "
+                                            + "Provenance Event Record from 
Journal due to " + e + "; it's possible "
+                                            + "that the record wasn't 
completely written to the file. The remainder "
+                                            + "of this journal will be 
skipped.");
+                                }
+
                             }
 
                             if (nextRecord != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d23bd7a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 0aa0d0f..4001246 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -24,9 +24,14 @@ import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +41,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,6 +76,7 @@ import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.serialization.RecordWriters;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.util.file.FileUtils;
@@ -77,9 +84,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -92,6 +101,9 @@ public class TestPersistentProvenanceRepository {
     @Rule
     public TestName name = new TestName();
 
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
     private PersistentProvenanceRepository repo;
     private RepositoryConfiguration config;
 
@@ -99,6 +111,9 @@ public class TestPersistentProvenanceRepository {
     private EventReporter eventReporter;
     private List<ReportedEvent> reportedEvents = 
Collections.synchronizedList(new ArrayList<ReportedEvent>());
 
+    private static int headerSize;
+    private static int recordSize;
+
     private RepositoryConfiguration createConfiguration() {
         config = new RepositoryConfiguration();
         config.addStorageDirectory(new File("target/storage/" + 
UUID.randomUUID().toString()));
@@ -113,6 +128,30 @@ public class TestPersistentProvenanceRepository {
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", 
"DEBUG");
     }
 
+    @BeforeClass
+    public static void findJournalSizes() throws IOException {
+        // determine header and record size
+
+        final Map<String, String> attributes = new HashMap<>();
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final ProvenanceEventRecord record = builder.build();
+
+        final File tempRecordFile = tempFolder.newFile("record.tmp");
+        final RecordWriter writer = 
RecordWriters.newRecordWriter(tempRecordFile, false, false);
+        writer.writeHeader(12345L);
+        headerSize = Long.valueOf(tempRecordFile.length()).intValue();
+        writer.writeRecord(record, 12345L);
+        recordSize = Long.valueOf(tempRecordFile.length()).intValue() - 
headerSize;
+        writer.close();
+    }
+
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + 
name.getMethodName() + "  *****************************");
@@ -138,24 +177,26 @@ public class TestPersistentProvenanceRepository {
             }
         }
 
-        // Delete all of the storage files. We do this in order to clean up 
the tons of files that
-        // we create but also to ensure that we have closed all of the file 
handles. If we leave any
-        // streams open, for instance, this will throw an IOException, causing 
our unit test to fail.
-        for (final File storageDir : config.getStorageDirectories()) {
-            int i;
-            for (i = 0; i < 3; i++) {
-                try {
-                    FileUtils.deleteFile(storageDir, true);
-                    break;
-                } catch (final IOException ioe) {
-                    // if there is a virus scanner, etc. running in the 
background we may not be able to
-                    // delete the file. Wait a sec and try again.
-                    if (i == 2) {
-                        throw ioe;
-                    } else {
-                        try {
-                            Thread.sleep(1000L);
-                        } catch (final InterruptedException ie) {
+        if (config != null) {
+            // Delete all of the storage files. We do this in order to clean 
up the tons of files that
+            // we create but also to ensure that we have closed all of the 
file handles. If we leave any
+            // streams open, for instance, this will throw an IOException, 
causing our unit test to fail.
+            for (final File storageDir : config.getStorageDirectories()) {
+                int i;
+                for (i = 0; i < 3; i++) {
+                    try {
+                        FileUtils.deleteFile(storageDir, true);
+                        break;
+                    } catch (final IOException ioe) {
+                        // if there is a virus scanner, etc. running in the 
background we may not be able to
+                        // delete the file. Wait a sec and try again.
+                        if (i == 2) {
+                            throw ioe;
+                        } else {
+                            try {
+                                Thread.sleep(1000L);
+                            } catch (final InterruptedException ie) {
+                            }
                         }
                     }
                 }
@@ -1303,10 +1344,6 @@ public class TestPersistentProvenanceRepository {
         repo.registerEvent(builder.build());
     }
 
-
-    // TODO: test EOF on merge
-    // TODO: Test journal with no records
-
     @Test
     public void testTextualQuery() throws InterruptedException, IOException, 
ParseException {
         final RepositoryConfiguration config = createConfiguration();
@@ -1375,6 +1412,28 @@ public class TestPersistentProvenanceRepository {
         }
     }
 
+    private long checkJournalRecords(final File storageDir, final Boolean 
exact) throws IOException {
+        File[] storagefiles = storageDir.listFiles();
+        long counter = 0;
+        assertNotNull(storagefiles);
+        for (final File file : storagefiles) {
+            if (file.isFile()) {
+                try (RecordReader reader = RecordReaders.newRecordReader(file, 
null, 2048)) {
+                    ProvenanceEventRecord r;
+                    ProvenanceEventRecord last = null;
+                    while ((r = reader.nextRecord()) != null) {
+                        if (exact) {
+                            assertTrue(counter++ == r.getEventId());
+                        } else {
+                            assertTrue(counter++ <= r.getEventId());
+                        }
+                    }
+                }
+            }
+        }
+        return counter;
+    }
+
     @Test
     public void testMergeJournals() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
@@ -1408,21 +1467,177 @@ public class TestPersistentProvenanceRepository {
         repo.waitForRollover();
 
         final File storageDir = config.getStorageDirectories().get(0);
-        long counter = 0;
-        for (final File file : storageDir.listFiles()) {
-            if (file.isFile()) {
+        assertEquals(10000, checkJournalRecords(storageDir, true));
+    }
 
-                try (RecordReader reader = RecordReaders.newRecordReader(file, 
null, 2048)) {
-                    ProvenanceEventRecord r = null;
+    private void corruptJournalFile(final File journalFile, final int position,
+                                    final String original, final String 
replacement) throws IOException {
+        final int journalLength = 
Long.valueOf(journalFile.length()).intValue();
+        final byte[] origBytes = original.getBytes();
+        final byte[] replBytes = replacement.getBytes();
+        FileInputStream journalIn = new FileInputStream(journalFile);
+        byte[] content = new byte[journalLength];
+        assertEquals(journalLength, journalIn.read(content, 0, journalLength));
+        journalIn.close();
+        assertEquals(original, new String(Arrays.copyOfRange(content, 
position, position + origBytes.length)));
+        System.arraycopy(replBytes, 0, content, position, replBytes.length);
+        FileOutputStream journalOut = new FileOutputStream(journalFile);
+        journalOut.write(content, 0, journalLength);
+        journalOut.flush();
+        journalOut.close();
+    }
 
-                    while ((r = reader.nextRecord()) != null) {
-                        assertEquals(counter++, r.getEventId());
-                    }
+    @Test
+    public void testMergeJournalsBadFirstRecord() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        final TestablePersistentProvenanceRepository testRepo = new 
TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter());
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish and then corrupt the first record of the 
first journal file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+        RecordWriter firstWriter = testRepo.getWriters()[0];
+        corruptJournalFile(firstWriter.getFile(), headerSize + 10, "RECEIVE", 
"BADTYPE");
+
+        testRepo.recoverJournalFiles();
+
+        assertEquals("mergeJournals() should report a skipped journal", 1, 
reportedEvents.size());
+        assertEquals("mergeJournals() should report a skipped journal",
+                "Failed to read Provenance Event Record from Journal due to 
java.lang.IllegalArgumentException: "
+                        + "No enum constant 
org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible "
+                        + "that the record wasn't completely written to the 
file. This journal will be skipped.",
+                reportedEvents.get(reportedEvents.size() - 1).getMessage());
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        assertTrue(checkJournalRecords(storageDir, false) < 10000);
+    }
+
+    @Test
+    public void testMergeJournalsBadRecordAfterFirst() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        final TestablePersistentProvenanceRepository testRepo = new 
TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter());
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
                 }
+            }));
+        }
+
+        // corrupt the first record of the first journal file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
             }
         }
+        RecordWriter firstWriter = testRepo.getWriters()[0];
+        corruptJournalFile(firstWriter.getFile(), headerSize + 10 + 
recordSize, "RECEIVE", "BADTYPE");
+
+        testRepo.recoverJournalFiles();
 
-        assertEquals(10000, counter);
+        assertEquals("mergeJournals should report a skipped journal", 1, 
reportedEvents.size());
+        assertEquals("mergeJournals should report a skipped journal",
+                "Failed to read Provenance Event Record from Journal due to 
java.lang.IllegalArgumentException: "
+                        + "No enum constant 
org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible "
+                        + "that the record wasn't completely written to the 
file. The remainder of this journal will "
+                        + "be skipped.",
+                reportedEvents.get(reportedEvents.size() - 1).getMessage());
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        assertTrue(checkJournalRecords(storageDir, false) < 10000);
+    }
+
+    @Test
+    public void testMergeJournalsEmptyJournal() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        final TestablePersistentProvenanceRepository testRepo = new 
TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        testRepo.initialize(getEventReporter());
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        final List<Future> futures = new ArrayList<>();
+        for (int i = 0; i < config.getJournalCount() - 1; i++) {
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    testRepo.registerEvent(record);
+                }
+            }));
+        }
+
+        // wait for writers to finish and then corrupt the first record of the 
first journal file
+        for (Future future : futures) {
+            while (!future.isDone()) {
+                Thread.sleep(10);
+            }
+        }
+
+        testRepo.recoverJournalFiles();
+
+        assertEquals("mergeJournals() should not error on empty journal", 0, 
reportedEvents.size());
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        assertEquals(config.getJournalCount() - 1, 
checkJournalRecords(storageDir, true));
     }
 
     @Test
@@ -1699,4 +1914,59 @@ public class TestPersistentProvenanceRepository {
             return severity;
         }
     }
+
+    private class TestablePersistentProvenanceRepository extends 
PersistentProvenanceRepository {
+
+        TestablePersistentProvenanceRepository() throws IOException {
+            super();
+        }
+
+        TestablePersistentProvenanceRepository(final RepositoryConfiguration 
configuration, final int rolloverCheckMillis) throws IOException {
+            super(configuration, rolloverCheckMillis);
+        }
+
+        RecordWriter[] getWriters() {
+            Class klass = PersistentProvenanceRepository.class;
+            Field writersField;
+            RecordWriter[] writers = null;
+            try {
+                writersField = klass.getDeclaredField("writers");
+                writersField.setAccessible(true);
+                writers = (RecordWriter[]) writersField.get(this);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                e.printStackTrace();
+            }
+            return writers;
+        }
+
+        int getRolloverCheckMillis() {
+            Class klass = PersistentProvenanceRepository.class;
+            Field rolloverCheckMillisField;
+            int rolloverCheckMillis = -1;
+            try {
+                rolloverCheckMillisField = 
klass.getDeclaredField("rolloverCheckMillis");
+                rolloverCheckMillisField.setAccessible(true);
+                rolloverCheckMillis = (int) rolloverCheckMillisField.get(this);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                e.printStackTrace();
+            }
+            return rolloverCheckMillis;
+        }
+
+    }
+
+    private RepositoryConfiguration createTestableRepositoryConfiguration() {
+        Class klass = PersistentProvenanceRepository.class;
+        Method createRepositoryConfigurationMethod;
+        RepositoryConfiguration configuration = null;
+        try {
+            createRepositoryConfigurationMethod = 
klass.getDeclaredMethod("createRepositoryConfiguration");
+            createRepositoryConfigurationMethod.setAccessible(true);
+            configuration = 
(RepositoryConfiguration)createRepositoryConfigurationMethod.invoke(null);
+        } catch (IllegalAccessException | NoSuchMethodException | 
InvocationTargetException e) {
+            e.printStackTrace();
+        }
+        return configuration;
+    }
+
 }


Reply via email to