[ https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658581#comment-15658581 ]
ASF GitHub Bot commented on NIFI-2854: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/1202#discussion_r87657911 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java --- @@ -251,353 +262,36 @@ public SwapSummary getSwapSummary(final String swapLocation) throws IOException final InputStream bufferedIn = new BufferedInputStream(fis); final DataInputStream in = new DataInputStream(bufferedIn)) { - final int swapEncodingVersion = in.readInt(); - if (swapEncodingVersion > SWAP_ENCODING_VERSION) { - final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; - - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); - throw new IOException(errMsg); - } - - final int numRecords; - final long contentSize; - Long maxRecordId = null; - try { - in.readUTF(); // ignore Connection ID - numRecords = in.readInt(); - contentSize = in.readLong(); - - if (numRecords == 0) { - return StandardSwapSummary.EMPTY_SUMMARY; - } - - if (swapEncodingVersion > 7) { - maxRecordId = in.readLong(); - } - } catch (final EOFException eof) { - logger.warn("Found premature End-of-File when reading Swap File {}. 
EOF occurred before any FlowFiles were encountered", swapLocation); - return StandardSwapSummary.EMPTY_SUMMARY; - } - - final QueueSize queueSize = new QueueSize(numRecords, contentSize); - final SwapContents swapContents = deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation); - return swapContents.getSummary(); - } - } - - public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException { - if (toSwap == null || toSwap.isEmpty()) { - return 0; - } - - long contentSize = 0L; - for (final FlowFileRecord record : toSwap) { - contentSize += record.getSize(); - } - - // persist record to disk via the swap file - final OutputStream bufferedOut = new BufferedOutputStream(destination); - final DataOutputStream out = new DataOutputStream(bufferedOut); - try { - out.writeInt(SWAP_ENCODING_VERSION); - out.writeUTF(queue.getIdentifier()); - out.writeInt(toSwap.size()); - out.writeLong(contentSize); - - // get the max record id and write that out so that we know it quickly for restoration - long maxRecordId = 0L; - for (final FlowFileRecord flowFile : toSwap) { - if (flowFile.getId() > maxRecordId) { - maxRecordId = flowFile.getId(); - } - } - - out.writeLong(maxRecordId); - - for (final FlowFileRecord flowFile : toSwap) { - out.writeLong(flowFile.getId()); - out.writeLong(flowFile.getEntryDate()); - out.writeLong(flowFile.getLineageStartDate()); - out.writeLong(flowFile.getLineageStartIndex()); - out.writeLong(flowFile.getLastQueueDate()); - out.writeLong(flowFile.getQueueDateIndex()); - out.writeLong(flowFile.getSize()); - - final ContentClaim claim = flowFile.getContentClaim(); - if (claim == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - final ResourceClaim resourceClaim = claim.getResourceClaim(); - out.writeUTF(resourceClaim.getId()); - out.writeUTF(resourceClaim.getContainer()); - 
out.writeUTF(resourceClaim.getSection()); - out.writeLong(claim.getOffset()); - out.writeLong(claim.getLength()); - out.writeLong(flowFile.getContentClaimOffset()); - out.writeBoolean(resourceClaim.isLossTolerant()); - } - - final Map<String, String> attributes = flowFile.getAttributes(); - out.writeInt(attributes.size()); - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - } - } finally { - out.flush(); + final SwapDeserializer deserializer = createSwapDeserializer(in); + return deserializer.getSwapSummary(in, swapLocation, claimManager); } - - logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", toSwap.size(), queue, swapLocation); - - return toSwap.size(); } - private static void writeString(final String toWrite, final OutputStream out) throws IOException { - final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8); - final int utflen = bytes.length; - if (utflen < 65535) { - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } else { - out.write(255); - out.write(255); - out.write(utflen >>> 24); - out.write(utflen >>> 16); - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } - } - - static SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { - final int swapEncodingVersion = in.readInt(); - if (swapEncodingVersion > SWAP_ENCODING_VERSION) { - throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); - } + private SwapDeserializer createSwapDeserializer(final DataInputStream dis) throws IOException { + dis.mark(4); --- End diff -- Probably best to use MAGIC_HEADER.length here instead of 4 like the init for the byte array uses. 
> Enable repositories to support upgrades and rollback in well defined scenarios > ------------------------------------------------------------------------------ > > Key: NIFI-2854 > URL: https://issues.apache.org/jira/browse/NIFI-2854 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > Fix For: 1.1.0 > > > The flowfile, swapfile, provenance, and content repositories play a very > important role in NiFi's ability to be safely upgraded and rolled back. We > need to have well-documented behaviors, designs, and version adherence so > that users can safely rely on these mechanisms. > Once this is formalized and in place we should update our versioning guidance > to reflect this as well. > The following would be true from NiFi 1.2.0 onward: > * No changes to how the repositories are persisted to disk can be made which > will break forward/backward compatibility and specifically this means that > things like the way each is serialized to disk cannot change. > * If changes are made which impact forward or backward compatibility they > should be reserved for major releases only and should include a utility to > help users with pre-existing data convert from some older format to the newer > format. It may not be feasible to have rollback on major releases. > * The content repository should not be changed within a major release cycle > in any way that will harm forward or backward compatibility. > * The flow file repository can change in that new fields can be added to > existing write-ahead log record types but no fields can be removed nor can > any new types be added. Once a field is considered required it must remain > required. Changes may only be made across minor version changes - not > incremental. > * Swap File storage should follow very similar rules to the flow file > repository. 
Adding a schema to the swap file header may allow some variation > there, but the variation should only be hints to optimize how they're > processed and not change their behavior otherwise. Changes are only permitted > during minor version releases. > * Provenance repository changes are only permitted during minor version > releases. These changes may include adding or removing fields from existing > event types. If a field is considered required it must always be considered > required. If a field is removed then it must not be a required field and > there must be a sensible default an older version could use if that value is > not found in new data once rolled back. New event types may be added. > Fields or event types not known to an older version, if seen after a rollback, > will simply be ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)