[
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663987#comment-15663987
]
ASF GitHub Bot commented on NIFI-2854:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1202#discussion_r87804332
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
---
@@ -251,353 +262,36 @@ public SwapSummary getSwapSummary(final String
swapLocation) throws IOException
final InputStream bufferedIn = new
BufferedInputStream(fis);
final DataInputStream in = new
DataInputStream(bufferedIn)) {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- final String errMsg = "Cannot swap FlowFiles in from " +
swapFile + " because the encoding version is "
- + swapEncodingVersion + ", which is too new
(expecting " + SWAP_ENCODING_VERSION + " or less)";
-
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY,
errMsg);
- throw new IOException(errMsg);
- }
-
- final int numRecords;
- final long contentSize;
- Long maxRecordId = null;
- try {
- in.readUTF(); // ignore Connection ID
- numRecords = in.readInt();
- contentSize = in.readLong();
-
- if (numRecords == 0) {
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- if (swapEncodingVersion > 7) {
- maxRecordId = in.readLong();
- }
- } catch (final EOFException eof) {
- logger.warn("Found premature End-of-File when reading Swap
File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- final QueueSize queueSize = new QueueSize(numRecords,
contentSize);
- final SwapContents swapContents = deserializeFlowFiles(in,
queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
- return swapContents.getSummary();
- }
- }
-
- public static int serializeFlowFiles(final List<FlowFileRecord>
toSwap, final FlowFileQueue queue, final String swapLocation, final
OutputStream destination) throws IOException {
- if (toSwap == null || toSwap.isEmpty()) {
- return 0;
- }
-
- long contentSize = 0L;
- for (final FlowFileRecord record : toSwap) {
- contentSize += record.getSize();
- }
-
- // persist record to disk via the swap file
- final OutputStream bufferedOut = new
BufferedOutputStream(destination);
- final DataOutputStream out = new DataOutputStream(bufferedOut);
- try {
- out.writeInt(SWAP_ENCODING_VERSION);
- out.writeUTF(queue.getIdentifier());
- out.writeInt(toSwap.size());
- out.writeLong(contentSize);
-
- // get the max record id and write that out so that we know it
quickly for restoration
- long maxRecordId = 0L;
- for (final FlowFileRecord flowFile : toSwap) {
- if (flowFile.getId() > maxRecordId) {
- maxRecordId = flowFile.getId();
- }
- }
-
- out.writeLong(maxRecordId);
-
- for (final FlowFileRecord flowFile : toSwap) {
- out.writeLong(flowFile.getId());
- out.writeLong(flowFile.getEntryDate());
- out.writeLong(flowFile.getLineageStartDate());
- out.writeLong(flowFile.getLineageStartIndex());
- out.writeLong(flowFile.getLastQueueDate());
- out.writeLong(flowFile.getQueueDateIndex());
- out.writeLong(flowFile.getSize());
-
- final ContentClaim claim = flowFile.getContentClaim();
- if (claim == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- final ResourceClaim resourceClaim =
claim.getResourceClaim();
- out.writeUTF(resourceClaim.getId());
- out.writeUTF(resourceClaim.getContainer());
- out.writeUTF(resourceClaim.getSection());
- out.writeLong(claim.getOffset());
- out.writeLong(claim.getLength());
- out.writeLong(flowFile.getContentClaimOffset());
- out.writeBoolean(resourceClaim.isLossTolerant());
- }
-
- final Map<String, String> attributes =
flowFile.getAttributes();
- out.writeInt(attributes.size());
- for (final Map.Entry<String, String> entry :
attributes.entrySet()) {
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- }
- } finally {
- out.flush();
+ final SwapDeserializer deserializer =
createSwapDeserializer(in);
+ return deserializer.getSwapSummary(in, swapLocation,
claimManager);
}
-
- logger.info("Successfully swapped out {} FlowFiles from {} to Swap
File {}", toSwap.size(), queue, swapLocation);
-
- return toSwap.size();
}
- private static void writeString(final String toWrite, final
OutputStream out) throws IOException {
- final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
- final int utflen = bytes.length;
- if (utflen < 65535) {
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- } else {
- out.write(255);
- out.write(255);
- out.write(utflen >>> 24);
- out.write(utflen >>> 16);
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- }
- }
-
- static SwapContents deserializeFlowFiles(final DataInputStream in,
final String swapLocation, final FlowFileQueue queue, final
ResourceClaimManager claimManager) throws IOException {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- throw new IOException("Cannot swap FlowFiles in from SwapFile
because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting
" + SWAP_ENCODING_VERSION + " or less)");
- }
+ private SwapDeserializer createSwapDeserializer(final DataInputStream
dis) throws IOException {
+ dis.mark(4);
--- End diff --
Good call.
> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
> Key: NIFI-2854
> URL: https://issues.apache.org/jira/browse/NIFI-2854
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very
> important role in NiFi's ability to be safely upgraded and rolled back. We
> need to have well documented behaviors, designs, and version adherence so
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes to how the repositories are persisted to disk can be made which
> will break forward/backward compatibility and specifically this means that
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they
> should be reserved for major releases only and should include a utility to
> help users with pre-existing data convert from some older format to the newer
> format. It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to
> existing write ahead log record types but no fields can be removed nor can
> any new types be added. Once a field is considered required it must remain
> required. Changes may only be made across minor version changes - not
> incremental.
> * Swap File storage should follow very similar rules to the flow file
> repository. Adding a schema to the swap file header may allow some variation
> there but the variation should only be hints to optimize how they're
> processed and not change their behavior otherwise. Changes are only permitted
> during minor version releases.
> * Provenance repository changes are only permitted during minor version
> releases. These changes may include adding or removing fields from existing
> event types. If a field is considered required it must always be considered
> required. If a field is removed then it must not be a required field and
> there must be a sensible default an older version could use if that value is
> not found in new data once rolled back. New event types may be added.
> Fields or event types not known to older version, if seen after a rollback,
> will simply be ignored.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)