[ 
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658576#comment-15658576
 ] 

ASF GitHub Bot commented on NIFI-2854:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r87631014
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
 ---
    @@ -0,0 +1,517 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.repository;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.nifi.controller.queue.FlowFileQueue;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
    +import org.apache.nifi.controller.repository.claim.StandardContentClaim;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wali.SerDe;
    +import org.wali.UpdateType;
    +
    +public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde 
implements SerDe<RepositoryRecord> {
    +    private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
    +
    +    private static final int CURRENT_ENCODING_VERSION = 9;
    +
    +    public static final byte ACTION_CREATE = 0;
    +    public static final byte ACTION_UPDATE = 1;
    +    public static final byte ACTION_DELETE = 2;
    +    public static final byte ACTION_SWAPPED_OUT = 3;
    +    public static final byte ACTION_SWAPPED_IN = 4;
    +
    +    private long recordsRestored = 0L;
    +    private final ResourceClaimManager claimManager;
    +
    +    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager 
claimManager) {
    +        this.claimManager = claimManager;
    +    }
    +
    +    @Override
    +    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord record, final DataOutputStream out) throws IOException {
    +        serializeEdit(previousRecordState, record, out, false);
    +    }
    +
    +    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord record, final DataOutputStream out, final boolean 
forceAttributesWritten) throws IOException {
    +        if (record.isMarkedForAbort()) {
    +            logger.warn("Repository Record {} is marked to be aborted; it 
will be persisted in the FlowFileRepository as a DELETE record", record);
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        final UpdateType updateType = getUpdateType(record);
    +
    +        if (updateType.equals(UpdateType.DELETE)) {
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        // If there's a Destination Connection, that's the one that we 
want to associated with this record.
    +        // However, on restart, we will restore the FlowFile and set this 
connection to its "originalConnection".
    +        // If we then serialize the FlowFile again before it's 
transferred, it's important to allow this to happen,
    +        // so we use the originalConnection instead
    +        FlowFileQueue associatedQueue = record.getDestination();
    +        if (associatedQueue == null) {
    +            associatedQueue = record.getOriginalQueue();
    +        }
    +
    +        if (updateType.equals(UpdateType.SWAP_OUT)) {
    +            out.write(ACTION_SWAPPED_OUT);
    +            out.writeLong(getRecordIdentifier(record));
    +            out.writeUTF(associatedQueue.getIdentifier());
    +            out.writeUTF(getLocation(record));
    +            return;
    +        }
    +
    +        final FlowFile flowFile = record.getCurrent();
    +        final ContentClaim claim = record.getCurrentClaim();
    +
    +        switch (updateType) {
    +            case UPDATE:
    +                out.write(ACTION_UPDATE);
    +                break;
    +            case CREATE:
    +                out.write(ACTION_CREATE);
    +                break;
    +            case SWAP_IN:
    +                out.write(ACTION_SWAPPED_IN);
    +                break;
    +            default:
    +                throw new AssertionError();
    +        }
    +
    +        out.writeLong(getRecordIdentifier(record));
    +        out.writeLong(flowFile.getEntryDate());
    +        out.writeLong(flowFile.getLineageStartDate());
    +        out.writeLong(flowFile.getLineageStartIndex());
    +
    +        final Long queueDate = flowFile.getLastQueueDate();
    +        out.writeLong(queueDate == null ? System.currentTimeMillis() : 
queueDate);
    +        out.writeLong(flowFile.getQueueDateIndex());
    +        out.writeLong(flowFile.getSize());
    +
    +        if (associatedQueue == null) {
    +            logger.warn("{} Repository Record {} has no Connection 
associated with it; it will be destroyed on restart",
    +                new Object[] {this, record});
    +            writeString("", out);
    +        } else {
    +            writeString(associatedQueue.getIdentifier(), out);
    +        }
    +
    +        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
    +
    +        if (forceAttributesWritten || record.isAttributesChanged() || 
updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
    +            out.write(1);   // indicate attributes changed
    +            final Map<String, String> attributes = 
flowFile.getAttributes();
    +            out.writeInt(attributes.size());
    +            for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
    +                writeString(entry.getKey(), out);
    +                writeString(entry.getValue(), out);
    +            }
    +        } else {
    +            out.write(0);   // indicate attributes did not change
    +        }
    +
    +        if (updateType == UpdateType.SWAP_IN) {
    +            out.writeUTF(record.getSwapLocation());
    +        }
    +    }
    +
    +    @Override
    +    public RepositoryRecord deserializeEdit(final DataInputStream in, 
final Map<Object, RepositoryRecord> currentRecordStates, final int version) 
throws IOException {
    +        final int action = in.read();
    +        final long recordId = in.readLong();
    +        if (action == ACTION_DELETE) {
    +            final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder().id(recordId);
    +
    +            if (version > 4) {
    +                deserializeClaim(in, version, ffBuilder);
    +            }
    +
    +            final FlowFileRecord flowFileRecord = ffBuilder.build();
    +            final StandardRepositoryRecord record = new 
StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
    +            record.markForDelete();
    +
    +            return record;
    +        }
    +
    +        if (action == ACTION_SWAPPED_OUT) {
    +            final String queueId = in.readUTF();
    +            final String location = in.readUTF();
    +            final FlowFileQueue queue = getFlowFileQueue(queueId);
    +
    +            final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
    +                .id(recordId)
    +                .build();
    +
    +            return new StandardRepositoryRecord(queue, flowFileRecord, 
location);
    +        }
    +
    +        final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
    +        final RepositoryRecord record = currentRecordStates.get(recordId);
    +        ffBuilder.id(recordId);
    +        if (record != null) {
    +            ffBuilder.fromFlowFile(record.getCurrent());
    +        }
    +        ffBuilder.entryDate(in.readLong());
    +
    +        if (version > 1) {
    +            // read the lineage identifiers and lineage start date, which 
were added in version 2.
    +            if (version < 9) {
    +                final int numLineageIds = in.readInt();
    +                for (int i = 0; i < numLineageIds; i++) {
    +                    in.readUTF(); //skip identifiers
    +                }
    +            }
    +            final long lineageStartDate = in.readLong();
    +            final long lineageStartIndex;
    +            if (version > 7) {
    +                lineageStartIndex = in.readLong();
    +            } else {
    +                lineageStartIndex = 0L;
    +            }
    +            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
    +
    +            if (version > 5) {
    +                final long lastQueueDate = in.readLong();
    +                final long queueDateIndex;
    +                if (version > 7) {
    +                    queueDateIndex = in.readLong();
    +                } else {
    +                    queueDateIndex = 0L;
    +                }
    +
    +                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
    +            }
    +        }
    +
    +        ffBuilder.size(in.readLong());
    +        final String connectionId = readString(in);
    +
    +        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
    +
    +        deserializeClaim(in, version, ffBuilder);
    +
    +        // recover new attributes, if they changed
    +        final int attributesChanged = in.read();
    +        if (attributesChanged == -1) {
    +            throw new EOFException();
    +        } else if (attributesChanged == 1) {
    +            final int numAttributes = in.readInt();
    +            final Map<String, String> attributes = new HashMap<>();
    +            for (int i = 0; i < numAttributes; i++) {
    +                final String key = readString(in);
    +                final String value = readString(in);
    +                attributes.put(key, value);
    +            }
    +
    +            ffBuilder.addAttributes(attributes);
    +        } else if (attributesChanged != 0) {
    +            throw new IOException("Attribute Change Qualifier not found in 
stream; found value: "
    +                + attributesChanged + " after successfully restoring " + 
recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
    +        }
    +
    +        final FlowFileRecord flowFile = ffBuilder.build();
    +        String swapLocation = null;
    +        if (action == ACTION_SWAPPED_IN) {
    +            swapLocation = in.readUTF();
    +        }
    +
    +        final FlowFileQueue queue = getFlowFileQueue(connectionId);
    +        final StandardRepositoryRecord standardRepoRecord = new 
StandardRepositoryRecord(queue, flowFile);
    +        if (swapLocation != null) {
    --- End diff --
    
    The logic here is slightly different from the old 
WriteAheadRepositorySerde. The old code would check whether the FlowFile Queue 
was null and only do this logic of checking the swap location, connectionID and 
queue if it wasn't (line 637 of WriteAheadFlowFileRepository). 
    
    Just wanted to draw attention to the slight difference to be sure it's 
correct.


> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
>                 Key: NIFI-2854
>                 URL: https://issues.apache.org/jira/browse/NIFI-2854
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very 
> important role in NiFi's ability to be safely upgraded and rolled back.  We 
> need to have well documented behaviors, designs, and version adherence so 
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance 
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes to how the repositories are persisted to disk can be made which 
> will break forward/backward compatibility and specifically this means that 
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they 
> should be reserved for major releases only and should include a utility to 
> help users with pre-existing data convert from some older format to the newer 
> format.  It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle 
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to 
> existing write ahead log record types but no fields can be removed nor can 
> any new types be added.  Once a field is considered required it must remain 
> required.  Changes may only be made across minor version changes - not 
> incremental.
> * Swap File storage should follow very similar rules to the flow file 
> repository.  Adding a schema to the swap file header may allow some variation 
> there but the variation should only be hints to optimize how they're 
> processed and not change their behavior otherwise. Changes are only permitted 
> during minor version releases.
> * Provenance repository changes are only permitted during minor version 
> releases.  These changes may include adding or removing fields from existing 
> event types.  If a field is considered required it must always be considered 
> required.  If a field is removed then it must not be a required field and 
> there must be a sensible default an older version could use if that value is 
> not found in new data once rolled back.  New event types may be added.  
> Fields or event types not known to an older version, if seen after a rollback, 
> will simply be ignored.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to