[
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658573#comment-15658573
]
ASF GitHub Bot commented on NIFI-2854:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1202#discussion_r87629214
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
---
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+/**
+ * SerDe implementation that writes/reads RepositoryRecords to/from the
+ * write-ahead log. Each serialized edit begins with one of the ACTION_*
+ * bytes below, followed by action-specific fields.
+ */
+public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde
implements SerDe<RepositoryRecord> {
+    private static final Logger logger =
LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
+
+    // Serialization format version of this serde. Presumably reported to the
+    // WAL framework and bumped whenever the on-disk layout changes — usage is
+    // not visible in this hunk; confirm against the full file.
+    private static final int CURRENT_ENCODING_VERSION = 9;
+
+    // Single-byte action codes: the first byte of every serialized edit
+    // identifies which kind of update follows.
+    public static final byte ACTION_CREATE = 0;
+    public static final byte ACTION_UPDATE = 1;
+    public static final byte ACTION_DELETE = 2;
+    public static final byte ACTION_SWAPPED_OUT = 3;
+    public static final byte ACTION_SWAPPED_IN = 4;
+
+    // Count of records restored during recovery; not read or written in the
+    // portion of the class visible here.
+    private long recordsRestored = 0L;
+    // NOTE(review): presumably used to resolve/register resource claims during
+    // deserialization (e.g. deserializeClaim) — usage not visible in this hunk.
+    private final ResourceClaimManager claimManager;
+
+    /**
+     * @param claimManager manager for content/resource claims; stored for
+     *        later use by this serde (its use is outside this hunk)
+     */
+    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager
claimManager) {
+        this.claimManager = claimManager;
+    }
+
+    /**
+     * Serializes a record edit to the write-ahead log. Delegates to the
+     * four-argument overload with {@code forceAttributesWritten} set to
+     * false, so the attribute map is written only when it has changed.
+     */
+    @Override
+    public void serializeEdit(final RepositoryRecord previousRecordState,
final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        serializeEdit(previousRecordState, record, out, false);
+    }
+
+    /**
+     * Writes one repository record edit to the given stream. The first byte
+     * is one of the ACTION_* codes; the fields that follow depend on that
+     * action. The field order here defines the on-disk format and must stay
+     * in sync with the corresponding read logic in {@code deserializeEdit}.
+     *
+     * @param previousRecordState prior state of the record (not referenced in
+     *        this method; part of the SerDe contract)
+     * @param record the record whose current state is serialized
+     * @param out stream to write the edit to
+     * @param forceAttributesWritten when true, always write the full attribute
+     *        map even if {@code record.isAttributesChanged()} is false
+     * @throws IOException if writing to the stream fails
+     */
+    public void serializeEdit(final RepositoryRecord previousRecordState,
final RepositoryRecord record, final DataOutputStream out, final boolean
forceAttributesWritten) throws IOException {
+        // Aborted records are persisted as DELETE edits so that the FlowFile
+        // is cleaned up rather than restored on restart.
+        if (record.isMarkedForAbort()) {
+            logger.warn("Repository Record {} is marked to be aborted; it
will be persisted in the FlowFileRepository as a DELETE record", record);
+            out.write(ACTION_DELETE);
+            out.writeLong(getRecordIdentifier(record));
+            serializeContentClaim(record.getCurrentClaim(),
record.getCurrentClaimOffset(), out);
+            return;
+        }
+
+        final UpdateType updateType = getUpdateType(record);
+
+        // DELETE edits carry only the record id and the content claim.
+        if (updateType.equals(UpdateType.DELETE)) {
+            out.write(ACTION_DELETE);
+            out.writeLong(getRecordIdentifier(record));
+            serializeContentClaim(record.getCurrentClaim(),
record.getCurrentClaimOffset(), out);
+            return;
+        }
+
+        // If there's a Destination Connection, that's the one that we
+        // want to associate with this record.
+        // However, on restart, we will restore the FlowFile and set this
+        // connection to its "originalConnection".
+        // If we then serialize the FlowFile again before it's
+        // transferred, it's important to allow this to happen,
+        // so we use the originalConnection instead.
+        FlowFileQueue associatedQueue = record.getDestination();
+        if (associatedQueue == null) {
+            associatedQueue = record.getOriginalQueue();
+        }
+
+        // SWAP_OUT edits record only the id, owning queue, and swap location.
+        if (updateType.equals(UpdateType.SWAP_OUT)) {
+            out.write(ACTION_SWAPPED_OUT);
+            out.writeLong(getRecordIdentifier(record));
+            out.writeUTF(associatedQueue.getIdentifier());
+            out.writeUTF(getLocation(record));
+            return;
+        }
+
+        final FlowFile flowFile = record.getCurrent();
+        final ContentClaim claim = record.getCurrentClaim();
+
+        // Map the remaining update types to their on-disk action byte.
+        switch (updateType) {
+            case UPDATE:
+                out.write(ACTION_UPDATE);
+                break;
+            case CREATE:
+                out.write(ACTION_CREATE);
+                break;
+            case SWAP_IN:
+                out.write(ACTION_SWAPPED_IN);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        // Fields common to CREATE / UPDATE / SWAP_IN edits, in fixed order.
+        out.writeLong(getRecordIdentifier(record));
+        out.writeLong(flowFile.getEntryDate());
+        out.writeLong(flowFile.getLineageStartDate());
+        out.writeLong(flowFile.getLineageStartIndex());
+
+        // A null last-queue date is replaced with "now" so a value is always
+        // written (the format has no null marker for this field).
+        final Long queueDate = flowFile.getLastQueueDate();
+        out.writeLong(queueDate == null ? System.currentTimeMillis() :
queueDate);
+        out.writeLong(flowFile.getQueueDateIndex());
+        out.writeLong(flowFile.getSize());
+
+        // Empty string stands in for "no queue"; such records are dropped on
+        // restart (see warning below).
+        if (associatedQueue == null) {
+            logger.warn("{} Repository Record {} has no Connection
associated with it; it will be destroyed on restart",
+                new Object[] {this, record});
+            writeString("", out);
+        } else {
+            writeString(associatedQueue.getIdentifier(), out);
+        }
+
+        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
+
+        // Attributes are written only when needed; a leading 1/0 byte tells
+        // the reader whether an attribute map follows. CREATE and SWAP_IN
+        // always write attributes since there is no prior state to reuse.
+        if (forceAttributesWritten || record.isAttributesChanged() ||
updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
+            out.write(1); // indicate attributes changed
+            final Map<String, String> attributes =
flowFile.getAttributes();
+            out.writeInt(attributes.size());
+            for (final Map.Entry<String, String> entry :
attributes.entrySet()) {
+                writeString(entry.getKey(), out);
+                writeString(entry.getValue(), out);
+            }
+        } else {
+            out.write(0); // indicate attributes did not change
+        }
+
+        // SWAP_IN additionally records where the record was swapped from.
+        if (updateType == UpdateType.SWAP_IN) {
+            out.writeUTF(record.getSwapLocation());
+        }
+    }
+
+ @Override
+ public RepositoryRecord deserializeEdit(final DataInputStream in,
final Map<Object, RepositoryRecord> currentRecordStates, final int version)
throws IOException {
+ final int action = in.read();
+ final long recordId = in.readLong();
+ if (action == ACTION_DELETE) {
+ final StandardFlowFileRecord.Builder ffBuilder = new
StandardFlowFileRecord.Builder().id(recordId);
+
+ if (version > 4) {
+ deserializeClaim(in, version, ffBuilder);
+ }
+
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+ final StandardRepositoryRecord record = new
StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+ record.markForDelete();
+
+ return record;
+ }
+
+ if (action == ACTION_SWAPPED_OUT) {
+ final String queueId = in.readUTF();
+ final String location = in.readUTF();
+ final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .id(recordId)
+ .build();
+
+ return new StandardRepositoryRecord(queue, flowFileRecord,
location);
+ }
+
+ final StandardFlowFileRecord.Builder ffBuilder = new
StandardFlowFileRecord.Builder();
+ final RepositoryRecord record = currentRecordStates.get(recordId);
+ ffBuilder.id(recordId);
+ if (record != null) {
+ ffBuilder.fromFlowFile(record.getCurrent());
+ }
+ ffBuilder.entryDate(in.readLong());
+
+ if (version > 1) {
--- End diff --
Is this versioning information documented anywhere?
> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
> Key: NIFI-2854
> URL: https://issues.apache.org/jira/browse/NIFI-2854
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very
> important role in NiFi's ability to be safely upgraded and rolled back. We
> need to have well documented behaviors, designs, and version adherence so
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes to how the repositories are persisted to disk can be made which
> will break forward/backward compatibility and specifically this means that
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they
> should be reserved for major releases only and should include a utility to
> help users with pre-existing data convert from some older format to the newer
> format. It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to
> existing write ahead log record types but no fields can be removed nor can
> any new types be added. Once a field is considered required it must remain
> required. Changes may only be made across minor version changes - not
> incremental.
> * Swap File storage should follow very similar rules to the flow file
> repository. Adding a schema to the swap file header may allow some variation
> there but the variation should only be hints to optimize how they're
> processed and not change their behavior otherwise. Changes are only permitted
> during minor version releases.
> * Provenance repository changes are only permitted during minor version
> releases. These changes may include adding or removing fields from existing
> event types. If a field is considered required it must always be considered
> required. If a field is removed then it must not be a required field and
> there must be a sensible default an older version could use if that value is
> not found in new data once rolled back. New event types may be added.
> Fields or event types not known to an older version, if seen after a rollback,
> will simply be ignored.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)