NIFI-4023, NIFI-4077 This closes #2075. Addressed issue where repository was 
aging off the wrong index. When it should age off Index 1, it was removing 
Index 2. As a result, the earliest index is never aged off, and the newest 
index could potentially be aged off before it is ready to be. Also addressed 
issue where a query that attempts to read an event that has aged off throws 
FileNotFoundException (NIFI-4077) instead of skipping over the event. The 
JavaDocs indicate that the EventIterator should skip records that it cannot 
find, but SelectiveRecordReaderEventIterator throw FileNotFoundException instead

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84935d4f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84935d4f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84935d4f

Branch: refs/heads/master
Commit: 84935d4f7840fe9b22aa929afb403718c073e627
Parents: ae940d8
Author: Mark Payne <[email protected]>
Authored: Fri Aug 11 13:08:29 2017 -0400
Committer: joewitt <[email protected]>
Committed: Fri Aug 11 22:01:46 2017 -0700

----------------------------------------------------------------------
 .../index/lucene/IndexDirectoryManager.java     | 21 +++++-----
 .../provenance/index/lucene/IndexLocation.java  |  9 +++++
 .../index/lucene/LuceneEventIndex.java          | 15 +++++---
 .../SelectiveRecordReaderEventIterator.java     | 40 +++++++++++---------
 .../index/lucene/TestIndexDirectoryManager.java | 38 +++++++++++++++++++
 .../TestSelectiveRecordReaderEventIterator.java | 23 +++++++++++
 6 files changed, 111 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/84935d4f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
index 53f74e0..033e8d0 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
@@ -125,9 +125,9 @@ public class IndexDirectoryManager {
         // If looking at index N, we can determine the index end time by 
assuming that it is the same as the
         // start time of index N+1. So we determine the time range of each 
index and select an index only if
         // its start time is before the given timestamp and its end time is <= 
the given timestamp.
-        for (final List<IndexLocation> startTimeWithFile : 
startTimeWithFileByStorageDirectory.values()) {
-            for (int i = 0; i < startTimeWithFile.size(); i++) {
-                final IndexLocation indexLoc = startTimeWithFile.get(i);
+        for (final List<IndexLocation> locationList : 
startTimeWithFileByStorageDirectory.values()) {
+            for (int i = 0; i < locationList.size(); i++) {
+                final IndexLocation indexLoc = locationList.get(i);
 
                 final String partition = indexLoc.getPartitionName();
                 final IndexLocation activeLocation = 
activeIndices.get(partition);
@@ -143,16 +143,13 @@ public class IndexDirectoryManager {
                     break;
                 }
 
-                if (i < startTimeWithFile.size() - 1) {
-                    final IndexLocation nextLocation = startTimeWithFile.get(i 
+ 1);
-                    final Long indexEndTime = 
nextLocation.getIndexStartTimestamp();
-                    if (indexEndTime <= timestamp) {
-                        logger.debug("Considering Index Location {} older than 
{} ({}) because its events have an EventTime "
-                            + "ranging from {} ({}) to {} ({}) based on the 
following IndexLocations: {}", nextLocation, timestamp, new Date(timestamp),
-                            indexStartTime, new Date(indexStartTime), 
indexEndTime, new Date(indexEndTime), startTimeWithFile);
+                final long indexEndTime = indexLoc.getIndexEndTimestamp();
+                if (indexEndTime <= timestamp) {
+                    logger.debug("Considering Index Location {} older than {} 
({}) because its events have an EventTime "
+                        + "ranging from {} ({}) to {} ({}) based on the 
following IndexLocations: {}", indexLoc, timestamp, new Date(timestamp),
+                        indexStartTime, new Date(indexStartTime), 
indexEndTime, new Date(indexEndTime), locationList);
 
-                        selected.add(nextLocation.getIndexDirectory());
-                    }
+                    selected.add(indexLoc.getIndexDirectory());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/84935d4f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
index f7de84f..b0fb68f 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
@@ -38,6 +38,15 @@ public class IndexLocation {
         return indexStartTimestamp;
     }
 
+    public long getIndexEndTimestamp() {
+        final long lastMod = indexDirectory.lastModified();
+        if (lastMod == 0) {
+            return System.currentTimeMillis();
+        }
+
+        return lastMod;
+    }
+
     public String getPartitionName() {
         return partitionName;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/84935d4f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
index 4a38071..c44c7d2 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -648,14 +648,19 @@ public class LuceneEventIndex implements EventIndex {
     void performMaintenance() {
         try {
             final List<ProvenanceEventRecord> firstEvents = 
eventStore.getEvents(0, 1);
+
+            final long earliestEventTime;
             if (firstEvents.isEmpty()) {
-                return;
+                earliestEventTime = System.currentTimeMillis();
+                logger.debug("Found no events in the Provenance Repository. In 
order to perform maintenace of the indices, "
+                    + "will assume that the first event time is now ({})", 
System.currentTimeMillis());
+            } else {
+                final ProvenanceEventRecord firstEvent = firstEvents.get(0);
+                earliestEventTime = firstEvent.getEventTime();
+                logger.debug("First Event Time is {} ({}) with Event ID {}; 
will delete any Lucene Index that is older than this",
+                    earliestEventTime, new Date(earliestEventTime), 
firstEvent.getEventId());
             }
 
-            final ProvenanceEventRecord firstEvent = firstEvents.get(0);
-            final long earliestEventTime = firstEvent.getEventTime();
-            logger.debug("First Event Time is {} ({}) with Event ID {}; will 
delete any Lucene Index that is older than this",
-                earliestEventTime, new Date(earliestEventTime), 
firstEvent.getEventId());
             final List<File> indicesBeforeEarliestEvent = 
directoryManager.getDirectoriesBefore(earliestEventTime);
 
             for (final File index : indicesBeforeEarliestEvent) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/84935d4f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
index c4a130b..e09fe05 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.provenance.store.iterator;
 
+import java.io.EOFException;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -123,28 +125,30 @@ public class SelectiveRecordReaderEventIterator 
implements EventIterator {
                     continue;
                 }
 
-                // If we determined which file the event should be in, and 
that's not the file that
-                // we are currently reading from, rotate the reader to the 
appropriate one.
-                if (!fileForEvent.equals(currentFile)) {
-                    if (reader != null) {
-                        try {
-                            reader.close();
-                        } catch (final Exception e) {
-                            logger.warn("Failed to close {}; some resources 
may not be cleaned up appropriately", reader);
+                try {
+                    // If we determined which file the event should be in, and 
that's not the file that
+                    // we are currently reading from, rotate the reader to the 
appropriate one.
+                    if (!fileForEvent.equals(currentFile)) {
+                        if (reader != null) {
+                            try {
+                                reader.close();
+                            } catch (final Exception e) {
+                                logger.warn("Failed to close {}; some 
resources may not be cleaned up appropriately", reader);
+                            }
                         }
-                    }
 
-                    reader = readerFactory.newRecordReader(fileForEvent, 
Collections.emptyList(), maxAttributeChars);
-                    this.currentFile = fileForEvent;
-                }
+                        reader = readerFactory.newRecordReader(fileForEvent, 
Collections.emptyList(), maxAttributeChars);
+                        this.currentFile = fileForEvent;
+                    }
 
-                final Optional<ProvenanceEventRecord> eventOption = 
reader.skipToEvent(eventId);
-                if (eventOption.isPresent() && eventOption.get().getEventId() 
== eventId) {
-                    reader.nextRecord();    // consume the event from the 
stream.
-                    return eventOption;
+                    final Optional<ProvenanceEventRecord> eventOption = 
reader.skipToEvent(eventId);
+                    if (eventOption.isPresent() && 
eventOption.get().getEventId() == eventId) {
+                        reader.nextRecord();    // consume the event from the 
stream.
+                        return eventOption;
+                    }
+                } catch (final FileNotFoundException | EOFException e) {
+                    logger.warn("Failed to retrieve Event with ID {}", 
eventId, e);
                 }
-
-                continue;
             }
 
             return Optional.empty();

http://git-wip-us.apache.org/repos/asf/nifi/blob/84935d4f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
index efcb601..9c29e24 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
@@ -129,6 +129,44 @@ public class TestIndexDirectoryManager {
         }
     }
 
+    @Test
+    public void testGetDirectoriesBefore() throws InterruptedException {
+        final RepositoryConfiguration config = createConfig(2);
+        config.setDesiredIndexSize(4096 * 128);
+
+        final File storageDir = config.getStorageDirectories().get("1");
+
+        final File index1 = new File(storageDir, "index-1");
+        final File index2 = new File(storageDir, "index-2");
+
+        final File[] allIndices = new File[] {index1, index2};
+        for (final File file : allIndices) {
+            if (file.exists()) {
+                assertTrue(file.delete());
+            }
+        }
+
+        assertTrue(index1.mkdirs());
+        // Wait 1500 millis because some file systems use only 
second-precision timestamps instead of millisecond-precision timestamps and
+        // we want to ensure that the two directories have different 
timestamps. Also using a value of 1500 instead of 1000 because sleep()
+        // can awake before the given time so we give it a buffer zone.
+        Thread.sleep(1500L);
+        final long timestamp = System.currentTimeMillis();
+        assertTrue(index2.mkdirs());
+
+        try {
+            final IndexDirectoryManager mgr = new 
IndexDirectoryManager(config);
+            mgr.initialize();
+
+            final List<File> dirsBefore = mgr.getDirectoriesBefore(timestamp);
+            assertEquals(1, dirsBefore.size());
+            assertEquals(index1, dirsBefore.get(0));
+        } finally {
+            for (final File file : allIndices) {
+                file.delete();
+            }
+        }
+    }
 
 
     private IndexLocation createLocation(final long timestamp) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/84935d4f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
index 0089f61..2cfbce2 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.provenance.store.iterator;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.IOException;
@@ -25,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -84,6 +86,27 @@ public class TestSelectiveRecordReaderEventIterator {
     }
 
     @Test
+    public void testFileNotFound() throws IOException {
+        final File file1 = new File("1.prov");
+
+        // Filter out the first file.
+        final List<File> files = new ArrayList<>();
+        files.add(file1);
+
+        List<Long> eventIds = new ArrayList<>();
+        eventIds.add(1L);
+        eventIds.add(5L);
+
+        final RecordReaderFactory readerFactory = (file, logs, maxChars) -> {
+            return RecordReaders.newRecordReader(file, logs, maxChars);
+        };
+
+        final SelectiveRecordReaderEventIterator itr = new 
SelectiveRecordReaderEventIterator(files, readerFactory, eventIds, 65536);
+        final Optional<ProvenanceEventRecord> firstRecordOption = 
itr.nextEvent();
+        assertFalse(firstRecordOption.isPresent());
+    }
+
+    @Test
     @Ignore("For local testing only. Runs indefinitely")
     public void testPerformanceOfRandomAccessReads() throws Exception {
         final File dir = new File("target/storage/" + 
UUID.randomUUID().toString());

Reply via email to