[ 
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658581#comment-15658581
 ] 

ASF GitHub Bot commented on NIFI-2854:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r87657911
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 ---
    @@ -251,353 +262,36 @@ public SwapSummary getSwapSummary(final String 
swapLocation) throws IOException
                     final InputStream bufferedIn = new 
BufferedInputStream(fis);
                     final DataInputStream in = new 
DataInputStream(bufferedIn)) {
     
    -            final int swapEncodingVersion = in.readInt();
    -            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
    -                final String errMsg = "Cannot swap FlowFiles in from " + 
swapFile + " because the encoding version is "
    -                        + swapEncodingVersion + ", which is too new 
(expecting " + SWAP_ENCODING_VERSION + " or less)";
    -
    -                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
errMsg);
    -                throw new IOException(errMsg);
    -            }
    -
    -            final int numRecords;
    -            final long contentSize;
    -            Long maxRecordId = null;
    -            try {
    -                in.readUTF(); // ignore Connection ID
    -                numRecords = in.readInt();
    -                contentSize = in.readLong();
    -
    -                if (numRecords == 0) {
    -                    return StandardSwapSummary.EMPTY_SUMMARY;
    -                }
    -
    -                if (swapEncodingVersion > 7) {
    -                    maxRecordId = in.readLong();
    -                }
    -            } catch (final EOFException eof) {
    -                logger.warn("Found premature End-of-File when reading Swap 
File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
    -                return StandardSwapSummary.EMPTY_SUMMARY;
    -            }
    -
    -            final QueueSize queueSize = new QueueSize(numRecords, 
contentSize);
    -            final SwapContents swapContents = deserializeFlowFiles(in, 
queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
    -            return swapContents.getSummary();
    -        }
    -    }
    -
    -    public static int serializeFlowFiles(final List<FlowFileRecord> 
toSwap, final FlowFileQueue queue, final String swapLocation, final 
OutputStream destination) throws IOException {
    -        if (toSwap == null || toSwap.isEmpty()) {
    -            return 0;
    -        }
    -
    -        long contentSize = 0L;
    -        for (final FlowFileRecord record : toSwap) {
    -            contentSize += record.getSize();
    -        }
    -
    -        // persist record to disk via the swap file
    -        final OutputStream bufferedOut = new 
BufferedOutputStream(destination);
    -        final DataOutputStream out = new DataOutputStream(bufferedOut);
    -        try {
    -            out.writeInt(SWAP_ENCODING_VERSION);
    -            out.writeUTF(queue.getIdentifier());
    -            out.writeInt(toSwap.size());
    -            out.writeLong(contentSize);
    -
    -            // get the max record id and write that out so that we know it 
quickly for restoration
    -            long maxRecordId = 0L;
    -            for (final FlowFileRecord flowFile : toSwap) {
    -                if (flowFile.getId() > maxRecordId) {
    -                    maxRecordId = flowFile.getId();
    -                }
    -            }
    -
    -            out.writeLong(maxRecordId);
    -
    -            for (final FlowFileRecord flowFile : toSwap) {
    -                out.writeLong(flowFile.getId());
    -                out.writeLong(flowFile.getEntryDate());
    -                out.writeLong(flowFile.getLineageStartDate());
    -                out.writeLong(flowFile.getLineageStartIndex());
    -                out.writeLong(flowFile.getLastQueueDate());
    -                out.writeLong(flowFile.getQueueDateIndex());
    -                out.writeLong(flowFile.getSize());
    -
    -                final ContentClaim claim = flowFile.getContentClaim();
    -                if (claim == null) {
    -                    out.writeBoolean(false);
    -                } else {
    -                    out.writeBoolean(true);
    -                    final ResourceClaim resourceClaim = 
claim.getResourceClaim();
    -                    out.writeUTF(resourceClaim.getId());
    -                    out.writeUTF(resourceClaim.getContainer());
    -                    out.writeUTF(resourceClaim.getSection());
    -                    out.writeLong(claim.getOffset());
    -                    out.writeLong(claim.getLength());
    -                    out.writeLong(flowFile.getContentClaimOffset());
    -                    out.writeBoolean(resourceClaim.isLossTolerant());
    -                }
    -
    -                final Map<String, String> attributes = 
flowFile.getAttributes();
    -                out.writeInt(attributes.size());
    -                for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
    -                    writeString(entry.getKey(), out);
    -                    writeString(entry.getValue(), out);
    -                }
    -            }
    -        } finally {
    -            out.flush();
    +            final SwapDeserializer deserializer = 
createSwapDeserializer(in);
    +            return deserializer.getSwapSummary(in, swapLocation, 
claimManager);
             }
    -
    -        logger.info("Successfully swapped out {} FlowFiles from {} to Swap 
File {}", toSwap.size(), queue, swapLocation);
    -
    -        return toSwap.size();
         }
     
    -    private static void writeString(final String toWrite, final 
OutputStream out) throws IOException {
    -        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
    -        final int utflen = bytes.length;
     
    -        if (utflen < 65535) {
    -            out.write(utflen >>> 8);
    -            out.write(utflen);
    -            out.write(bytes);
    -        } else {
    -            out.write(255);
    -            out.write(255);
    -            out.write(utflen >>> 24);
    -            out.write(utflen >>> 16);
    -            out.write(utflen >>> 8);
    -            out.write(utflen);
    -            out.write(bytes);
    -        }
    -    }
    -
    -    static SwapContents deserializeFlowFiles(final DataInputStream in, 
final String swapLocation, final FlowFileQueue queue, final 
ResourceClaimManager claimManager) throws IOException {
    -        final int swapEncodingVersion = in.readInt();
    -        if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
    -            throw new IOException("Cannot swap FlowFiles in from SwapFile 
because the encoding version is "
    -                    + swapEncodingVersion + ", which is too new (expecting 
" + SWAP_ENCODING_VERSION + " or less)");
    -        }
    +    private SwapDeserializer createSwapDeserializer(final DataInputStream 
dis) throws IOException {
    +        dis.mark(4);
    --- End diff --
    
    Probably best to use MAGIC_HEADER.length here instead of the hard-coded 4, 
just as the initialization of the byte array does.


> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
>                 Key: NIFI-2854
>                 URL: https://issues.apache.org/jira/browse/NIFI-2854
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very 
> important role in NiFi's ability to be safely upgraded and rolled back.  We 
> need to have well documented behaviors, designs, and version adherence so 
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance 
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward:
> * No changes to how the repositories are persisted to disk can be made which 
> will break forward/backward compatibility and specifically this means that 
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they 
> should be reserved for major releases only and should include a utility to 
> help users with pre-existing data convert from some older format to the newer 
> format.  It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle 
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to 
> existing write ahead log record types but no fields can be removed nor can 
> any new types be added.  Once a field is considered required it must remain 
> required.  Changes may only be made across minor version changes - not 
> incremental releases.
> * Swap File storage should follow very similar rules to the flow file 
> repository.  Adding a schema to the swap file header may allow some variation 
> there but the variation should only be hints to optimize how they're 
> processed and not change their behavior otherwise. Changes are only permitted 
> during minor version releases.
> * Provenance repository changes are only permitted during minor version 
> releases.  These changes may include adding or removing fields from existing 
> event types.  If a field is considered required it must always be considered 
> required.  If a field is removed then it must not be a required field and 
> there must be a sensible default an older version could use if that value is 
> not found in new data once rolled back.  New event types may be added.  
> Fields or event types not known to an older version, if seen after a 
> rollback, will simply be ignored.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to