Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r87657911
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 ---
    @@ -251,353 +262,36 @@ public SwapSummary getSwapSummary(final String 
swapLocation) throws IOException
                     final InputStream bufferedIn = new 
BufferedInputStream(fis);
                     final DataInputStream in = new 
DataInputStream(bufferedIn)) {
     
    -            final int swapEncodingVersion = in.readInt();
    -            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
    -                final String errMsg = "Cannot swap FlowFiles in from " + 
swapFile + " because the encoding version is "
    -                        + swapEncodingVersion + ", which is too new 
(expecting " + SWAP_ENCODING_VERSION + " or less)";
    -
    -                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
errMsg);
    -                throw new IOException(errMsg);
    -            }
    -
    -            final int numRecords;
    -            final long contentSize;
    -            Long maxRecordId = null;
    -            try {
    -                in.readUTF(); // ignore Connection ID
    -                numRecords = in.readInt();
    -                contentSize = in.readLong();
    -
    -                if (numRecords == 0) {
    -                    return StandardSwapSummary.EMPTY_SUMMARY;
    -                }
    -
    -                if (swapEncodingVersion > 7) {
    -                    maxRecordId = in.readLong();
    -                }
    -            } catch (final EOFException eof) {
    -                logger.warn("Found premature End-of-File when reading Swap 
File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
    -                return StandardSwapSummary.EMPTY_SUMMARY;
    -            }
    -
    -            final QueueSize queueSize = new QueueSize(numRecords, 
contentSize);
    -            final SwapContents swapContents = deserializeFlowFiles(in, 
queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
    -            return swapContents.getSummary();
    -        }
    -    }
    -
    -    public static int serializeFlowFiles(final List<FlowFileRecord> 
toSwap, final FlowFileQueue queue, final String swapLocation, final 
OutputStream destination) throws IOException {
    -        if (toSwap == null || toSwap.isEmpty()) {
    -            return 0;
    -        }
    -
    -        long contentSize = 0L;
    -        for (final FlowFileRecord record : toSwap) {
    -            contentSize += record.getSize();
    -        }
    -
    -        // persist record to disk via the swap file
    -        final OutputStream bufferedOut = new 
BufferedOutputStream(destination);
    -        final DataOutputStream out = new DataOutputStream(bufferedOut);
    -        try {
    -            out.writeInt(SWAP_ENCODING_VERSION);
    -            out.writeUTF(queue.getIdentifier());
    -            out.writeInt(toSwap.size());
    -            out.writeLong(contentSize);
    -
    -            // get the max record id and write that out so that we know it 
quickly for restoration
    -            long maxRecordId = 0L;
    -            for (final FlowFileRecord flowFile : toSwap) {
    -                if (flowFile.getId() > maxRecordId) {
    -                    maxRecordId = flowFile.getId();
    -                }
    -            }
    -
    -            out.writeLong(maxRecordId);
    -
    -            for (final FlowFileRecord flowFile : toSwap) {
    -                out.writeLong(flowFile.getId());
    -                out.writeLong(flowFile.getEntryDate());
    -                out.writeLong(flowFile.getLineageStartDate());
    -                out.writeLong(flowFile.getLineageStartIndex());
    -                out.writeLong(flowFile.getLastQueueDate());
    -                out.writeLong(flowFile.getQueueDateIndex());
    -                out.writeLong(flowFile.getSize());
    -
    -                final ContentClaim claim = flowFile.getContentClaim();
    -                if (claim == null) {
    -                    out.writeBoolean(false);
    -                } else {
    -                    out.writeBoolean(true);
    -                    final ResourceClaim resourceClaim = 
claim.getResourceClaim();
    -                    out.writeUTF(resourceClaim.getId());
    -                    out.writeUTF(resourceClaim.getContainer());
    -                    out.writeUTF(resourceClaim.getSection());
    -                    out.writeLong(claim.getOffset());
    -                    out.writeLong(claim.getLength());
    -                    out.writeLong(flowFile.getContentClaimOffset());
    -                    out.writeBoolean(resourceClaim.isLossTolerant());
    -                }
    -
    -                final Map<String, String> attributes = 
flowFile.getAttributes();
    -                out.writeInt(attributes.size());
    -                for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
    -                    writeString(entry.getKey(), out);
    -                    writeString(entry.getValue(), out);
    -                }
    -            }
    -        } finally {
    -            out.flush();
    +            final SwapDeserializer deserializer = 
createSwapDeserializer(in);
    +            return deserializer.getSwapSummary(in, swapLocation, 
claimManager);
             }
    -
    -        logger.info("Successfully swapped out {} FlowFiles from {} to Swap 
File {}", toSwap.size(), queue, swapLocation);
    -
    -        return toSwap.size();
         }
     
    -    private static void writeString(final String toWrite, final 
OutputStream out) throws IOException {
    -        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
    -        final int utflen = bytes.length;
     
    -        if (utflen < 65535) {
    -            out.write(utflen >>> 8);
    -            out.write(utflen);
    -            out.write(bytes);
    -        } else {
    -            out.write(255);
    -            out.write(255);
    -            out.write(utflen >>> 24);
    -            out.write(utflen >>> 16);
    -            out.write(utflen >>> 8);
    -            out.write(utflen);
    -            out.write(bytes);
    -        }
    -    }
    -
    -    static SwapContents deserializeFlowFiles(final DataInputStream in, 
final String swapLocation, final FlowFileQueue queue, final 
ResourceClaimManager claimManager) throws IOException {
    -        final int swapEncodingVersion = in.readInt();
    -        if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
    -            throw new IOException("Cannot swap FlowFiles in from SwapFile 
because the encoding version is "
    -                    + swapEncodingVersion + ", which is too new (expecting 
" + SWAP_ENCODING_VERSION + " or less)");
    -        }
    +    private SwapDeserializer createSwapDeserializer(final DataInputStream 
dis) throws IOException {
    +        dis.mark(4);
    --- End diff --
    
    Probably best to use MAGIC_HEADER.length here instead of the hard-coded 4, 
the same way the initialization of the byte array does.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to