Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1202#discussion_r87804332
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
---
@@ -251,353 +262,36 @@ public SwapSummary getSwapSummary(final String
swapLocation) throws IOException
final InputStream bufferedIn = new
BufferedInputStream(fis);
final DataInputStream in = new
DataInputStream(bufferedIn)) {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- final String errMsg = "Cannot swap FlowFiles in from " +
swapFile + " because the encoding version is "
- + swapEncodingVersion + ", which is too new
(expecting " + SWAP_ENCODING_VERSION + " or less)";
-
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY,
errMsg);
- throw new IOException(errMsg);
- }
-
- final int numRecords;
- final long contentSize;
- Long maxRecordId = null;
- try {
- in.readUTF(); // ignore Connection ID
- numRecords = in.readInt();
- contentSize = in.readLong();
-
- if (numRecords == 0) {
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- if (swapEncodingVersion > 7) {
- maxRecordId = in.readLong();
- }
- } catch (final EOFException eof) {
- logger.warn("Found premature End-of-File when reading Swap
File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- final QueueSize queueSize = new QueueSize(numRecords,
contentSize);
- final SwapContents swapContents = deserializeFlowFiles(in,
queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
- return swapContents.getSummary();
- }
- }
-
- public static int serializeFlowFiles(final List<FlowFileRecord>
toSwap, final FlowFileQueue queue, final String swapLocation, final
OutputStream destination) throws IOException {
- if (toSwap == null || toSwap.isEmpty()) {
- return 0;
- }
-
- long contentSize = 0L;
- for (final FlowFileRecord record : toSwap) {
- contentSize += record.getSize();
- }
-
- // persist record to disk via the swap file
- final OutputStream bufferedOut = new
BufferedOutputStream(destination);
- final DataOutputStream out = new DataOutputStream(bufferedOut);
- try {
- out.writeInt(SWAP_ENCODING_VERSION);
- out.writeUTF(queue.getIdentifier());
- out.writeInt(toSwap.size());
- out.writeLong(contentSize);
-
- // get the max record id and write that out so that we know it
quickly for restoration
- long maxRecordId = 0L;
- for (final FlowFileRecord flowFile : toSwap) {
- if (flowFile.getId() > maxRecordId) {
- maxRecordId = flowFile.getId();
- }
- }
-
- out.writeLong(maxRecordId);
-
- for (final FlowFileRecord flowFile : toSwap) {
- out.writeLong(flowFile.getId());
- out.writeLong(flowFile.getEntryDate());
- out.writeLong(flowFile.getLineageStartDate());
- out.writeLong(flowFile.getLineageStartIndex());
- out.writeLong(flowFile.getLastQueueDate());
- out.writeLong(flowFile.getQueueDateIndex());
- out.writeLong(flowFile.getSize());
-
- final ContentClaim claim = flowFile.getContentClaim();
- if (claim == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- final ResourceClaim resourceClaim =
claim.getResourceClaim();
- out.writeUTF(resourceClaim.getId());
- out.writeUTF(resourceClaim.getContainer());
- out.writeUTF(resourceClaim.getSection());
- out.writeLong(claim.getOffset());
- out.writeLong(claim.getLength());
- out.writeLong(flowFile.getContentClaimOffset());
- out.writeBoolean(resourceClaim.isLossTolerant());
- }
-
- final Map<String, String> attributes =
flowFile.getAttributes();
- out.writeInt(attributes.size());
- for (final Map.Entry<String, String> entry :
attributes.entrySet()) {
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- }
- } finally {
- out.flush();
+ final SwapDeserializer deserializer =
createSwapDeserializer(in);
+ return deserializer.getSwapSummary(in, swapLocation,
claimManager);
}
-
- logger.info("Successfully swapped out {} FlowFiles from {} to Swap
File {}", toSwap.size(), queue, swapLocation);
-
- return toSwap.size();
}
- private static void writeString(final String toWrite, final
OutputStream out) throws IOException {
- final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
- final int utflen = bytes.length;
- if (utflen < 65535) {
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- } else {
- out.write(255);
- out.write(255);
- out.write(utflen >>> 24);
- out.write(utflen >>> 16);
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- }
- }
-
- static SwapContents deserializeFlowFiles(final DataInputStream in,
final String swapLocation, final FlowFileQueue queue, final
ResourceClaimManager claimManager) throws IOException {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- throw new IOException("Cannot swap FlowFiles in from SwapFile
because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting
" + SWAP_ENCODING_VERSION + " or less)");
- }
+ private SwapDeserializer createSwapDeserializer(final DataInputStream
dis) throws IOException {
+ dis.mark(4);
--- End diff --
Good call.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---