Repository: nifi
Updated Branches:
  refs/heads/master 7f15626af -> 2839a2f21


NIFI-15: Address issue where incomplete swap files can result in continually 
attempting to swap in data without ever being successful

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2839a2f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2839a2f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2839a2f2

Branch: refs/heads/master
Commit: 2839a2f21542abf1224dec2726268e40a4cd1d74
Parents: 7f15626
Author: Mark Payne <[email protected]>
Authored: Sun Feb 21 14:03:37 2016 -0500
Committer: Aldrin Piri <[email protected]>
Committed: Wed Mar 9 23:46:53 2016 -0500

----------------------------------------------------------------------
 .../repository/FlowFileSwapManager.java         |  10 +-
 .../repository/IncompleteSwapFileException.java |  45 ++++
 .../controller/repository/SwapContents.java     |  40 +++
 .../nifi/controller/FileSystemSwapManager.java  | 261 ++++++++++---------
 .../nifi/controller/StandardFlowFileQueue.java  |  65 ++++-
 .../controller/swap/StandardSwapContents.java   |  47 ++++
 .../controller/TestFileSystemSwapManager.java   |  11 +-
 .../controller/TestStandardFlowFileQueue.java   | 102 +++++++-
 .../TestWriteAheadFlowFileRepository.java       |  14 +-
 9 files changed, 447 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index e07cf1a..7092a6f 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -51,17 +51,17 @@ public interface FlowFileSwapManager {
     String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue 
flowFileQueue) throws IOException;
 
     /**
-     * Recovers the SwapFiles from the swap file that lives at the given 
location. This action
+     * Recovers the FlowFiles from the swap file that lives at the given 
location. This action
      * provides a view of the FlowFiles but does not actively swap them in, 
meaning that the swap file
      * at the given location remains in that location and the FlowFile 
Repository is not updated.
      *
      * @param swapLocation the location of the swap file
      * @param flowFileQueue the queue that the FlowFiles belong to
-     * @return the FlowFiles that live at the given swap location
+     * @return a SwapContents that includes the FlowFiles that live at the 
given swap location
      *
      * @throws IOException if unable to recover the FlowFiles from the given 
location
      */
-    List<FlowFileRecord> peek(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException;
+    SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws 
IncompleteSwapFileException, IOException;
 
     /**
      * Recovers the FlowFiles from the swap file that lives at the given 
location and belongs
@@ -71,12 +71,12 @@ public interface FlowFileSwapManager {
      * @param swapLocation the location of the swap file
      * @param flowFileQueue the queue to which the FlowFiles belong
      *
-     * @return the FlowFiles that are stored in the given location
+     * @return a SwapContents that includes FlowFiles that are stored in the 
given location
      *
      * @throws IOException if unable to recover the FlowFiles from the given 
location or update the
      *             FlowFileRepository
      */
-    List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException;
+    SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) 
throws IncompleteSwapFileException, IOException;
 
     /**
      * Determines swap files that exist for the given FlowFileQueue

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
new file mode 100644
index 0000000..4408f02
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.EOFException;
+
+/**
+ * Signals that a Swap File could not be completely read in/parsed because the 
data was
+ * not all present
+ */
+public class IncompleteSwapFileException extends EOFException {
+    private static final long serialVersionUID = -6818558584430076898L;
+
+    private final String swapLocation;
+    private final SwapContents partialContents;
+
+    public IncompleteSwapFileException(final String swapLocation, final 
SwapContents partialContents) {
+        super();
+        this.swapLocation = swapLocation;
+        this.partialContents = partialContents;
+    }
+
+    public String getSwapLocation() {
+        return swapLocation;
+    }
+
+    public SwapContents getPartialContents() {
+        return partialContents;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java
new file mode 100644
index 0000000..2fb7ba4
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.List;
+
+/**
+ * When FlowFiles are constructed from a Swap File, there is information in 
addition to
+ * the FlowFiles themselves that get stored and recovered. SwapContents 
provides a
+ * mechanism to encapsulate all of the information contained within a Swap 
File in a
+ * single object
+ */
+public interface SwapContents {
+
+    /**
+     * @return a summary of information included in a Swap File
+     */
+    SwapSummary getSummary();
+
+    /**
+     * @return the FlowFiles that are contained within a Swap File
+     */
+    List<FlowFileRecord> getFlowFiles();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 00b52cc..156389b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -45,13 +45,16 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
@@ -130,33 +133,33 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
 
 
     @Override
-    public List<FlowFileRecord> swapIn(final String swapLocation, final 
FlowFileQueue flowFileQueue) throws IOException {
+    public SwapContents swapIn(final String swapLocation, final FlowFileQueue 
flowFileQueue) throws IOException {
         final File swapFile = new File(swapLocation);
-        final List<FlowFileRecord> swappedFlowFiles = peek(swapLocation, 
flowFileQueue);
-        flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), 
swappedFlowFiles, flowFileQueue);
+        final SwapContents swapContents = peek(swapLocation, flowFileQueue);
+        flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), 
swapContents.getFlowFiles(), flowFileQueue);
 
         if (!swapFile.delete()) {
             warn("Swapped in FlowFiles from file " + 
swapFile.getAbsolutePath() + " but failed to delete the file; this file should 
be cleaned up manually");
         }
 
-        return swappedFlowFiles;
+        return swapContents;
     }
 
     @Override
-    public List<FlowFileRecord> peek(final String swapLocation, final 
FlowFileQueue flowFileQueue) throws IOException {
+    public SwapContents peek(final String swapLocation, final FlowFileQueue 
flowFileQueue) throws IOException {
         final File swapFile = new File(swapLocation);
         if (!swapFile.exists()) {
             throw new FileNotFoundException("Failed to swap in FlowFiles from 
external storage location " + swapLocation + " into FlowFile Queue because the 
file could not be found");
         }
 
-        final List<FlowFileRecord> swappedFlowFiles;
+        final SwapContents swapContents;
         try (final InputStream fis = new FileInputStream(swapFile);
             final InputStream bis = new BufferedInputStream(fis);
             final DataInputStream in = new DataInputStream(bis)) {
-            swappedFlowFiles = deserializeFlowFiles(in, swapLocation, 
flowFileQueue, claimManager);
+            swapContents = deserializeFlowFiles(in, swapLocation, 
flowFileQueue, claimManager);
         }
 
-        return swappedFlowFiles;
+        return swapContents;
     }
 
 
@@ -240,7 +243,6 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public SwapSummary getSwapSummary(final String swapLocation) throws 
IOException {
         final File swapFile = new File(swapLocation);
 
@@ -258,35 +260,29 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                 throw new IOException(errMsg);
             }
 
-            in.readUTF(); // ignore Connection ID
-            final int numRecords = in.readInt();
-            final long contentSize = in.readLong();
-
-            if (numRecords == 0) {
-                return StandardSwapSummary.EMPTY_SUMMARY;
-            }
-
+            final int numRecords;
+            final long contentSize;
             Long maxRecordId = null;
-            if (swapEncodingVersion > 7) {
-                maxRecordId = in.readLong();
-            }
+            try {
+                in.readUTF(); // ignore Connection ID
+                numRecords = in.readInt();
+                contentSize = in.readLong();
 
-            // Before swap encoding version 8, we did not write out the max 
record id, so we have to read all
-            // swap files to determine the max record id
-            final List<ResourceClaim> resourceClaims = new 
ArrayList<>(numRecords);
-            final List<FlowFileRecord> records = deserializeFlowFiles(in, 
numRecords, swapEncodingVersion, false, claimManager);
-            for (final FlowFileRecord record : records) {
-                if (maxRecordId == null || record.getId() > maxRecordId) {
-                    maxRecordId = record.getId();
+                if (numRecords == 0) {
+                    return StandardSwapSummary.EMPTY_SUMMARY;
                 }
 
-                final ContentClaim contentClaim = record.getContentClaim();
-                if (contentClaim != null) {
-                    resourceClaims.add(contentClaim.getResourceClaim());
+                if (swapEncodingVersion > 7) {
+                    maxRecordId = in.readLong();
                 }
+            } catch (final EOFException eof) {
+                logger.warn("Found premature End-of-File when reading Swap 
File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
+                return StandardSwapSummary.EMPTY_SUMMARY;
             }
 
-            return new StandardSwapSummary(new QueueSize(numRecords, 
contentSize), maxRecordId, resourceClaims);
+            final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+            final SwapContents swapContents = deserializeFlowFiles(in, 
queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
+            return swapContents.getSummary();
         }
     }
 
@@ -385,7 +381,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final String swapLocation, final FlowFileQueue queue, final 
ResourceClaimManager claimManager) throws IOException {
+    static SwapContents deserializeFlowFiles(final DataInputStream in, final 
String swapLocation, final FlowFileQueue queue, final ResourceClaimManager 
claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile 
because the encoding version is "
@@ -398,115 +394,146 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                 " because those FlowFiles belong to Connection with ID " + 
connectionId + " and an attempt was made to swap them into a Connection with ID 
" + queue.getIdentifier());
         }
 
-        final int numRecords = in.readInt();
-        in.readLong(); // Content Size
-        if (swapEncodingVersion > 7) {
-            in.readLong(); // Max Record ID
+        int numRecords = 0;
+        long contentSize = 0L;
+        Long maxRecordId = null;
+        try {
+            numRecords = in.readInt();
+            contentSize = in.readLong(); // Content Size
+            if (swapEncodingVersion > 7) {
+                maxRecordId = in.readLong(); // Max Record ID
+            }
+        } catch (final EOFException eof) {
+            final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+            final SwapSummary summary = new StandardSwapSummary(queueSize, 
maxRecordId, Collections.<ResourceClaim> emptyList());
+            final SwapContents partialContents = new 
StandardSwapContents(summary, Collections.<FlowFileRecord> emptyList());
+            throw new IncompleteSwapFileException(swapLocation, 
partialContents);
         }
 
-        return deserializeFlowFiles(in, numRecords, swapEncodingVersion, 
false, claimManager);
+        final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+        return deserializeFlowFiles(in, queueSize, maxRecordId, 
swapEncodingVersion, false, claimManager, swapLocation);
     }
 
-    private static List<FlowFileRecord> deserializeFlowFiles(final 
DataInputStream in, final int numFlowFiles,
-        final int serializationVersion, final boolean incrementContentClaims, 
final ResourceClaimManager claimManager) throws IOException {
-        final List<FlowFileRecord> flowFiles = new ArrayList<>();
-        for (int i = 0; i < numFlowFiles; i++) {
-            // legacy encoding had an "action" because it used to be couple 
with FlowFile Repository code
-            if (serializationVersion < 3) {
-                final int action = in.read();
-                if (action != 1) {
-                    throw new IOException("Swap File is version " + 
serializationVersion + " but did not contain a 'UPDATE' record type");
-                }
-            }
-
-            final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
-            ffBuilder.id(in.readLong());
-            ffBuilder.entryDate(in.readLong());
+    private static SwapContents deserializeFlowFiles(final DataInputStream in, 
final QueueSize queueSize, final Long maxRecordId,
+        final int serializationVersion, final boolean incrementContentClaims, 
final ResourceClaimManager claimManager, final String location) throws 
IOException {
+        final List<FlowFileRecord> flowFiles = new 
ArrayList<>(queueSize.getObjectCount());
+        final List<ResourceClaim> resourceClaims = new 
ArrayList<>(queueSize.getObjectCount());
+        Long maxId = maxRecordId;
 
-            if (serializationVersion > 1) {
-                // Lineage information was added in version 2
-                final int numLineageIdentifiers = in.readInt();
-                final Set<String> lineageIdentifiers = new 
HashSet<>(numLineageIdentifiers);
-                for (int lineageIdIdx = 0; lineageIdIdx < 
numLineageIdentifiers; lineageIdIdx++) {
-                    lineageIdentifiers.add(in.readUTF());
+        for (int i = 0; i < queueSize.getObjectCount(); i++) {
+            try {
+                // legacy encoding had an "action" because it used to be 
coupled with FlowFile Repository code
+                if (serializationVersion < 3) {
+                    final int action = in.read();
+                    if (action != 1) {
+                        throw new IOException("Swap File is version " + 
serializationVersion + " but did not contain a 'UPDATE' record type");
+                    }
                 }
-                ffBuilder.lineageIdentifiers(lineageIdentifiers);
-                ffBuilder.lineageStartDate(in.readLong());
 
-                if (serializationVersion > 5) {
-                    ffBuilder.lastQueueDate(in.readLong());
+                final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
+                final long recordId = in.readLong();
+                if (maxId == null || recordId > maxId) {
+                    maxId = recordId;
                 }
-            }
-
-            ffBuilder.size(in.readLong());
-
-            if (serializationVersion < 3) {
-                readString(in); // connection Id
-            }
 
-            final boolean hasClaim = in.readBoolean();
-            if (hasClaim) {
-                final String claimId;
-                if (serializationVersion < 5) {
-                    claimId = String.valueOf(in.readLong());
-                } else {
-                    claimId = in.readUTF();
+                ffBuilder.id(recordId);
+                ffBuilder.entryDate(in.readLong());
+
+                if (serializationVersion > 1) {
+                    // Lineage information was added in version 2
+                    final int numLineageIdentifiers = in.readInt();
+                    final Set<String> lineageIdentifiers = new 
HashSet<>(numLineageIdentifiers);
+                    for (int lineageIdIdx = 0; lineageIdIdx < 
numLineageIdentifiers; lineageIdIdx++) {
+                        lineageIdentifiers.add(in.readUTF());
+                    }
+                    ffBuilder.lineageIdentifiers(lineageIdentifiers);
+                    ffBuilder.lineageStartDate(in.readLong());
+
+                    if (serializationVersion > 5) {
+                        ffBuilder.lastQueueDate(in.readLong());
+                    }
                 }
 
-                final String container = in.readUTF();
-                final String section = in.readUTF();
+                ffBuilder.size(in.readLong());
 
-                final long resourceOffset;
-                final long resourceLength;
-                if (serializationVersion < 6) {
-                    resourceOffset = 0L;
-                    resourceLength = -1L;
-                } else {
-                    resourceOffset = in.readLong();
-                    resourceLength = in.readLong();
+                if (serializationVersion < 3) {
+                    readString(in); // connection Id
                 }
 
-                final long claimOffset = in.readLong();
-
-                final boolean lossTolerant;
-                if (serializationVersion >= 4) {
-                    lossTolerant = in.readBoolean();
-                } else {
-                    lossTolerant = false;
+                final boolean hasClaim = in.readBoolean();
+                ResourceClaim resourceClaim = null;
+                if (hasClaim) {
+                    final String claimId;
+                    if (serializationVersion < 5) {
+                        claimId = String.valueOf(in.readLong());
+                    } else {
+                        claimId = in.readUTF();
+                    }
+
+                    final String container = in.readUTF();
+                    final String section = in.readUTF();
+
+                    final long resourceOffset;
+                    final long resourceLength;
+                    if (serializationVersion < 6) {
+                        resourceOffset = 0L;
+                        resourceLength = -1L;
+                    } else {
+                        resourceOffset = in.readLong();
+                        resourceLength = in.readLong();
+                    }
+
+                    final long claimOffset = in.readLong();
+
+                    final boolean lossTolerant;
+                    if (serializationVersion >= 4) {
+                        lossTolerant = in.readBoolean();
+                    } else {
+                        lossTolerant = false;
+                    }
+
+                    resourceClaim = claimManager.newResourceClaim(container, 
section, claimId, lossTolerant);
+                    final StandardContentClaim claim = new 
StandardContentClaim(resourceClaim, resourceOffset);
+                    claim.setLength(resourceLength);
+
+                    if (incrementContentClaims) {
+                        claimManager.incrementClaimantCount(resourceClaim);
+                    }
+
+                    ffBuilder.contentClaim(claim);
+                    ffBuilder.contentClaimOffset(claimOffset);
                 }
 
-                final ResourceClaim resourceClaim = 
claimManager.newResourceClaim(container, section, claimId, lossTolerant);
-                final StandardContentClaim claim = new 
StandardContentClaim(resourceClaim, resourceOffset);
-                claim.setLength(resourceLength);
-
-                if (incrementContentClaims) {
-                    claimManager.incrementClaimantCount(resourceClaim);
+                boolean attributesChanged = true;
+                if (serializationVersion < 3) {
+                    attributesChanged = in.readBoolean();
                 }
 
-                ffBuilder.contentClaim(claim);
-                ffBuilder.contentClaimOffset(claimOffset);
-            }
-
-            boolean attributesChanged = true;
-            if (serializationVersion < 3) {
-                attributesChanged = in.readBoolean();
-            }
+                if (attributesChanged) {
+                    final int numAttributes = in.readInt();
+                    for (int j = 0; j < numAttributes; j++) {
+                        final String key = readString(in);
+                        final String value = readString(in);
 
-            if (attributesChanged) {
-                final int numAttributes = in.readInt();
-                for (int j = 0; j < numAttributes; j++) {
-                    final String key = readString(in);
-                    final String value = readString(in);
+                        ffBuilder.addAttribute(key, value);
+                    }
+                }
 
-                    ffBuilder.addAttribute(key, value);
+                final FlowFileRecord record = ffBuilder.build();
+                if (resourceClaim != null) {
+                    resourceClaims.add(resourceClaim);
                 }
-            }
 
-            final FlowFileRecord record = ffBuilder.build();
-            flowFiles.add(record);
+                flowFiles.add(record);
+            } catch (final EOFException eof) {
+                final SwapSummary swapSummary = new 
StandardSwapSummary(queueSize, maxId, resourceClaims);
+                final SwapContents partialContents = new 
StandardSwapContents(swapSummary, flowFiles);
+                throw new IncompleteSwapFileException(location, 
partialContents);
+            }
         }
 
-        return flowFiles;
+        final SwapSummary swapSummary = new StandardSwapSummary(queueSize, 
maxId, resourceClaims);
+        return new StandardSwapContents(swapSummary, flowFiles);
     }
 
     private static String readString(final InputStream in) throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 22aacdc..0f3ffe0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -47,8 +47,10 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
 import org.apache.nifi.controller.repository.RepositoryRecord;
 import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.SwapContents;
 import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -456,16 +458,15 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         // first.
         if (!swapLocations.isEmpty()) {
             final String swapLocation = swapLocations.remove(0);
+            boolean partialContents = false;
+            SwapContents swapContents = null;
             try {
-                final List<FlowFileRecord> swappedIn = 
swapManager.swapIn(swapLocation, this);
-                long swapSize = 0L;
-                for (final FlowFileRecord flowFile : swappedIn) {
-                    swapSize += flowFile.getSize();
-                }
-                incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1);
-                incrementActiveQueueSize(swappedIn.size(), swapSize);
-                activeQueue.addAll(swappedIn);
-                return;
+                swapContents = swapManager.swapIn(swapLocation, this);
+            } catch (final IncompleteSwapFileException isfe) {
+                logger.error("Failed to swap in all FlowFiles from Swap File 
{}; Swap File ended prematurely. The records that were present will still be 
swapped in", swapLocation);
+                logger.error("", isfe);
+                swapContents = isfe.getPartialContents();
+                partialContents = true;
             } catch (final FileNotFoundException fnfe) {
                 logger.error("Failed to swap in FlowFiles from Swap File {} 
because the Swap File can no longer be found", swapLocation);
                 if (eventReporter != null) {
@@ -481,6 +482,28 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 }
                 return;
             }
+
+            final QueueSize swapSize = 
swapContents.getSummary().getQueueSize();
+            final long contentSize = swapSize.getByteCount();
+            final int flowFileCount = swapSize.getObjectCount();
+            incrementSwapQueueSize(-flowFileCount, -contentSize, -1);
+
+            if (partialContents) {
+                // if we have partial results, we need to calculate the 
content size of the flowfiles
+                // actually swapped back in.
+                long contentSizeSwappedIn = 0L;
+                for (final FlowFileRecord swappedIn : 
swapContents.getFlowFiles()) {
+                    contentSizeSwappedIn += swappedIn.getSize();
+                }
+
+                incrementActiveQueueSize(swapContents.getFlowFiles().size(), 
contentSizeSwappedIn);
+            } else {
+                // we swapped in the whole swap file. We can just use the info 
that we got from the summary.
+                incrementActiveQueueSize(flowFileCount, contentSize);
+            }
+
+            activeQueue.addAll(swapContents.getFlowFiles());
+            return;
         }
 
         // this is the most common condition (nothing is swapped out), so do 
the check first and avoid the expense
@@ -710,6 +733,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
 
         @Override
+        @SuppressWarnings("deprecation")
         public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
             int returnVal = 0;
             final boolean f1Penalized = f1.isPenalized();
@@ -1111,23 +1135,36 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                         while (swapLocationItr.hasNext()) {
                             final String swapLocation = swapLocationItr.next();
 
-                            List<FlowFileRecord> swappedIn = null;
+                            SwapContents swapContents = null;
                             try {
                                 if (dropRequest.getState() == 
DropFlowFileState.CANCELED) {
                                     logger.info("Cancel requested for 
DropFlowFileRequest {}", requestIdentifier);
                                     return;
                                 }
 
-                                swappedIn = swapManager.swapIn(swapLocation, 
StandardFlowFileQueue.this);
-                                droppedSize = drop(swappedIn, requestor);
+                                swapContents = 
swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+                                droppedSize = 
drop(swapContents.getFlowFiles(), requestor);
+                            } catch (final IncompleteSwapFileException isfe) {
+                                swapContents = isfe.getPartialContents();
+                                final String warnMsg = "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the file was corrupt. "
+                                    + "Some FlowFiles may not be dropped from the queue until NiFi is restarted.";
+
+                                logger.warn(warnMsg);
+                                if (eventReporter != null) {
+                                    
eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", warnMsg);
+                                }
                             } catch (final IOException ioe) {
                                 logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
                                     swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
                                 logger.error("", ioe);
+                                if (eventReporter != null) {
+                                    eventReporter.reportEvent(Severity.ERROR, 
"Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + swapLocation
+                                        + ". The FlowFiles contained in this 
Swap File will not be dropped from the queue");
+                                }
 
                                 
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles 
from Swap File " + swapLocation + " due to " + ioe.toString());
-                                if (swappedIn != null) {
-                                    activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
+                                if (swapContents != null) {
+                                    activeQueue.addAll(swapContents.getFlowFiles()); // ensure that we don't lose the FlowFiles from our queue.
                                 }
 
                                 return;

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java
new file mode 100644
index 0000000..7e64a2a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+
+public class StandardSwapContents implements SwapContents {
+    public static final SwapContents EMPTY_SWAP_CONTENTS = new 
StandardSwapContents(StandardSwapSummary.EMPTY_SUMMARY, 
Collections.<FlowFileRecord> emptyList());
+
+    private final SwapSummary summary;
+    private final List<FlowFileRecord> flowFiles;
+
+    public StandardSwapContents(final SwapSummary summary, final 
List<FlowFileRecord> flowFiles) {
+        this.summary = summary;
+        this.flowFiles = flowFiles;
+    }
+
+    @Override
+    public SwapSummary getSummary() {
+        return summary;
+    }
+
+    @Override
+    public List<FlowFileRecord> getFlowFiles() {
+        return flowFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index f7191c5..fcfd524 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@@ -57,7 +58,8 @@ public class TestFileSystemSwapManager {
             final FlowFileQueue flowFileQueue = 
Mockito.mock(FlowFileQueue.class);
             
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = 
FileSystemSwapManager.deserializeFlowFiles(in, 
"/src/test/resources/old-swap-file.swap", flowFileQueue, new 
NopResourceClaimManager());
+            final SwapContents swapContents = 
FileSystemSwapManager.deserializeFlowFiles(in, 
"/src/test/resources/old-swap-file.swap", flowFileQueue, new 
NopResourceClaimManager());
+            final List<FlowFileRecord> records = swapContents.getFlowFiles();
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {
@@ -68,6 +70,7 @@ public class TestFileSystemSwapManager {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testRoundTripSerializeDeserialize() throws IOException {
         final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
         final Map<String, String> attrs = new HashMap<>();
@@ -88,16 +91,16 @@ public class TestFileSystemSwapManager {
             FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, 
swapLocation, fos);
         }
 
-        final List<FlowFileRecord> swappedIn;
+        final SwapContents swappedIn;
         try (final FileInputStream fis = new FileInputStream(swapFile);
             final DataInputStream dis = new DataInputStream(fis)) {
             swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, 
swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
         }
 
-        assertEquals(toSwap.size(), swappedIn.size());
+        assertEquals(toSwap.size(), swappedIn.getFlowFiles().size());
         for (int i = 0; i < toSwap.size(); i++) {
             final FlowFileRecord pre = toSwap.get(i);
-            final FlowFileRecord post = swappedIn.get(i);
+            final FlowFileRecord post = swappedIn.getFlowFiles().get(i);
 
             assertEquals(pre.getSize(), post.getSize());
             assertEquals(pre.getAttributes(), post.getAttributes());

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 412e376..fc33d99 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -45,11 +45,14 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
+import org.apache.nifi.controller.repository.SwapContents;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -378,6 +381,68 @@ public class TestStandardFlowFileQueue {
         queue.poll(exp);
     }
 
+    @Test
+    public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(20000, queue.size().getByteCount());
+
+        assertEquals(1, swapManager.swappedOut.size());
+
+        // when we swap in, cause an IncompleteSwapFileException to be
+        // thrown and contain only 9,999 of the 10,000 FlowFiles
+        swapManager.enableIncompleteSwapFileException(9999);
+        final Set<FlowFileRecord> expired = Collections.emptySet();
+        FlowFileRecord flowFile;
+
+        for (int i = 0; i < 10000; i++) {
+            flowFile = queue.poll(expired);
+            assertNotNull(flowFile);
+            queue.acknowledge(Collections.singleton(flowFile));
+        }
+
+        // 10,000 FlowFiles on queue - all swapped out
+        assertEquals(10000, queue.size().getObjectCount());
+        assertEquals(10000, queue.size().getByteCount());
+        assertEquals(1, swapManager.swappedOut.size());
+        assertEquals(0, swapManager.swapInCalledCount);
+
+        // Trigger swap in. This will remove 1 FlowFile from queue, leaving 9,999 but
+        // on swap in, we will get only 9,999 FlowFiles put onto the queue, and the queue size will
+        // be decremented by 10,000 (because the Swap File's header tells us that there are 10K
+        // FlowFiles, even though only 9999 are in the swap file)
+        flowFile = queue.poll(expired);
+        assertNotNull(flowFile);
+        queue.acknowledge(Collections.singleton(flowFile));
+
+        // size should be 9,998 because we lost 1 on Swap In, and then we pulled one above.
+        assertEquals(9998, queue.size().getObjectCount());
+        assertEquals(9998, queue.size().getByteCount());
+        assertEquals(0, swapManager.swappedOut.size());
+        assertEquals(1, swapManager.swapInCalledCount);
+
+        for (int i = 0; i < 9998; i++) {
+            flowFile = queue.poll(expired);
+            assertNotNull("Null FlowFile when i = " + i, flowFile);
+            queue.acknowledge(Collections.singleton(flowFile));
+
+            final QueueSize queueSize = queue.size();
+            assertEquals(9998 - i - 1, queueSize.getObjectCount());
+            assertEquals(9998 - i - 1, queueSize.getByteCount());
+        }
+
+        final QueueSize queueSize = queue.size();
+        assertEquals(0, queueSize.getObjectCount());
+        assertEquals(0L, queueSize.getByteCount());
+
+        flowFile = queue.poll(expired);
+        assertNull(flowFile);
+    }
+
+
     @Test(timeout = 120000)
     public void testDropSwappedFlowFiles() {
         for (int i = 1; i <= 210000; i++) {
@@ -445,11 +510,21 @@ public class TestStandardFlowFileQueue {
         int swapOutCalledCount = 0;
         int swapInCalledCount = 0;
 
+        private int incompleteSwapFileRecordsToInclude = -1;
+
         @Override
         public void initialize(final SwapManagerInitializationContext 
initializationContext) {
 
         }
 
+        public void enableIncompleteSwapFileException(final int 
flowFilesToInclude) {
+            incompleteSwapFileRecordsToInclude = flowFilesToInclude;
+        }
+
+        public void disableIncompleteSwapFileException() {
+            this.incompleteSwapFileRecordsToInclude = -1;
+        }
+
         @Override
         public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue 
flowFileQueue) throws IOException {
             swapOutCalledCount++;
@@ -458,15 +533,34 @@ public class TestStandardFlowFileQueue {
             return location;
         }
 
+        private void throwIncompleteIfNecessary(final String swapLocation, 
final boolean remove) throws IOException {
+            if (incompleteSwapFileRecordsToInclude > -1) {
+                final SwapSummary summary = getSwapSummary(swapLocation);
+
+                final List<FlowFileRecord> records;
+                if (remove) {
+                    records = swappedOut.remove(swapLocation);
+                } else {
+                    records = swappedOut.get(swapLocation);
+                }
+
+                final List<FlowFileRecord> partial = records.subList(0, 
incompleteSwapFileRecordsToInclude);
+                final SwapContents partialContents = new 
StandardSwapContents(summary, partial);
+                throw new IncompleteSwapFileException(swapLocation, 
partialContents);
+            }
+        }
+
         @Override
-        public List<FlowFileRecord> peek(String swapLocation, final 
FlowFileQueue flowFileQueue) throws IOException {
-            return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation));
+        public SwapContents peek(String swapLocation, final FlowFileQueue 
flowFileQueue) throws IOException {
+            throwIncompleteIfNecessary(swapLocation, false);
+            return new StandardSwapContents(getSwapSummary(swapLocation), 
swappedOut.get(swapLocation));
         }
 
         @Override
-        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException {
+        public SwapContents swapIn(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException {
             swapInCalledCount++;
-            return swappedOut.remove(swapLocation);
+            throwIncompleteIfNecessary(swapLocation, true);
+            return new StandardSwapContents(getSwapSummary(swapLocation), 
swappedOut.remove(swapLocation));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/2839a2f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index c1c7b45..cd4aa27 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -46,6 +46,7 @@ import 
org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.Before;
@@ -156,6 +157,7 @@ public class TestWriteAheadFlowFileRepository {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testRestartWithOneRecord() throws IOException {
         final Path path = Paths.get("target/test-repo");
         if (Files.exists(path)) {
@@ -269,23 +271,27 @@ public class TestWriteAheadFlowFileRepository {
         }
 
         @Override
-        public List<FlowFileRecord> peek(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException {
+        public SwapContents peek(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException {
             Map<String, List<FlowFileRecord>> swapMap = 
swappedRecords.get(flowFileQueue);
             if (swapMap == null) {
                 return null;
             }
 
-            return Collections.unmodifiableList(swapMap.get(swapLocation));
+            final List<FlowFileRecord> flowFiles = swapMap.get(swapLocation);
+            final SwapSummary summary = getSwapSummary(swapLocation);
+            return new StandardSwapContents(summary, flowFiles);
         }
 
         @Override
-        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException {
+        public SwapContents swapIn(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException {
             Map<String, List<FlowFileRecord>> swapMap = 
swappedRecords.get(flowFileQueue);
             if (swapMap == null) {
                 return null;
             }
 
-            return swapMap.remove(swapLocation);
+            final List<FlowFileRecord> flowFiles = 
swapMap.remove(swapLocation);
+            final SwapSummary summary = getSwapSummary(swapLocation);
+            return new StandardSwapContents(summary, flowFiles);
         }
 
         @Override

Reply via email to