This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 83ac191  NIFI-5997: If we swap out data, ensure that we do not 
increment the size of the queue by the size of the data that we failed to swap 
out. Also, if the FlowFile Repo does not know about a given swap file, do not 
restore it on restart
83ac191 is described below

commit 83ac191736e8036f82da467ceb1940b50d9886f0
Author: Mark Payne <[email protected]>
AuthorDate: Fri Feb 1 13:10:51 2019 -0500

    NIFI-5997: If we swap out data, ensure that we do not increment the size of 
the queue by the size of the data that we failed to swap out. Also, if the 
FlowFile Repo does not know about a given swap file, do not restore it on 
restart
    
    This closes #3290.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../exception/FlowFileAccessException.java         |   2 +-
 .../java/org/apache/nifi/wali/HashMapSnapshot.java |  20 ++--
 .../nifi/wali/SequentialAccessWriteAheadLog.java   |  20 +++-
 .../org/apache/nifi/wali/WriteAheadSnapshot.java   |   3 +
 .../main/java/org/wali/WriteAheadRepository.java   |   2 +-
 .../controller/repository/FlowFileRepository.java  |  26 ++++-
 .../nifi-framework/nifi-framework-core/pom.xml     |   1 +
 .../nifi/controller/FileSystemSwapManager.java     |  20 +++-
 .../controller/queue/SwappablePriorityQueue.java   |   8 +-
 .../repository/StandardProcessSession.java         |  30 +++++-
 .../repository/VolatileFlowFileRepository.java     |  14 ++-
 .../repository/WriteAheadFlowFileRepository.java   |  88 +++++++++++++---
 .../apache/nifi/controller/MockSwapManager.java    |  38 +++++--
 .../nifi/controller/TestFileSystemSwapManager.java | 114 ++++++++++++++++++---
 .../clustered/TestSwappablePriorityQueue.java      |  59 ++++++++---
 .../repository/TestStandardProcessSession.java     | 113 +++++++++++++-------
 .../TestWriteAheadFlowFileRepository.java          |  96 +++++++++++++++++
 .../src/test/resources/swap/444-old-swap-file.swap | Bin 0 -> 1730054 bytes
 18 files changed, 536 insertions(+), 118 deletions(-)

diff --git 
a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
 
b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
index c7e9c22..c64ea1c 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processor.exception;
 
 /**
  * Indicates an issue occurred while accessing the content of a FlowFile, such
- * as an IOException.
+ * as an IOException, or obtaining a reference to the FlowFile
  *
  */
 public class FlowFileAccessException extends RuntimeException {
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
index 0dad62c..002ecd2 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java
@@ -17,6 +17,12 @@
 
 package org.apache.nifi.wali;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -37,12 +43,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-
 public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, 
RecordLookup<T> {
     private static final Logger logger = 
LoggerFactory.getLogger(HashMapSnapshot.class);
     private static final int ENCODING_VERSION = 1;
@@ -216,10 +216,14 @@ public class HashMapSnapshot<T> implements 
WriteAheadSnapshot<T>, RecordLookup<T
         return recordMap.get(recordId);
     }
 
-
     @Override
     public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
-        return new Snapshot(new HashMap<>(recordMap), new 
HashSet<>(swapLocations), maxTransactionId);
+        return prepareSnapshot(maxTransactionId, this.swapLocations);
+    }
+
+    @Override
+    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId, 
final Set<String> swapFileLocations) {
+        return new Snapshot(new HashMap<>(recordMap), new 
HashSet<>(swapFileLocations), maxTransactionId);
     }
 
     private int getVersion() {
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index 11eb31c..240a212 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,6 +66,7 @@ public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T>
     private final File journalsDirectory;
     private final SerDeFactory<T> serdeFactory;
     private final SyncListener syncListener;
+    private final Set<String> recoveredSwapLocations = new HashSet<>();
 
     private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
     private final Lock journalReadLock = journalRWLock.readLock();
@@ -144,6 +146,7 @@ public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T>
         final long recoverStart = System.nanoTime();
         recovered = true;
         snapshotRecovery = snapshot.recover();
+        
this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations());
 
         final long snapshotRecoveryMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
 
@@ -212,7 +215,9 @@ public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T>
         final long recoveryMillis = 
TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
         logger.info("Successfully recovered {} records in {} milliseconds. Now 
checkpointing to ensure that Write-Ahead Log is in a consistent state", 
recoveredRecords.size(), recoveryMillis);
 
-        checkpoint();
+        this.recoveredSwapLocations.addAll(swapLocations);
+
+        checkpoint(this.recoveredSwapLocations);
 
         return recoveredRecords.values();
     }
@@ -238,11 +243,15 @@ public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T>
             throw new IllegalStateException("Cannot retrieve the Recovered 
Swap Locations until record recovery has been performed");
         }
 
-        return snapshotRecovery.getRecoveredSwapLocations();
+        return Collections.unmodifiableSet(this.recoveredSwapLocations);
     }
 
     @Override
     public int checkpoint() throws IOException {
+        return checkpoint(null);
+    }
+
+    private int checkpoint(final Set<String> swapLocations) throws IOException 
{
         final SnapshotCapture<T> snapshotCapture;
 
         final long startNanos = System.nanoTime();
@@ -276,7 +285,12 @@ public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T>
             final File[] existingFiles = 
journalsDirectory.listFiles(this::isJournalFile);
             existingJournals = (existingFiles == null) ? new File[0] : 
existingFiles;
 
-            snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
+            if (swapLocations == null) {
+                snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 
1);
+            } else {
+                snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 
1, swapLocations);
+            }
+
 
             // Create a new journal. We name the journal file <next 
transaction id>.journal but it is possible
             // that we could have an empty journal file already created. If 
this happens, we don't want to create
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
index a4cbcd2..fd7cfd8 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java
@@ -19,10 +19,13 @@ package org.apache.nifi.wali;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Set;
 
 public interface WriteAheadSnapshot<T> {
     SnapshotCapture<T> prepareSnapshot(long maxTransactionId);
 
+    SnapshotCapture<T> prepareSnapshot(long maxTransactionId, Set<String> 
swapLocations);
+
     void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;
 
     SnapshotRecovery<T> recover() throws IOException;
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
index 05fc8a5..b7f18ab 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
@@ -65,7 +65,7 @@ public interface WriteAheadRepository<T> {
      * if power is lost or the Operating System crashes
      * @throws IOException if failure to update repo
      * @throws IllegalArgumentException if multiple records within the given
-     * Collection have the same ID, as specified by {@link Record#getId()}
+     * Collection have the same ID, as specified by {@link 
SerDe#getRecordIdentifier(Object)}
      * method
      *
      * @return the index of the Partition that performed the update
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 6560c0a..b9ff249 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-
 /**
  * Implementations must be thread safe
  *
@@ -128,4 +128,24 @@ public interface FlowFileRepository extends Closeable {
      * @throws IOException if swap fails
      */
     void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> 
flowFileRecords, FlowFileQueue flowFileQueue) throws IOException;
+
+    /**
+     * <p>
+     * Determines whether or not the given swap location suffix is a valid, 
known location according to this FlowFileRepository. Note that while
+     * the {@link #swapFlowFilesIn(String, List, FlowFileQueue)} and {@link 
#swapFlowFilesOut(List, FlowFileQueue, String)} methods expect
+     * a full "swap location" this method expects only the "suffix" of a swap 
location. For example, if the location points to a file, this method
+     * would expect only the filename, not the full path.
+     * </p>
+     *
+     * <p>
+     * This method differs from the others because the other methods want to 
store the swap location or recover from a given location. However,
+     * this method is used to verify that the location is known. If for any 
reason, NiFi is stopped, its FlowFile Repository relocated to a new
+     * location (for example, a different disk partition), and restarted, the 
swap location would not match if we used the full location. Therefore,
+     * by using only the "suffix" (i.e. the filename for a file-based 
implementation), we can avoid worrying about relocation.
+     * </p>
+     *
+     * @param swapLocationSuffix the suffix of the location to check
+     * @return <code>true</code> if the swap location is known and valid, 
<code>false</code> otherwise
+     */
+    boolean isValidSwapLocationSuffix(String swapLocationSuffix);
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 6bb1ea8..4b9af46 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -239,6 +239,7 @@
                         <exclude>src/test/resources/bye.txt</exclude>
                         
<exclude>src/test/resources/old-swap-file.swap</exclude>
                         <exclude>src/test/resources/xxe_template.xml</exclude>
+                        
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 5f8f925..b2717c2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -27,6 +28,8 @@ import 
org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
 import org.apache.nifi.controller.swap.SchemaSwapSerializer;
 import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.StandardSwapContents;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.controller.swap.SwapDeserializer;
 import org.apache.nifi.controller.swap.SwapSerializer;
 import org.apache.nifi.events.EventReporter;
@@ -95,14 +98,17 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     }
 
     public FileSystemSwapManager(final NiFiProperties nifiProperties) {
-        final Path flowFileRepoPath = 
nifiProperties.getFlowFileRepositoryPath();
+        this(nifiProperties.getFlowFileRepositoryPath());
+    }
 
+    public FileSystemSwapManager(final Path flowFileRepoPath) {
         this.storageDirectory = flowFileRepoPath.resolve("swap").toFile();
         if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
             throw new RuntimeException("Cannot create Swap Storage directory " 
+ storageDirectory.getAbsolutePath());
         }
     }
 
+
     @Override
     public synchronized void initialize(final SwapManagerInitializationContext 
initializationContext) {
         this.claimManager = initializationContext.getResourceClaimManager();
@@ -152,6 +158,16 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     @Override
     public SwapContents swapIn(final String swapLocation, final FlowFileQueue 
flowFileQueue) throws IOException {
         final File swapFile = new File(swapLocation);
+
+        final boolean validLocation = 
flowFileRepository.isValidSwapLocationSuffix(swapFile.getName());
+        if (!validLocation) {
+            warn("Cannot swap in FlowFiles from location " + swapLocation + " 
because the FlowFile Repository does not know about this Swap Location. " +
+                "This file should be manually removed. This typically occurs 
when a Swap File is written but the FlowFile Repository is not updated yet to 
reflect this. " +
+                "This is generally not a cause for concern, but may be 
indicative of a failure to update the FlowFile Repository.");
+            final SwapSummary swapSummary = new StandardSwapSummary(new 
QueueSize(0, 0), 0L, Collections.emptyList());
+            return new StandardSwapContents(swapSummary, 
Collections.emptyList());
+        }
+
         final SwapContents swapContents = peek(swapLocation, flowFileQueue);
         flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), 
swapContents.getFlowFiles(), flowFileQueue);
 
@@ -311,7 +327,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
             }
         }
 
-        Collections.sort(swapLocations, new SwapFileComparator());
+        swapLocations.sort(new SwapFileComparator());
         return swapLocations;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 058c714..df19f44 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -180,19 +180,23 @@ public class SwappablePriorityQueue {
         int flowFilesSwappedOut = 0;
         final List<String> swapLocations = new ArrayList<>(numSwapFiles);
         for (int i = 0; i < numSwapFiles; i++) {
+            long bytesSwappedThisIteration = 0L;
+
             // Create a new swap file for the next SWAP_RECORD_POLL_SIZE 
records
             final List<FlowFileRecord> toSwap = new 
ArrayList<>(SWAP_RECORD_POLL_SIZE);
             for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
                 final FlowFileRecord flowFile = tempQueue.poll();
                 toSwap.add(flowFile);
-                bytesSwappedOut += flowFile.getSize();
-                flowFilesSwappedOut++;
+                bytesSwappedThisIteration += flowFile.getSize();
             }
 
             try {
                 Collections.reverse(toSwap); // currently ordered in reverse 
priority order based on the ordering of the temp queue.
                 final String swapLocation = swapManager.swapOut(toSwap, 
flowFileQueue, swapPartitionName);
                 swapLocations.add(swapLocation);
+
+                bytesSwappedOut += bytesSwappedThisIteration;
+                flowFilesSwappedOut += toSwap.size();
             } catch (final IOException ioe) {
                 tempQueue.addAll(toSwap); // if we failed, we must add the 
FlowFiles back to the queue.
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index cc3ac19..216449c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -161,7 +161,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     // so that we are able to aggregate many into a single Fork Event.
     private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = 
new HashMap<>();
 
-    private Checkpoint checkpoint = new Checkpoint();
+    private Checkpoint checkpoint = null;
     private final ContentClaimWriteCache claimCache;
 
     public StandardProcessSession(final RepositoryContext context, final 
TaskTermination taskTermination) {
@@ -1489,7 +1489,18 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     private void registerDequeuedRecord(final FlowFileRecord flowFile, final 
Connection connection) {
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
-        records.put(flowFile.getId(), record);
+
+        // Ensure that the checkpoint does not have a FlowFile with the same 
ID already. This should not occur,
+        // but this is a safety check just to make sure, because if it were to 
occur, and we did process the FlowFile,
+        // we would have a lot of problems, since the map is keyed off of the 
FlowFile ID.
+        if (this.checkpoint != null) {
+            final StandardRepositoryRecord checkpointedRecord = 
this.checkpoint.getRecord(flowFile);
+            handleConflictingId(flowFile, connection, checkpointedRecord);
+        }
+
+        final StandardRepositoryRecord existingRecord = 
records.putIfAbsent(flowFile.getId(), record);
+        handleConflictingId(flowFile, connection, existingRecord); // Ensure 
that we have no conflicts
+
         flowFilesIn++;
         contentSizeIn += flowFile.getSize();
 
@@ -1503,6 +1514,21 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         incrementConnectionOutputCounts(connection, flowFile);
     }
 
+    private void handleConflictingId(final FlowFileRecord flowFile, final 
Connection connection, final StandardRepositoryRecord conflict) {
+        if (conflict == null) {
+            // No conflict
+            return;
+        }
+
+        LOG.error("Attempted to pull {} from {} but the Session already has a 
FlowFile with the same ID ({}): {}, which was pulled from {}. This means that 
the system has two FlowFiles with the" +
+            " same ID, which should not happen.", flowFile, connection, 
flowFile.getId(), conflict.getCurrent(), conflict.getOriginalQueue());
+        connection.getFlowFileQueue().put(flowFile);
+
+        rollback(true, false);
+        throw new FlowFileAccessException("Attempted to pull a FlowFile with 
ID " + flowFile.getId() + " from Connection "
+            + connection + " but a FlowFile with that ID already exists in the 
session");
+    }
+
     @Override
     public void adjustCounter(final String name, final long delta, final 
boolean immediate) {
         verifyTaskActive();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index dee5346..da714a6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -16,16 +16,16 @@
  */
 package org.apache.nifi.controller.repository;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * <p>
  * An in-memory implementation of the {@link FlowFileRepository}. Upon 
restart, all FlowFiles will be discarded, including those that have been 
swapped out by a {@link FlowFileSwapManager}.
@@ -137,4 +137,8 @@ public class VolatileFlowFileRepository implements 
FlowFileRepository {
     public void swapFlowFilesOut(List<FlowFileRecord> swappedOut, 
FlowFileQueue queue, String swapLocation) throws IOException {
     }
 
+    @Override
+    public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+        return false;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index b5a61c6..d8e45f2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,6 +16,19 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -42,19 +55,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
-
 /**
  * <p>
  * Implements FlowFile Repository using WALI as the backing store.
@@ -101,6 +101,8 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
 
+    private final Set<String> swapLocationSuffixes = new HashSet<>(); // 
guarded by synchronizing on the set itself
+
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
     private RepositoryRecordSerdeFactory serdeFactory;
@@ -134,7 +136,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
      */
     public WriteAheadFlowFileRepository() {
         alwaysSync = false;
-        checkpointDelayMillis = 0l;
+        checkpointDelayMillis = 0L;
         numPartitions = 0;
         checkpointExecutor = null;
         walImplementation = null;
@@ -278,6 +280,13 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         return !resourceClaim.isInUse();
     }
 
+    @Override
+    public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
+        synchronized (swapLocationSuffixes) {
+            return swapLocationSuffixes.contains(swapLocationSuffix);
+        }
+    }
+
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() != RepositoryRecordType.DELETE && 
record.getType() != RepositoryRecordType.CONTENTMISSING
@@ -308,6 +317,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // This does not, however, cause problems, as ContentRepository should 
handle this
         // This does indicate that some refactoring should probably be 
performed, though, as this is not a very clean interface.
         final Set<ResourceClaim> claimsToAdd = new HashSet<>();
+
+        final Set<String> swapLocationsAdded = new HashSet<>();
+        final Set<String> swapLocationsRemoved = new HashSet<>();
+
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if claim is 
destructible, mark it so
@@ -324,6 +337,14 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                 if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
isDestructable(record.getOriginalClaim())) {
                     
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
+            } else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
+                final String swapLocation = record.getSwapLocation();
+                swapLocationsAdded.add(swapLocation);
+                swapLocationsRemoved.remove(swapLocation);
+            } else if (record.getType() == RepositoryRecordType.SWAP_IN) {
+                final String swapLocation = record.getSwapLocation();
+                swapLocationsRemoved.add(swapLocation);
+                swapLocationsAdded.remove(swapLocation);
             }
 
             final List<ContentClaim> transientClaims = 
record.getTransientClaims();
@@ -336,6 +357,14 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             }
         }
 
+        // If we have swapped files in or out, we need to ensure that we 
update our swapLocationSuffixes.
+        if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
+            synchronized (swapLocationSuffixes) {
+                swapLocationsRemoved.forEach(loc -> 
swapLocationSuffixes.remove(getLocationSuffix(loc)));
+                swapLocationsAdded.forEach(loc -> 
swapLocationSuffixes.add(getLocationSuffix(loc)));
+            }
+        }
+
         if (!claimsToAdd.isEmpty()) {
             // Get / Register a Set<ContentClaim> for the given Partiton Index
             final Integer partitionKey = Integer.valueOf(partitionIndex);
@@ -352,6 +381,20 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
     }
 
+    protected static String getLocationSuffix(final String swapLocation) {
+        if (swapLocation == null) {
+            return null;
+        }
+
+        final String withoutTrailing = (swapLocation.endsWith("/") && 
swapLocation.length() > 1) ? swapLocation.substring(0, swapLocation.length() - 
1) : swapLocation;
+        final int lastIndex = withoutTrailing.lastIndexOf("/");
+        if (lastIndex < 0 || lastIndex >= withoutTrailing.length() - 1) {
+            return withoutTrailing;
+        }
+
+        return withoutTrailing.substring(lastIndex + 1);
+    }
+
     @Override
     public void onSync(final int partitionIndex) {
         final BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
@@ -407,6 +450,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // update WALI to indicate that the records were swapped out.
         wal.update(repoRecords, true);
 
+        synchronized (this.swapLocationSuffixes) {
+            this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+        }
+
         logger.info("Successfully swapped out {} FlowFiles from {} to Swap 
File {}", new Object[]{swappedOut.size(), queue, swapLocation});
     }
 
@@ -423,6 +470,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
 
         updateRepository(repoRecords, true);
+
+        synchronized (this.swapLocationSuffixes) {
+            this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+        }
+
         logger.info("Repository updated to reflect that {} FlowFiles were 
swapped in to {}", new Object[]{swapRecords.size(), queue});
     }
 
@@ -544,6 +596,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Repo was written using that impl, that we properly recover from the 
implementation.
         Collection<RepositoryRecord> recordList = wal.recoverRecords();
 
+        final Set<String> recoveredSwapLocations = 
wal.getRecoveredSwapLocations();
+        synchronized (this.swapLocationSuffixes) {
+            recoveredSwapLocations.forEach(loc -> 
this.swapLocationSuffixes.add(getLocationSuffix(loc)));
+            logger.debug("Recovered {} Swap Files: {}", 
swapLocationSuffixes.size(), swapLocationSuffixes);
+        }
+
         // If we didn't recover any records from our write-ahead log, attempt 
to recover records from the other implementation
         // of the write-ahead log. We do this in case the user changed the 
"nifi.flowfile.repository.wal.impl" property.
         // In such a case, we still want to recover the records from the 
previous FlowFile Repository and write them into the new one.
@@ -591,7 +649,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Set the AtomicLong to 1 more than the max ID so that calls to 
#getNextFlowFileSequence() will
         // return the appropriate number.
         flowFileSequenceGenerator.set(maxId + 1);
-        logger.info("Successfully restored {} FlowFiles", recordList.size() - 
numFlowFilesMissingQueue);
+        logger.info("Successfully restored {} FlowFiles and {} Swap Files", 
recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
         if (numFlowFilesMissingQueue > 0) {
             logger.warn("On recovery, found {} FlowFiles whose queue no longer 
exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
index 33b71f0..a1206c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
@@ -17,16 +17,6 @@
 
 package org.apache.nifi.controller;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -39,6 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
 public class MockSwapManager implements FlowFileSwapManager {
     public final Map<String, List<FlowFileRecord>> swappedOut = new 
HashMap<>();
     public int swapOutCalledCount = 0;
@@ -49,10 +49,22 @@ public class MockSwapManager implements FlowFileSwapManager {
     public int failSwapInAfterN = -1;
     public Throwable failSwapInFailure = null;
 
+    private int failSwapOutAfterN = -1;
+    private IOException failSwapOutFailure = null;
+
     public void setSwapInFailure(final Throwable t) {
         this.failSwapInFailure = t;
     }
 
+    public void setSwapOutFailureOnNthIteration(final int n) {
+        setSwapOutFailureOnNthIteration(n, null);
+    }
+
+    public void setSwapOutFailureOnNthIteration(final int n, final IOException 
failureException) {
+        this.failSwapOutAfterN = n;
+        this.failSwapOutFailure = failureException;
+    }
+
     @Override
     public void initialize(final SwapManagerInitializationContext 
initializationContext) {
 
@@ -65,6 +77,12 @@ public class MockSwapManager implements FlowFileSwapManager {
     @Override
     public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue 
flowFileQueue, final String partitionName) throws IOException {
         swapOutCalledCount++;
+
+        if (failSwapOutAfterN > -1 && swapOutCalledCount >= failSwapOutAfterN) 
{
+            final IOException ioe = failSwapOutFailure == null ? new 
IOException("Intentional Unit Test IOException on swap out call number " + 
swapOutCalledCount) : failSwapOutFailure;
+            throw ioe;
+        }
+
         final String location = UUID.randomUUID().toString() + "." + 
partitionName;
         swappedOut.put(location, new ArrayList<>(flowFiles));
         return location;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 46bea31..dd71f0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -16,28 +16,37 @@
  */
 package org.apache.nifi.controller;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.junit.Test;
-import org.mockito.Mockito;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
 
 public class TestFileSystemSwapManager {
 
@@ -48,7 +57,7 @@ public class TestFileSystemSwapManager {
                 final DataInputStream in = new DataInputStream(new 
BufferedInputStream(fis))) {
 
             final FlowFileQueue flowFileQueue = 
Mockito.mock(FlowFileQueue.class);
-            
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+            
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
             final FileSystemSwapManager swapManager = createSwapManager();
             final SwapContents swapContents = 
swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
@@ -63,11 +72,88 @@ public class TestFileSystemSwapManager {
         }
     }
 
+    @Test
+    public void testFailureOnRepoSwapOut() throws IOException {
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final FlowFileRepository flowFileRepo = 
Mockito.mock(FlowFileRepository.class);
+        Mockito.doThrow(new IOException("Intentional IOException for unit 
test"))
+            .when(flowFileRepo).updateRepository(anyCollection());
+
+        final FileSystemSwapManager swapManager = createSwapManager();
+
+        final List<FlowFileRecord> flowFileRecords = new ArrayList<>();
+        for (int i=0; i < 10000; i++) {
+            flowFileRecords.add(new MockFlowFileRecord(i));
+        }
+
+        try {
+            swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1");
+            Assert.fail("Expected IOException");
+        } catch (final IOException ioe) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn("");
+
+        final File targetDir = new File("target/swap");
+        targetDir.mkdirs();
+
+        final File targetFile = new File(targetDir, "444-old-swap-file.swap");
+        final File originalSwapFile = new 
File("src/test/resources/swap/444-old-swap-file.swap");
+        try (final OutputStream fos = new FileOutputStream(targetFile);
+             final InputStream fis = new FileInputStream(originalSwapFile)) {
+            StreamUtils.copy(fis, fos);
+        }
+
+        final FileSystemSwapManager swapManager = new 
FileSystemSwapManager(Paths.get("target"));
+        final ResourceClaimManager resourceClaimManager = new 
NopResourceClaimManager();
+        final FlowFileRepository flowFileRepo = 
Mockito.mock(FlowFileRepository.class);
+
+        swapManager.initialize(new SwapManagerInitializationContext() {
+            @Override
+            public ResourceClaimManager getResourceClaimManager() {
+                return resourceClaimManager;
+            }
+
+            @Override
+            public FlowFileRepository getFlowFileRepository() {
+                return flowFileRepo;
+            }
+
+            @Override
+            public EventReporter getEventReporter() {
+                return EventReporter.NO_OP;
+            }
+        });
+
+        
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(false);
+        final List<String> recoveredLocations = 
swapManager.recoverSwapLocations(flowFileQueue, null);
+        assertEquals(1, recoveredLocations.size());
+
+        final String firstLocation = recoveredLocations.get(0);
+        final SwapContents emptyContents = swapManager.swapIn(firstLocation, 
flowFileQueue);
+        assertEquals(0, emptyContents.getFlowFiles().size());
+
+        
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(true);
+        
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+        final SwapContents contents = swapManager.swapIn(firstLocation, 
flowFileQueue);
+        assertEquals(10000, contents.getFlowFiles().size());
+    }
 
     private FileSystemSwapManager createSwapManager() {
+        final FlowFileRepository flowFileRepo = 
Mockito.mock(FlowFileRepository.class);
+        return createSwapManager(flowFileRepo);
+    }
+
+    private FileSystemSwapManager createSwapManager(final FlowFileRepository 
flowFileRepo) {
         final FileSystemSwapManager swapManager = new FileSystemSwapManager();
         final ResourceClaimManager resourceClaimManager = new 
NopResourceClaimManager();
-        final FlowFileRepository flowfileRepo = 
Mockito.mock(FlowFileRepository.class);
         swapManager.initialize(new SwapManagerInitializationContext() {
             @Override
             public ResourceClaimManager getResourceClaimManager() {
@@ -76,7 +162,7 @@ public class TestFileSystemSwapManager {
 
             @Override
             public FlowFileRepository getFlowFileRepository() {
-                return flowfileRepo;
+                return flowFileRepo;
             }
 
             @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index 71ad257..ef1a063 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -17,21 +17,6 @@
 
 package org.apache.nifi.controller.queue.clustered;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.nifi.controller.MockFlowFileRecord;
 import org.apache.nifi.controller.MockSwapManager;
 import org.apache.nifi.controller.queue.DropFlowFileAction;
@@ -42,16 +27,34 @@ import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestSwappablePriorityQueue {
 
     private MockSwapManager swapManager;
-    private final EventReporter eventReporter = EventReporter.NO_OP;
+    private final List<String> events = new ArrayList<>();
+    private EventReporter eventReporter;
+
     private final FlowFileQueue flowFileQueue = 
Mockito.mock(FlowFileQueue.class);
     private final DropFlowFileAction dropAction = (flowFiles, requestor) -> {
         return new QueueSize(flowFiles.size(), 
flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
@@ -63,12 +66,36 @@ public class TestSwappablePriorityQueue {
     public void setup() {
         swapManager = new MockSwapManager();
 
+        events.clear();
+        eventReporter = new EventReporter() {
+            @Override
+            public void reportEvent(final Severity severity, final String 
category, final String message) {
+                events.add(message);
+            }
+        };
+
         when(flowFileQueue.getIdentifier()).thenReturn("unit-test");
         queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, 
flowFileQueue, dropAction, "local");
     }
 
 
     @Test
+    public void testSwapOutFailureLeavesCorrectQueueSize() {
+        swapManager.setSwapOutFailureOnNthIteration(1, null);
+
+        for (int i = 0; i < 19999; i++) {
+            queue.put(new MockFlowFile(i));
+        }
+
+        assertEquals(19999, queue.size().getObjectCount());
+        assertEquals(0, events.size());
+
+        queue.put(new MockFlowFile(20000));
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(1, events.size()); // Expect a single failure event to be 
emitted
+    }
+
+    @Test
     public void testPrioritizer() {
         final FlowFilePrioritizer prioritizer = (o1, o2) -> 
Long.compare(o1.getId(), o2.getId());
         queue.setPriorities(Collections.singletonList(prioritizer));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index efe2bd4..7cd2fd6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -277,14 +277,13 @@ public class TestStandardProcessSession {
         connList.add(conn1);
         connList.add(conn2);
 
-        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+        final StandardFlowFileRecord.Builder flowFileRecordBuilder = new 
StandardFlowFileRecord.Builder()
             .id(1000L)
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .build();
+            .entryDate(System.currentTimeMillis());
 
-        flowFileQueue.put(flowFileRecord);
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(flowFileRecordBuilder.build());
+        flowFileQueue.put(flowFileRecordBuilder.id(1001).build());
 
         when(connectable.getIncomingConnections()).thenReturn(connList);
 
@@ -296,6 +295,36 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testHandlingOfMultipleFlowFilesWithSameId() {
+        // Add two FlowFiles with the same ID
+        for (int i=0; i < 2; i++) {
+            final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+                .id(1000L)
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .size(0L)
+                .build();
+
+            flowFileQueue.put(flowFileRecord);
+        }
+
+        final Relationship relationship = new 
Relationship.Builder().name("A").build();
+
+        FlowFile ff1 = session.get();
+        assertNotNull(ff1);
+
+        session.transfer(ff1, relationship);
+
+        try {
+            session.get();
+            Assert.fail("Should not have been able to poll second FlowFile 
with same ID");
+        } catch (final FlowFileAccessException e) {
+            // Expected
+        }
+    }
+
+
+    @Test
     public void testCloneOriginalDataSmaller() throws IOException {
         final byte[] originalContent = "hello".getBytes();
         final byte[] replacementContent = "NEW DATA".getBytes();
@@ -416,14 +445,14 @@ public class TestStandardProcessSession {
         connList.add(conn1);
         connList.add(conn2);
 
-        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+
+        final StandardFlowFileRecord.Builder flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .id(1000L)
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .build();
+            .entryDate(System.currentTimeMillis());
 
-        flowFileQueue.put(flowFileRecord);
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(flowFileRecord.build());
+        flowFileQueue.put(flowFileRecord.id(1001).build());
 
         when(connectable.getIncomingConnections()).thenReturn(connList);
 
@@ -475,14 +504,13 @@ public class TestStandardProcessSession {
         connList.add(conn1);
         connList.add(conn2);
 
-        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+        final StandardFlowFileRecord.Builder flowFileRecordBuilder = new 
StandardFlowFileRecord.Builder()
             .id(1000L)
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-            .entryDate(System.currentTimeMillis())
-            .build();
+            .entryDate(System.currentTimeMillis());
 
-        flowFileQueue.put(flowFileRecord);
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(flowFileRecordBuilder.build());
+        flowFileQueue.put(flowFileRecordBuilder.id(10001L).build());
 
         when(connectable.getIncomingConnections()).thenReturn(connList);
 
@@ -1383,10 +1411,11 @@ public class TestStandardProcessSession {
     @Test
     public void 
testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge()
 {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(1)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, 
false), 0L))
-                .build();
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -1399,12 +1428,13 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(2)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, 
false), 0L))
-                .contentClaimOffset(1000L)
-                .size(1000L)
-                .build();
+            .contentClaimOffset(1000L)
+            .size(1000L)
+            .build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -1424,10 +1454,11 @@ public class TestStandardProcessSession {
     @Test
     public void 
testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(1)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, 
false), 0L))
-                .build();
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
@@ -1441,10 +1472,11 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+            .id(2)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, 
false), 0L))
-                .contentClaimOffset(1000L).size(1L).build();
+            .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -1544,11 +1576,15 @@ public class TestStandardProcessSession {
 
     @Test
     public void testRollbackAfterCheckpoint() {
-        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
+        final StandardFlowFileRecord.Builder recordBuilder = new 
StandardFlowFileRecord.Builder()
+            .id(1)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
             .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, 
false), 0L))
-                .contentClaimOffset(0L).size(0L).build();
+            .contentClaimOffset(0L)
+            .size(0L);
+
+        final FlowFileRecord flowFileRecord = recordBuilder.build();
         flowFileQueue.put(flowFileRecord);
 
         final FlowFile originalFlowFile = session.get();
@@ -1574,7 +1610,7 @@ public class TestStandardProcessSession {
 
         session.rollback();
 
-        flowFileQueue.put(flowFileRecord);
+        flowFileQueue.put(recordBuilder.id(2).build());
         assertFalse(flowFileQueue.isActiveQueueEmpty());
 
         final FlowFile originalRound2 = session.get();
@@ -1596,8 +1632,8 @@ public class TestStandardProcessSession {
 
         session.commit();
 
-        // FlowFile transferred back to queue
-        assertEquals(1, flowFileQueue.size().getObjectCount());
+        // FlowFiles transferred back to queue
+        assertEquals(2, flowFileQueue.size().getObjectCount());
         assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
         assertFalse(flowFileQueue.isActiveQueueEmpty());
     }
@@ -2116,6 +2152,11 @@ public class TestStandardProcessSession {
         @Override
         public void initialize(ResourceClaimManager claimManager) throws 
IOException {
         }
+
+        @Override
+        public boolean isValidSwapLocationSuffix(final String 
swapLocationSuffix) {
+            return false;
+        }
     }
 
     private static class MockContentRepository implements ContentRepository {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index a3ee5c1..1761bd8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -70,6 +70,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -396,6 +397,101 @@ public class TestWriteAheadFlowFileRepository {
     }
 
 
+    @Test
+    public void testGetLocationSuffix() {
+        assertEquals("/", WriteAheadFlowFileRepository.getLocationSuffix("/"));
+        assertEquals("", WriteAheadFlowFileRepository.getLocationSuffix(""));
+        assertEquals(null, WriteAheadFlowFileRepository.getLocationSuffix(null));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/tmp/test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("//test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/other/file/repository/test.txt"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt/"));
+        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/test.txt/"));
+    }
+
+    @Test
+    public void testSwapLocationsRestored() throws IOException {
+        final Path path = Paths.get("target/test-swap-repo");
+        if (Files.exists(path)) {
+            FileUtils.deleteFile(path.toFile(), true);
+        }
+
+        final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
+        repo.initialize(new StandardResourceClaimManager());
+
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        repo.loadFlowFiles(queueProvider, 0L);
+
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+
+        queueProvider.addConnection(connection);
+
+        StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        ffBuilder.id(1L);
+        ffBuilder.size(0L);
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final List<RepositoryRecord> records = new ArrayList<>();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
+        record.setDestination(queue);
+        records.add(record);
+
+        repo.updateRepository(records);
+        repo.close();
+
+        // restore
+        final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
+        repo2.initialize(new StandardResourceClaimManager());
+        repo2.loadFlowFiles(queueProvider, 0L);
+        assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
+        assertFalse(repo2.isValidSwapLocationSuffix("other"));
+        repo2.close();
+    }
+
+    @Test
+    public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
+        final Path path = Paths.get("target/test-swap-repo");
+        if (Files.exists(path)) {
+            FileUtils.deleteFile(path.toFile(), true);
+        }
+
+        final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
+        repo.initialize(new StandardResourceClaimManager());
+
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        repo.loadFlowFiles(queueProvider, 0L);
+
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+
+        queueProvider.addConnection(connection);
+
+        StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        ffBuilder.id(1L);
+        ffBuilder.size(0L);
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final List<RepositoryRecord> records = new ArrayList<>();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
+        record.setDestination(queue);
+        records.add(record);
+
+        assertFalse(repo.isValidSwapLocationSuffix("swap123"));
+        repo.updateRepository(records);
+        assertTrue(repo.isValidSwapLocationSuffix("swap123"));
+        repo.close();
+    }
 
     @Test
     public void testResourceClaimsIncremented() throws IOException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap
new file mode 100755
index 0000000..0176ed9
Binary files /dev/null and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap differ

Reply via email to