Repository: nifi
Updated Branches:
  refs/heads/master a54962126 -> dbf0c7893


http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index c829566..7ab56ed 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -30,46 +30,29 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.controller.repository.ConnectionSwapInfo;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.QueueProvider;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,24 +68,17 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     private static final Pattern SWAP_FILE_PATTERN = 
Pattern.compile("\\d+-.+\\.swap");
     private static final Pattern TEMP_SWAP_FILE_PATTERN = 
Pattern.compile("\\d+-.+\\.swap\\.part");
 
-    public static final int SWAP_ENCODING_VERSION = 7;
+    public static final int SWAP_ENCODING_VERSION = 8;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
+    private static final Logger logger = 
LoggerFactory.getLogger(FileSystemSwapManager.class);
 
-    private final ScheduledExecutorService swapQueueIdentifierExecutor;
-    private final ScheduledExecutorService swapInExecutor;
-    private volatile FlowFileRepository flowFileRepository;
-    private volatile EventReporter eventReporter;
-
-    // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which 
provides queue locking and necessary state for swapping back in
-    private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new 
ConcurrentHashMap<>();
     private final File storageDirectory;
-    private final long swapInMillis;
-    private final long swapOutMillis;
-    private final int swapOutThreadCount;
 
-    private ResourceClaimManager claimManager; // effectively final
+    // effectively final
+    private FlowFileRepository flowFileRepository;
+    private EventReporter eventReporter;
+    private ResourceClaimManager claimManager;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(FileSystemSwapManager.class);
 
     public FileSystemSwapManager() {
         final NiFiProperties properties = NiFiProperties.getInstance();
@@ -112,42 +88,240 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
             throw new RuntimeException("Cannot create Swap Storage directory " 
+ storageDirectory.getAbsolutePath());
         }
+    }
 
-        swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for 
FlowFile Swapping");
 
-        swapInMillis = 
FormatUtils.getTimeDuration(properties.getSwapInPeriod(), 
TimeUnit.MILLISECONDS);
-        swapOutMillis = 
FormatUtils.getTimeDuration(properties.getSwapOutPeriod(), 
TimeUnit.MILLISECONDS);
-        swapOutThreadCount = properties.getSwapOutThreads();
-        swapInExecutor = new FlowEngine(properties.getSwapInThreads(), "Swap 
In FlowFiles");
+    @Override
+    public synchronized void initialize(final SwapManagerInitializationContext 
initializationContext) {
+        this.claimManager = initializationContext.getResourceClaimManager();
+        this.eventReporter = initializationContext.getEventReporter();
+        this.flowFileRepository = 
initializationContext.getFlowFileRepository();
     }
 
     @Override
+    public String swapOut(final List<FlowFileRecord> toSwap, final 
FlowFileQueue flowFileQueue) throws IOException {
+        if (toSwap == null || toSwap.isEmpty()) {
+            return null;
+        }
+
+        final File swapFile = new File(storageDirectory, 
System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + 
UUID.randomUUID().toString() + ".swap");
+        final File swapTempFile = new File(swapFile.getParentFile(), 
swapFile.getName() + ".part");
+        final String swapLocation = swapFile.getAbsolutePath();
+
+        try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
+            serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+            fos.getFD().sync();
+        }
+
+        if (swapTempFile.renameTo(swapFile)) {
+            flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, 
swapLocation);
+        } else {
+            error("Failed to swap out FlowFiles from " + flowFileQueue + " due 
to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
+        }
+
+        return swapLocation;
+    }
+
+
+    @Override
+    public List<FlowFileRecord> swapIn(final String swapLocation, final 
FlowFileQueue flowFileQueue) throws IOException {
+        final File swapFile = new File(swapLocation);
+        final List<FlowFileRecord> swappedFlowFiles = peek(swapLocation, 
flowFileQueue);
+        flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), 
swappedFlowFiles, flowFileQueue);
+
+        if (!swapFile.delete()) {
+            warn("Swapped in FlowFiles from file " + 
swapFile.getAbsolutePath() + " but failed to delete the file; this file should 
be cleaned up manually");
+        }
+
+        // TODO: When FlowFile Queue performs this operation, it needs to take 
the following error handling logic into account:
+
+        /*
+         * } catch (final EOFException eof) {
+         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due 
to: Corrupt Swap File; will remove this Swap File: " + swapFile);
+         *
+         * if (!swapFile.delete()) {
+         * warn("Failed to remove corrupt Swap File " + swapFile + "; This 
file should be cleaned up manually");
+         * }
+         * } catch (final FileNotFoundException fnfe) {
+         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due 
to: Could not find Swap File " + swapFile);
+         * } catch (final Exception e) {
+         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to 
" + e, e);
+         *
+         * if (swapFile != null) {
+         * queue.add(swapFile);
+         * }
+         * }
+         */
+        return swappedFlowFiles;
+    }
+
+    @Override
+    public List<FlowFileRecord> peek(final String swapLocation, final 
FlowFileQueue flowFileQueue) throws IOException {
+        final File swapFile = new File(swapLocation);
+        if (!swapFile.exists()) {
+            throw new FileNotFoundException("Failed to swap in FlowFiles from 
external storage location " + swapLocation + " into FlowFile Queue because the 
file could not be found");
+        }
+
+        final List<FlowFileRecord> swappedFlowFiles;
+        try (final InputStream fis = new FileInputStream(swapFile);
+            final DataInputStream in = new DataInputStream(fis)) {
+            swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, 
swapLocation, claimManager);
+        }
+
+        return swappedFlowFiles;
+    }
+
+
+    @Override
     public void purge() {
         final File[] swapFiles = storageDirectory.listFiles(new 
FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches();
+                return SWAP_FILE_PATTERN.matcher(name).matches() || 
TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
+            }
+        });
+
+        for (final File file : swapFiles) {
+            if (!file.delete()) {
+                warn("Failed to delete Swap File " + file + " when purging 
FlowFile Swap Manager");
+            }
+        }
+    }
+
+    @Override
+    public List<String> recoverSwapLocations(final FlowFileQueue 
flowFileQueue) throws IOException {
+        final File[] swapFiles = storageDirectory.listFiles(new 
FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return SWAP_FILE_PATTERN.matcher(name).matches() || 
TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
             }
         });
 
-        if (swapFiles != null) {
-            for (final File file : swapFiles) {
-                if (!file.delete() && file.exists()) {
-                    logger.warn("Failed to delete SWAP file {}", file);
+        if (swapFiles == null) {
+            return Collections.emptyList();
+        }
+
+        final List<String> swapLocations = new ArrayList<>();
+        // remove in .part files, as they are partial swap files that did not 
get written fully.
+        for (final File swapFile : swapFiles) {
+            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
+                if (swapFile.delete()) {
+                    logger.info("Removed incomplete/temporary Swap File " + 
swapFile);
+                } else {
+                    warn("Failed to remove incomplete/temporary Swap File " + 
swapFile + "; this file should be cleaned up manually");
                 }
+
+                continue;
             }
+
+            // split the filename by dashes. The old filenaming scheme was 
"<timestamp>-<randomuuid>.swap" but the new naming scheme is
+            // "<timestamp>-<queue identifier>-<random uuid>.swap". If we have 
two dashes, then we can just check if the queue ID is equal
+            // to the id of the queue given and if not we can just move on.
+            final String[] splits = swapFile.getName().split("-");
+            if (splits.length == 3) {
+                final String queueIdentifier = splits[1];
+                if (!queueIdentifier.equals(flowFileQueue.getIdentifier())) {
+                    continue;
+                }
+            }
+
+            // Read the queue identifier from the swap file to check if the 
swap file is for this queue
+            try (final InputStream fis = new FileInputStream(swapFile);
+                final InputStream bufferedIn = new BufferedInputStream(fis);
+                final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+                final int swapEncodingVersion = in.readInt();
+                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                    final String errMsg = "Cannot swap FlowFiles in from " + 
swapFile + " because the encoding version is "
+                        + swapEncodingVersion + ", which is too new (expecting 
" + SWAP_ENCODING_VERSION + " or less)";
+
+                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
errMsg);
+                    throw new IOException(errMsg);
+                }
+
+                final String connectionId = in.readUTF();
+                if (connectionId.equals(flowFileQueue.getIdentifier())) {
+                    swapLocations.add(swapFile.getAbsolutePath());
+                }
+            }
+        }
+
+        Collections.sort(swapLocations, new SwapFileComparator());
+        return swapLocations;
+    }
+
+    @Override
+    public QueueSize getSwapSize(final String swapLocation) throws IOException 
{
+        final File swapFile = new File(swapLocation);
+
+        // read record from disk via the swap file
+        try (final InputStream fis = new FileInputStream(swapFile);
+            final InputStream bufferedIn = new BufferedInputStream(fis);
+            final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+            final int swapEncodingVersion = in.readInt();
+            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                final String errMsg = "Cannot swap FlowFiles in from " + 
swapFile + " because the encoding version is "
+                    + swapEncodingVersion + ", which is too new (expecting " + 
SWAP_ENCODING_VERSION + " or less)";
+
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
errMsg);
+                throw new IOException(errMsg);
+            }
+
+            in.readUTF(); // ignore Connection ID
+            final int numRecords = in.readInt();
+            final long contentSize = in.readLong();
+
+            return new QueueSize(numRecords, contentSize);
         }
     }
 
     @Override
-    public synchronized void start(final FlowFileRepository 
flowFileRepository, final QueueProvider connectionProvider, final 
ResourceClaimManager claimManager, final EventReporter eventReporter) {
-        this.claimManager = claimManager;
-        this.flowFileRepository = flowFileRepository;
-        this.eventReporter = eventReporter;
-        swapQueueIdentifierExecutor.scheduleWithFixedDelay(new 
QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, 
TimeUnit.MILLISECONDS);
-        swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, 
swapInMillis, TimeUnit.MILLISECONDS);
+    public Long getMaxRecordId(final String swapLocation) throws IOException {
+        final File swapFile = new File(swapLocation);
+
+        // read record from disk via the swap file
+        try (final InputStream fis = new FileInputStream(swapFile);
+            final InputStream bufferedIn = new BufferedInputStream(fis);
+            final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+            final int swapEncodingVersion = in.readInt();
+            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                final String errMsg = "Cannot swap FlowFiles in from " + 
swapFile + " because the encoding version is "
+                    + swapEncodingVersion + ", which is too new (expecting " + 
SWAP_ENCODING_VERSION + " or less)";
+
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
errMsg);
+                throw new IOException(errMsg);
+            }
+
+            in.readUTF(); // ignore connection id
+            final int numRecords = in.readInt();
+            in.readLong(); // ignore content size
+
+            if (numRecords == 0) {
+                return null;
+            }
+
+            if (swapEncodingVersion > 7) {
+                final long maxRecordId = in.readLong();
+                return maxRecordId;
+            }
+
+            // Before swap encoding version 8, we did not write out the max 
record id, so we have to read all
+            // swap files to determine the max record id
+            final List<FlowFileRecord> records = deserializeFlowFiles(in, 
numRecords, swapEncodingVersion, true, claimManager);
+            long maxId = 0L;
+            for (final FlowFileRecord record : records) {
+                if (record.getId() > maxId) {
+                    maxId = record.getId();
+                }
+            }
+
+            return maxId;
+        }
     }
 
+
     public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final 
FlowFileQueue queue, final String swapLocation, final OutputStream destination) 
throws IOException {
         if (toSwap == null || toSwap.isEmpty()) {
             return 0;
@@ -167,6 +341,16 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
             out.writeInt(toSwap.size());
             out.writeLong(contentSize);
 
+            // get the max record id and write that out so that we know it 
quickly for restoration
+            long maxRecordId = 0L;
+            for (final FlowFileRecord flowFile : toSwap) {
+                if (flowFile.getId() > maxRecordId) {
+                    maxRecordId = flowFile.getId();
+                }
+            }
+
+            out.writeLong(maxRecordId);
+
             for (final FlowFileRecord flowFile : toSwap) {
                 out.writeLong(flowFile.getId());
                 out.writeLong(flowFile.getEntryDate());
@@ -207,7 +391,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
             out.flush();
         }
 
-        logger.info("Successfully swapped out {} FlowFiles from {} to Swap 
File {}", new Object[]{toSwap.size(), queue, swapLocation});
+        logger.info("Successfully swapped out {} FlowFiles from {} to Swap 
File {}", new Object[] {toSwap.size(), queue, swapLocation});
 
         return toSwap.size();
     }
@@ -231,26 +415,27 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final FlowFileQueue queue, final ResourceClaimManager claimManager) throws 
IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final FlowFileQueue queue, final String swapLocation, final 
ResourceClaimManager claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile 
because the encoding version is "
-                    + swapEncodingVersion + ", which is too new (expecting " + 
SWAP_ENCODING_VERSION + " or less)");
+                + swapEncodingVersion + ", which is too new (expecting " + 
SWAP_ENCODING_VERSION + " or less)");
         }
 
         final String connectionId = in.readUTF();
         if (!connectionId.equals(queue.getIdentifier())) {
-            throw new IllegalArgumentException("Cannot restore Swap File 
because the file indicates that records belong to Connection with ID " + 
connectionId + " but received Connection " + queue);
+            throw new IllegalArgumentException("Cannot restore contents from 
FlowFile Swap File " + swapLocation +
+                " because the file indicates that records belong to Connection 
with ID " + connectionId + " but attempted to swap those records into " + 
queue);
         }
 
         final int numRecords = in.readInt();
-        in.readLong();  // Content Size
+        in.readLong(); // Content Size
 
-        return deserializeFlowFiles(in, numRecords, queue, 
swapEncodingVersion, false, claimManager);
+        return deserializeFlowFiles(in, numRecords, swapEncodingVersion, 
false, claimManager);
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final int numFlowFiles, final FlowFileQueue queue,
-            final int serializationVersion, final boolean 
incrementContentClaims, final ResourceClaimManager claimManager) throws 
IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final int numFlowFiles,
+        final int serializationVersion, final boolean incrementContentClaims, 
final ResourceClaimManager claimManager) throws IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
             // legacy encoding had an "action" because it used to be couple 
with FlowFile Repository code
@@ -395,109 +580,6 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
-    private class QueueIdentifier implements Runnable {
-
-        private final QueueProvider connectionProvider;
-
-        public QueueIdentifier(final QueueProvider connectionProvider) {
-            this.connectionProvider = connectionProvider;
-        }
-
-        @Override
-        public void run() {
-            final Collection<FlowFileQueue> allQueues = 
connectionProvider.getAllQueues();
-            final BlockingQueue<FlowFileQueue> connectionQueue = new 
LinkedBlockingQueue<>(allQueues);
-
-            final ThreadFactory threadFactory = new ThreadFactory() {
-                @Override
-                public Thread newThread(final Runnable r) {
-                    final Thread t = new Thread(r);
-                    t.setName("Swap Out FlowFiles");
-                    return t;
-                }
-            };
-
-            final ExecutorService workerExecutor = 
Executors.newFixedThreadPool(swapOutThreadCount, threadFactory);
-            for (int i = 0; i < swapOutThreadCount; i++) {
-                workerExecutor.submit(new SwapOutTask(connectionQueue));
-            }
-
-            workerExecutor.shutdown();
-
-            try {
-                workerExecutor.awaitTermination(10, TimeUnit.MINUTES);
-            } catch (final InterruptedException e) {
-                // oh well...
-            }
-        }
-    }
-
-    private class SwapInTask implements Runnable {
-
-        @Override
-        public void run() {
-            for (final Map.Entry<FlowFileQueue, QueueLockWrapper> entry : 
swapMap.entrySet()) {
-                final FlowFileQueue flowFileQueue = entry.getKey();
-
-                // if queue is more than 60% of its swap threshold, don't swap 
flowfiles in
-                if (flowFileQueue.unswappedSize() >= 
flowFileQueue.getSwapThreshold() * 0.6F) {
-                    continue;
-                }
-
-                final QueueLockWrapper queueLockWrapper = entry.getValue();
-                if (queueLockWrapper.getLock().tryLock()) {
-                    try {
-                        final Queue<File> queue = queueLockWrapper.getQueue();
-
-                        // Swap FlowFiles in until we hit 90% of the 
threshold, or until we're out of files.
-                        while (flowFileQueue.unswappedSize() < 
flowFileQueue.getSwapThreshold() * 0.9F) {
-                            File swapFile = null;
-                            try {
-                                swapFile = queue.poll();
-                                if (swapFile == null) {
-                                    break;
-                                }
-
-                                try (final InputStream fis = new 
FileInputStream(swapFile);
-                                        final DataInputStream in = new 
DataInputStream(fis)) {
-                                    final List<FlowFileRecord> 
swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, claimManager);
-                                    
flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), 
swappedFlowFiles, flowFileQueue);
-                                    
flowFileQueue.putSwappedRecords(swappedFlowFiles);
-                                }
-
-                                if (!swapFile.delete()) {
-                                    warn("Swapped in FlowFiles from file " + 
swapFile.getAbsolutePath() + " but failed to delete the file; this file should 
be cleaned up manually");
-                                }
-                            } catch (final EOFException eof) {
-                                error("Failed to Swap In FlowFiles for " + 
flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + 
swapFile);
-
-                                if (!swapFile.delete()) {
-                                    warn("Failed to remove corrupt Swap File " 
+ swapFile + "; This file should be cleaned up manually");
-                                }
-                            } catch (final FileNotFoundException fnfe) {
-                                error("Failed to Swap In FlowFiles for " + 
flowFileQueue + " due to: Could not find Swap File " + swapFile);
-                            } catch (final Exception e) {
-                                error("Failed to Swap In FlowFiles for " + 
flowFileQueue + " due to " + e, e);
-
-                                if (swapFile != null) {
-                                    queue.add(swapFile);
-                                }
-                            }
-                        }
-                    } finally {
-                        queueLockWrapper.getLock().unlock();
-                    }
-                }
-            }
-        }
-    }
-
-    private void error(final String error, final Throwable t) {
-        error(error);
-        if (logger.isDebugEnabled()) {
-            logger.error("", t);
-        }
-    }
 
     private void error(final String error) {
         logger.error(error);
@@ -513,199 +595,9 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
-    private class SwapOutTask implements Runnable {
-
-        private final BlockingQueue<FlowFileQueue> connectionQueue;
-
-        public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) 
{
-            this.connectionQueue = connectionQueue;
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                final FlowFileQueue flowFileQueue = connectionQueue.poll();
-                if (flowFileQueue == null) {
-                    logger.debug("No more FlowFile Queues to Swap Out");
-                    return;
-                }
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug("{} has {} FlowFiles to swap out", 
flowFileQueue, flowFileQueue.getSwapQueueSize());
-                }
-
-                while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) 
{
-                    final File swapFile = new File(storageDirectory, 
System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
-                    final File swapTempFile = new 
File(swapFile.getParentFile(), swapFile.getName() + ".part");
-                    final String swapLocation = swapFile.getAbsolutePath();
-                    final List<FlowFileRecord> toSwap = 
flowFileQueue.pollSwappableRecords();
-
-                    int recordsSwapped;
-                    try {
-                        try (final FileOutputStream fos = new 
FileOutputStream(swapTempFile)) {
-                            recordsSwapped = serializeFlowFiles(toSwap, 
flowFileQueue, swapLocation, fos);
-                            fos.getFD().sync();
-                        }
-
-                        if (swapTempFile.renameTo(swapFile)) {
-                            flowFileRepository.swapFlowFilesOut(toSwap, 
flowFileQueue, swapLocation);
-                        } else {
-                            error("Failed to swap out FlowFiles from " + 
flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " 
to " + swapFile);
-                            recordsSwapped = 0;
-                        }
-                    } catch (final IOException ioe) {
-                        recordsSwapped = 0;
-                        flowFileQueue.putSwappedRecords(toSwap);
-                        error("Failed to swap out " + toSwap.size() + " 
FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " 
+ ioe, ioe);
-                    }
-
-                    if (recordsSwapped > 0) {
-                        QueueLockWrapper swapQueue = 
swapMap.get(flowFileQueue);
-                        if (swapQueue == null) {
-                            swapQueue = new QueueLockWrapper(new 
LinkedBlockingQueue<File>());
-                            final QueueLockWrapper oldQueue = 
swapMap.putIfAbsent(flowFileQueue, swapQueue);
-                            if (oldQueue != null) {
-                                swapQueue = oldQueue;
-                            }
-                        }
-
-                        swapQueue.getQueue().add(swapFile);
-                    } else {
-                        swapTempFile.delete();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Recovers FlowFiles from all Swap Files, returning the largest FlowFile 
ID that was recovered.
-     *
-     * @param queueProvider provider
-     * @return the largest FlowFile ID that was recovered
-     */
-    @Override
-    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, 
final ResourceClaimManager claimManager) {
-        final File[] swapFiles = storageDirectory.listFiles(new 
FilenameFilter() {
-            @Override
-            public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches() || 
TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
-            }
-        });
-
-        if (swapFiles == null) {
-            return 0L;
-        }
-
-        final Collection<FlowFileQueue> allQueues = 
queueProvider.getAllQueues();
-        final Map<String, FlowFileQueue> queueMap = new HashMap<>();
-        for (final FlowFileQueue queue : allQueues) {
-            queueMap.put(queue.getIdentifier(), queue);
-        }
-
-        final ConnectionSwapInfo swapInfo = new ConnectionSwapInfo();
-        int swappedCount = 0;
-        long swappedBytes = 0L;
-        long maxRecoveredId = 0L;
-
-        for (final File swapFile : swapFiles) {
-            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
-                if (swapFile.delete()) {
-                    logger.info("Removed incomplete/temporary Swap File " + 
swapFile);
-                } else {
-                    warn("Failed to remove incomplete/temporary Swap File " + 
swapFile + "; this file should be cleaned up manually");
-                }
-
-                continue;
-            }
-
-            // read record to disk via the swap file
-            try (final InputStream fis = new FileInputStream(swapFile);
-                    final InputStream bufferedIn = new 
BufferedInputStream(fis);
-                    final DataInputStream in = new 
DataInputStream(bufferedIn)) {
-
-                final int swapEncodingVersion = in.readInt();
-                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                    final String errMsg = "Cannot swap FlowFiles in from " + 
swapFile + " because the encoding version is "
-                            + swapEncodingVersion + ", which is too new 
(expecting " + SWAP_ENCODING_VERSION + " or less)";
-
-                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
errMsg);
-                    throw new IOException(errMsg);
-                }
-
-                final String connectionId = in.readUTF();
-                final FlowFileQueue queue = queueMap.get(connectionId);
-                if (queue == null) {
-                    error("Cannot recover Swapped FlowFiles from Swap File " + 
swapFile + " because the FlowFiles belong to a Connection with ID "
-                            + connectionId + " and that Connection does not 
exist");
-                    continue;
-                }
-
-                final int numRecords = in.readInt();
-                final long contentSize = in.readLong();
-
-                swapInfo.addSwapSizeInfo(connectionId, 
swapFile.getAbsolutePath(), new QueueSize(numRecords, contentSize));
-                swappedCount += numRecords;
-                swappedBytes += contentSize;
-
-                final List<FlowFileRecord> records = deserializeFlowFiles(in, 
numRecords, queue, swapEncodingVersion, true, claimManager);
-                long maxId = 0L;
-                for (final FlowFileRecord record : records) {
-                    if (record.getId() > maxId) {
-                        maxId = record.getId();
-                    }
-                }
-
-                if (maxId > maxRecoveredId) {
-                    maxRecoveredId = maxId;
-                }
-            } catch (final IOException ioe) {
-                error("Cannot recover Swapped FlowFiles from Swap File " + 
swapFile + " due to " + ioe, ioe);
-            }
-        }
-
-        restoreSwapLocations(queueMap.values(), swapInfo);
-        logger.info("Recovered {} FlowFiles ({} bytes) from Swap Files", 
swappedCount, swappedBytes);
-        return maxRecoveredId;
-    }
-
-    public void restoreSwapLocations(final Collection<FlowFileQueue> 
flowFileQueues, final ConnectionSwapInfo swapInfo) {
-        for (final FlowFileQueue queue : flowFileQueues) {
-            final String queueId = queue.getIdentifier();
-            final Collection<String> swapFileLocations = 
swapInfo.getSwapFileLocations(queueId);
-            if (swapFileLocations == null || swapFileLocations.isEmpty()) {
-                continue;
-            }
-
-            final SortedMap<String, QueueSize> sortedFileQueueMap = new 
TreeMap<>(new SwapFileComparator());
-            for (final String swapFileLocation : swapFileLocations) {
-                final QueueSize queueSize = swapInfo.getSwappedSize(queueId, 
swapFileLocation);
-                sortedFileQueueMap.put(swapFileLocation, queueSize);
-            }
-
-            QueueLockWrapper fileQueue = swapMap.get(queue);
-            if (fileQueue == null) {
-                fileQueue = new QueueLockWrapper(new 
LinkedBlockingQueue<File>());
-                swapMap.put(queue, fileQueue);
-            }
-
-            for (final Map.Entry<String, QueueSize> innerEntry : 
sortedFileQueueMap.entrySet()) {
-                final File swapFile = new File(innerEntry.getKey());
-                final QueueSize size = innerEntry.getValue();
-                fileQueue.getQueue().add(swapFile);
-                queue.incrementSwapCount(size.getObjectCount(), 
size.getByteCount());
-            }
-        }
-    }
 
-    @Override
-    public void shutdown() {
-        swapQueueIdentifierExecutor.shutdownNow();
-        swapInExecutor.shutdownNow();
-    }
 
     private static class SwapFileComparator implements Comparator<String> {
-
         @Override
         public int compare(final String o1, final String o2) {
             if (o1 == o2) {
@@ -755,34 +647,4 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
-    private static class QueueLockWrapper {
-
-        private final Lock lock = new ReentrantLock();
-        private final Queue<File> queue;
-
-        public QueueLockWrapper(final Queue<File> queue) {
-            this.queue = queue;
-        }
-
-        public Queue<File> getQueue() {
-            return queue;
-        }
-
-        public Lock getLock() {
-            return lock;
-        }
-
-        @Override
-        public int hashCode() {
-            return queue.hashCode();
-        }
-
-        @Override
-        public boolean equals(final Object obj) {
-            if (obj instanceof QueueLockWrapper) {
-                return queue.equals(((QueueLockWrapper) obj).queue);
-            }
-            return false;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d9c3f39..23746ce 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -80,6 +80,8 @@ import 
org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import 
org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
@@ -97,6 +99,7 @@ import 
org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -152,7 +155,6 @@ import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
@@ -216,7 +218,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public static final String SCHEDULE_MINIMUM_NANOSECONDS = 
"flowcontroller.minimum.nanoseconds";
     public static final String GRACEFUL_SHUTDOWN_PERIOD = 
"nifi.flowcontroller.graceful.shutdown.seconds";
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 
5-minute captures
+    public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 
5-minute captures
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
@@ -245,7 +247,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final UserService userService;
     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
     private final ComponentStatusRepository componentStatusRepository;
-    private final long systemStartTime = System.currentTimeMillis();    // 
time at which the node was started
+    private final long systemStartTime = System.currentTimeMillis(); // time 
at which the node was started
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = 
new ConcurrentHashMap<>();
 
     // The Heartbeat Bean is used to provide an Atomic Reference to data that 
is used in heartbeats that may
@@ -336,38 +338,36 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
-    private FlowFileSwapManager flowFileSwapManager;    // guarded by 
read/write lock
-
     private static final Logger LOG = 
LoggerFactory.getLogger(FlowController.class);
     private static final Logger heartbeatLogger = 
LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
 
     public static FlowController createStandaloneInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor) {
         return new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ false,
-                /* NodeProtocolSender */ null);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ false,
+            /* NodeProtocolSender */ null);
     }
 
     public static FlowController createClusteredInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final NodeProtocolSender protocolSender) {
         final FlowController flowController = new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ true,
-                /* NodeProtocolSender */ protocolSender);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ true,
+            /* NodeProtocolSender */ protocolSender);
 
         
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), 
properties.isSiteToSiteSecure());
 
@@ -375,12 +375,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     private FlowController(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final boolean configuredForClustering,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final boolean configuredForClustering,
+        final NodeProtocolSender protocolSender) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(5);
@@ -416,7 +416,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         final ProcessContextFactory contextFactory = new 
ProcessContextFactory(contentRepository, flowFileRepository, 
flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, 
new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, 
contextFactory, maxEventDrivenThreads.get(), encryptor));
+            eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, 
contextFactory, maxEventDrivenThreads.get(), encryptor));
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new 
QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new 
TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
@@ -468,7 +468,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             externalSiteListener = null;
         } else if (isSiteToSiteSecure && sslContext == null) {
             LOG.error("Unable to create Secure Site-to-Site Listener because 
not all required Keystore/Truststore "
-                    + "Properties are set. Site-to-Site functionality will be 
disabled until this problem is has been fixed.");
+                + "Properties are set. Site-to-Site functionality will be 
disabled until this problem is has been fixed.");
             externalSiteListener = null;
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate 
resource for site-to-site Server Protocol
@@ -501,7 +501,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, 
DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository 
because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -543,14 +543,22 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public void initializeFlow() throws IOException {
         writeLock.lock();
         try {
-            flowFileSwapManager = createSwapManager(properties);
+            // get all connections/queues and recover from swap files.
+            final List<Connection> connections = 
getGroup(getRootGroupId()).findAllConnections();
 
             long maxIdFromSwapFiles = -1L;
-            if (flowFileSwapManager != null) {
-                if (flowFileRepository.isVolatile()) {
-                    flowFileSwapManager.purge();
-                } else {
-                    maxIdFromSwapFiles = 
flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager);
+            if (flowFileRepository.isVolatile()) {
+                for (final Connection connection : connections) {
+                    final FlowFileQueue queue = connection.getFlowFileQueue();
+                    queue.purgeSwapFiles();
+                }
+            } else {
+                for (final Connection connection : connections) {
+                    final FlowFileQueue queue = connection.getFlowFileQueue();
+                    final Long maxFlowFileId = queue.recoverSwappedFlowFiles();
+                    if (maxFlowFileId != null && maxFlowFileId > 
maxIdFromSwapFiles) {
+                        maxIdFromSwapFiles = maxFlowFileId;
+                    }
                 }
             }
 
@@ -560,10 +568,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // ContentRepository to purge superfluous files
             contentRepository.cleanup();
 
-            if (flowFileSwapManager != null) {
-                flowFileSwapManager.start(flowFileRepository, this, 
contentClaimManager, createEventReporter(bulletinRepository));
-            }
-
             if (externalSiteListener != null) {
                 externalSiteListener.start();
             }
@@ -612,7 +616,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t.toString()});
+                        LOG.error("Unable to start {} due to {}", new Object[] 
{connectable, t.toString()});
                         if (LOG.isDebugEnabled()) {
                             LOG.error("", t);
                         }
@@ -627,7 +631,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                         
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
                         startedTransmitting++;
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start transmitting with {} due to 
{}", new Object[]{remoteGroupPort, t});
+                        LOG.error("Unable to start transmitting with {} due to 
{}", new Object[] {remoteGroupPort, t});
                     }
                 }
 
@@ -642,7 +646,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
+                        LOG.error("Unable to start {} due to {}", new Object[] 
{connectable, t});
                     }
                 }
 
@@ -658,7 +662,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, 
DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository 
because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -676,7 +680,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, 
DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository 
because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+                + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
         }
 
         try {
@@ -690,7 +694,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION,
 DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Component Status 
Repository because the NiFi Properties is missing the following property: "
-                    + 
NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -721,7 +725,38 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             relationships.add(new 
Relationship.Builder().name(relationshipName).build());
         }
 
-        return builder.id(requireNonNull(id).intern()).name(name == null ? 
null : 
name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build();
+        // Create and initialize a FlowFileSwapManager for this connection
+        final FlowFileSwapManager swapManager = createSwapManager(properties);
+        final EventReporter eventReporter = 
createEventReporter(getBulletinRepository());
+        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+            final SwapManagerInitializationContext initializationContext = new 
SwapManagerInitializationContext() {
+                @Override
+                public ResourceClaimManager getResourceClaimManager() {
+                    return getResourceClaimManager();
+                }
+
+                @Override
+                public FlowFileRepository getFlowFileRepository() {
+                    return flowFileRepository;
+                }
+
+                @Override
+                public EventReporter getEventReporter() {
+                    return eventReporter;
+                }
+            };
+
+            swapManager.initialize(initializationContext);
+        }
+
+        return builder.id(requireNonNull(id).intern())
+            .name(name == null ? null : name.intern())
+            .relationships(relationships)
+            .source(requireNonNull(source))
+            .destination(destination)
+            .swapManager(swapManager)
+            .eventReporter(eventReporter)
+            .build();
     }
 
     /**
@@ -910,7 +945,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, 
TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -927,7 +962,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, 
TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -1083,24 +1118,20 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             try {
                 flowFileRepository.close();
             } catch (final Throwable t) {
-                LOG.warn("Unable to shut down FlowFileRepository due to {}", 
new Object[]{t});
+                LOG.warn("Unable to shut down FlowFileRepository due to {}", 
new Object[] {t});
             }
 
             if (this.timerDrivenEngineRef.get().isTerminated() && 
eventDrivenEngineRef.get().isTerminated()) {
                 LOG.info("Controller has been terminated successfully.");
             } else {
                 LOG.warn("Controller hasn't terminated properly.  There exists 
an uninterruptable thread that "
-                        + "will take an indeterminate amount of time to stop.  
Might need to kill the program manually.");
+                    + "will take an indeterminate amount of time to stop.  
Might need to kill the program manually.");
             }
 
             if (externalSiteListener != null) {
                 externalSiteListener.stop();
             }
 
-            if (flowFileSwapManager != null) {
-                flowFileSwapManager.shutdown();
-            }
-
             if (processScheduler != null) {
                 processScheduler.shutdown();
             }
@@ -1153,7 +1184,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller 
failed. If this exception is thrown, then the controller should be considered 
unsafe to be used
      */
     public void synchronize(final FlowSynchronizer synchronizer, final 
DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
+        throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
@@ -1199,7 +1230,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @param maxThreadCount
      *
-     * This method must be called while holding the write lock!
+     *            This method must be called while holding the write lock!
      */
     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine 
engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
@@ -1267,7 +1298,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws ProcessorInstantiationException
      *
      * @throws IllegalStateException if no process group can be found with the 
ID of DTO or with the ID of the DTO's parentGroupId, if the template ID 
specified is invalid, or if the DTO's Parent
-     * Group ID changes but the parent group has incoming or outgoing 
connections
+     *             Group ID changes but the parent group has incoming or 
outgoing connections
      *
      * @throws NullPointerException if the DTO or its ID is null
      */
@@ -1371,7 +1402,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @throws NullPointerException if either argument is null
      * @throws IllegalStateException if the snippet is not valid because a 
component in the snippet has an ID that is not unique to this flow, or because 
it shares an Input Port or Output Port at the
-     * root level whose name already exists in the given ProcessGroup, or 
because the Template contains a Processor or a Prioritizer whose class is not 
valid within this instance of NiFi.
+     *             root level whose name already exists in the given 
ProcessGroup, or because the Template contains a Processor or a Prioritizer 
whose class is not valid within this instance of NiFi.
      * @throws ProcessorInstantiationException if unable to instantiate a 
processor
      */
     public void instantiateSnippet(final ProcessGroup group, final 
FlowSnippetDTO dto) throws ProcessorInstantiationException {
@@ -2542,7 +2573,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         if (firstTimeAdded) {
             final ComponentLog componentLog = new SimpleProcessLogger(id, 
taskNode.getReportingTask());
             final ReportingInitializationContext config = new 
StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, 
this);
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
 
             try {
                 task.initialize(config);
@@ -2888,7 +2919,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         readLock.lock();
         try {
             return heartbeatGeneratorFuture != null && 
!heartbeatGeneratorFuture.isCancelled()
-                    && heartbeatSenderFuture != null && 
!heartbeatSenderFuture.isCancelled();
+                && heartbeatSenderFuture != null && 
!heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -2948,7 +2979,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
     /**
      * @return the DN of the Cluster Manager that we are currently connected 
to, if available. This will return null if the instance is not clustered or if 
the instance is clustered but the NCM's DN
-     * is not available - for instance, if cluster communications are not 
secure
+     *         is not available - for instance, if cluster communications are 
not secure
      */
     public String getClusterManagerDN() {
         readLock.lock();
@@ -3101,10 +3132,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public boolean isContentSame() {
                 return areEqual(event.getPreviousContentClaimContainer(), 
event.getContentClaimContainer())
-                        && areEqual(event.getPreviousContentClaimSection(), 
event.getContentClaimSection())
-                        && areEqual(event.getPreviousContentClaimIdentifier(), 
event.getContentClaimIdentifier())
-                        && areEqual(event.getPreviousContentClaimOffset(), 
event.getContentClaimOffset())
-                        && areEqual(event.getPreviousFileSize(), 
event.getFileSize());
+                    && areEqual(event.getPreviousContentClaimSection(), 
event.getContentClaimSection())
+                    && areEqual(event.getPreviousContentClaimIdentifier(), 
event.getContentClaimIdentifier())
+                    && areEqual(event.getPreviousContentClaimOffset(), 
event.getContentClaimOffset())
+                    && areEqual(event.getPreviousFileSize(), 
event.getFileSize());
             }
 
             @Override
@@ -3297,7 +3328,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = 
contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false);
+            event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the 
Content Claim
         contentClaimManager.incrementClaimantCount(resourceClaim);
@@ -3367,7 +3398,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         // Update the FlowFile Repository to indicate that we have added the 
FlowFile to the flow
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(queue, flowFileRecord);
         record.setDestination(queue);
-        
flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record));
+        flowFileRepository.updateRepository(Collections.<RepositoryRecord> 
singleton(record));
 
         // Enqueue the data
         queue.put(flowFileRecord);
@@ -3434,11 +3465,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 protocolSender.sendBulletins(message);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            String.format(
-                                    "Sending bulletins to cluster manager at 
%s",
-                                    dateFormatter.format(new Date())
-                            )
-                    );
+                        String.format(
+                            "Sending bulletins to cluster manager at %s",
+                            dateFormatter.format(new Date())));
                 }
 
             } catch (final UnknownServiceAddressException usae) {
@@ -3496,7 +3525,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                         escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), 
escapedBulletinMessage);
                     } else {
                         escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), 
bulletin.getSourceType(),
-                                bulletin.getSourceName(), 
bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
+                            bulletin.getSourceName(), bulletin.getCategory(), 
bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {
                     escapedBulletin = bulletin;
@@ -3554,9 +3583,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 final long sendMillis = 
TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
                 heartbeatLogger.info("Heartbeat created at {} and sent at {}; 
send took {} millis",
-                        dateFormatter.format(new 
Date(message.getHeartbeat().getCreatedTimestamp())),
-                        dateFormatter.format(new Date()),
-                        sendMillis);
+                    dateFormatter.format(new 
Date(message.getHeartbeat().getCreatedTimestamp())),
+                    dateFormatter.format(new Date()),
+                    sendMillis);
             } catch (final UnknownServiceAddressException usae) {
                 if (heartbeatLogger.isDebugEnabled()) {
                     heartbeatLogger.debug(usae.getMessage());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index d5dba82..f70f602 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -25,11 +25,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
deleted file mode 100644
index 642e8ff..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.processor.QueueSize;
-
-public class ConnectionSwapInfo {
-
-    private final Map<String, Map<String, QueueSize>> connectionMap = new 
HashMap<>();
-
-    public void addSwapSizeInfo(final String connectionId, final String 
swapFileLocation, final QueueSize queueSize) {
-        Map<String, QueueSize> queueSizeMap = connectionMap.get(connectionId);
-        if (queueSizeMap == null) {
-            queueSizeMap = new HashMap<>();
-            connectionMap.put(connectionId, queueSizeMap);
-        }
-
-        queueSizeMap.put(swapFileLocation, queueSize);
-    }
-
-    public Collection<String> getSwapFileLocations(final String connectionId) {
-        final Map<String, QueueSize> sizeMap = connectionMap.get(connectionId);
-        if (sizeMap == null) {
-            return Collections.<String>emptyList();
-        }
-
-        return Collections.unmodifiableCollection(sizeMap.keySet());
-    }
-
-    public QueueSize getSwappedSize(final String connectionId, final String 
swapFileLocation) {
-        final Map<String, QueueSize> sizeMap = connectionMap.get(connectionId);
-        if (sizeMap == null) {
-            return null;
-        }
-
-        return sizeMap.get(swapFileLocation);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3ba7e4e..a32a485 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -41,8 +41,9 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
@@ -57,7 +58,6 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index 5fcb35a..c5be81e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -20,7 +20,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.processor.Relationship;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index a85b23b..ae8824a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -21,7 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 5ee5fb5..639a4c8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -41,7 +41,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index b573006..6eeddc5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -28,10 +28,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -42,12 +42,12 @@ public class TestFileSystemSwapManager {
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/nifi.properties");
 
         try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/old-swap-file.swap"));
-                final DataInputStream in = new DataInputStream(new 
BufferedInputStream(fis))) {
+            final DataInputStream in = new DataInputStream(new 
BufferedInputStream(fis))) {
 
             final FlowFileQueue flowFileQueue = 
Mockito.mock(FlowFileQueue.class);
             
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = 
FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new 
NopResourceClaimManager());
+            final List<FlowFileRecord> records = 
FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, 
"/src/test/resources/old-swap-file.swap", new NopResourceClaimManager());
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 0e11923..12f8e5e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -48,9 +48,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@@ -133,7 +133,8 @@ public class TestStandardProcessSession {
         final Connection connection = Mockito.mock(Connection.class);
         final ProcessScheduler processScheduler = 
Mockito.mock(ProcessScheduler.class);
 
-        flowFileQueue = new StandardFlowFileQueue("1", connection, 
processScheduler, 10000);
+        final FlowFileSwapManager swapManager = 
Mockito.mock(FlowFileSwapManager.class);
+        flowFileQueue = new StandardFlowFileQueue("1", connection, 
processScheduler, swapManager, null, 10000);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {
@@ -445,7 +446,7 @@ public class TestStandardProcessSession {
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
 
-        assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());  // 1 
event for both parents and children
+        assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); // 1 
event for both parents and children
     }
 
     @Test
@@ -809,7 +810,7 @@ public class TestStandardProcessSession {
             .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
 
-            .contentClaimOffset(1000L).size(1L).build();
+        .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 2138928..e836b44 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
@@ -33,7 +34,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowFileQueue;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.util.file.FileUtils;
 

Reply via email to