NIFI-3273: Added nifi-toolkit-flowfile-repo that contains a simple Java class 
that is capable of recovering a FlowFile Repository manually in the case of an 
operating system crash that results in trailing 0's being dumped into the edit 
logs. Also refactored flowfile repo into some independent modules so that it 
additional capabilities can be added in the future to examine the flowfile repo


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0207f21c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0207f21c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0207f21c

Branch: refs/heads/master
Commit: 0207f21ce4bb68536843d65ecc9110ec834028cd
Parents: 141334c
Author: Mark Payne <[email protected]>
Authored: Wed Mar 22 13:08:30 2017 -0400
Committer: joewitt <[email protected]>
Committed: Wed Apr 19 21:44:23 2017 -0700

----------------------------------------------------------------------
 .../nifi-flowfile-repo-serialization/pom.xml    |  44 ++
 .../repository/RepositoryRecordSerde.java       |  68 +++
 .../RepositoryRecordSerdeFactory.java           |  95 ++++
 .../repository/SchemaRepositoryRecordSerde.java | 239 +++++++++
 .../WriteAheadRepositoryRecordSerde.java        | 517 +++++++++++++++++++
 .../repository/schema/ContentClaimFieldMap.java | 125 +++++
 .../repository/schema/ContentClaimSchema.java   |  63 +++
 .../schema/FlowFileRecordFieldMap.java          |  99 ++++
 .../repository/schema/FlowFileSchema.java       |  87 ++++
 .../schema/RepositoryRecordFieldMap.java        |  83 +++
 .../schema/RepositoryRecordSchema.java          | 138 +++++
 .../schema/RepositoryRecordUpdate.java          |  73 +++
 .../schema/ResourceClaimFieldMap.java           |  85 +++
 .../SchemaRepositoryRecordSerdeTest.java        | 266 ++++++++++
 .../nifi-framework/nifi-framework-core/pom.xml  |   8 +
 .../repository/RepositoryRecordSerde.java       |  68 ---
 .../RepositoryRecordSerdeFactory.java           |  95 ----
 .../repository/SchemaRepositoryRecordSerde.java | 239 ---------
 .../repository/StandardFlowFileRecord.java      | 341 ------------
 .../repository/StandardRepositoryRecord.java    | 221 --------
 .../WriteAheadRepositoryRecordSerde.java        | 517 -------------------
 .../repository/claim/StandardContentClaim.java  | 105 ----
 .../repository/claim/StandardResourceClaim.java | 122 -----
 .../claim/StandardResourceClaimManager.java     | 219 --------
 .../repository/schema/ContentClaimFieldMap.java | 125 -----
 .../repository/schema/ContentClaimSchema.java   |  63 ---
 .../schema/FlowFileRecordFieldMap.java          |  99 ----
 .../repository/schema/FlowFileSchema.java       |  87 ----
 .../schema/RepositoryRecordFieldMap.java        |  83 ---
 .../schema/RepositoryRecordSchema.java          | 138 -----
 .../schema/RepositoryRecordUpdate.java          |  73 ---
 .../schema/ResourceClaimFieldMap.java           |  85 ---
 .../nifi-repository-models/pom.xml              |  44 ++
 .../repository/StandardFlowFileRecord.java      | 341 ++++++++++++
 .../repository/StandardRepositoryRecord.java    | 221 ++++++++
 .../repository/claim/StandardContentClaim.java  | 105 ++++
 .../repository/claim/StandardResourceClaim.java | 122 +++++
 .../claim/StandardResourceClaimManager.java     | 219 ++++++++
 .../nifi-framework/pom.xml                      |   2 +
 nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml |  27 +
 .../flowfile/RepairCorruptedFileEndings.java    | 287 ++++++++++
 .../TestRepairCorruptedFileEndings.java         | 169 ++++++
 nifi-toolkit/pom.xml                            |   1 +
 pom.xml                                         | 231 +++++----
 44 files changed, 3663 insertions(+), 2776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
new file mode 100644
index 0000000..02988c0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+    license agreements. See the NOTICE file distributed with this work for 
additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-flowfile-repo-serialization</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-framework-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-repository-models</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-write-ahead-log</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-utils</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
new file mode 100644
index 0000000..44ed62d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> 
{
+    private Map<String, FlowFileQueue> flowFileQueueMap = null;
+
+    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
+        this.flowFileQueueMap = queueMap;
+    }
+
+    protected Map<String, FlowFileQueue> getQueueMap() {
+        return flowFileQueueMap;
+    }
+
+    protected FlowFileQueue getFlowFileQueue(final String queueId) {
+        return flowFileQueueMap.get(queueId);
+    }
+
+    @Override
+    public Long getRecordIdentifier(final RepositoryRecord record) {
+        return record.getCurrent().getId();
+    }
+
+    @Override
+    public UpdateType getUpdateType(final RepositoryRecord record) {
+        switch (record.getType()) {
+            case CONTENTMISSING:
+            case DELETE:
+                return UpdateType.DELETE;
+            case CREATE:
+                return UpdateType.CREATE;
+            case UPDATE:
+                return UpdateType.UPDATE;
+            case SWAP_OUT:
+                return UpdateType.SWAP_OUT;
+            case SWAP_IN:
+                return UpdateType.SWAP_IN;
+        }
+        return null;
+    }
+
+    @Override
+    public String getLocation(final RepositoryRecord record) {
+        return record.getSwapLocation();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
new file mode 100644
index 0000000..c19fa94
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
+public class RepositoryRecordSerdeFactory implements 
SerDeFactory<RepositoryRecord> {
+    private final String LEGACY_SERDE_ENCODING_NAME = 
"org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
+    private final ResourceClaimManager resourceClaimManager;
+    private Map<String, FlowFileQueue> flowFileQueueMap = null;
+
+    public RepositoryRecordSerdeFactory(final ResourceClaimManager 
claimManager) {
+        this.resourceClaimManager = claimManager;
+    }
+
+    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
+        this.flowFileQueueMap = queueMap;
+    }
+
+    protected Map<String, FlowFileQueue> getQueueMap() {
+        return flowFileQueueMap;
+    }
+
+    @Override
+    public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
+        if (encodingName == null || 
SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
+            final SchemaRepositoryRecordSerde serde = new 
SchemaRepositoryRecordSerde(resourceClaimManager);
+            serde.setQueueMap(flowFileQueueMap);
+            return serde;
+        }
+
+        if 
(WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
+            || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
+            final WriteAheadRepositoryRecordSerde serde = new 
WriteAheadRepositoryRecordSerde(resourceClaimManager);
+            serde.setQueueMap(flowFileQueueMap);
+            return serde;
+        }
+
+        throw new IllegalArgumentException("Cannot create Deserializer for 
Repository Records because the encoding '" + encodingName + "' is not known");
+    }
+
+    protected FlowFileQueue getFlowFileQueue(final String queueId) {
+        return flowFileQueueMap.get(queueId);
+    }
+
+    @Override
+    public Long getRecordIdentifier(final RepositoryRecord record) {
+        return record.getCurrent().getId();
+    }
+
+    @Override
+    public UpdateType getUpdateType(final RepositoryRecord record) {
+        switch (record.getType()) {
+            case CONTENTMISSING:
+            case DELETE:
+                return UpdateType.DELETE;
+            case CREATE:
+                return UpdateType.CREATE;
+            case UPDATE:
+                return UpdateType.UPDATE;
+            case SWAP_OUT:
+                return UpdateType.SWAP_OUT;
+            case SWAP_IN:
+                return UpdateType.SWAP_IN;
+        }
+        return null;
+    }
+
+    @Override
+    public String getLocation(final RepositoryRecord record) {
+        return record.getSwapLocation();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
new file mode 100644
index 0000000..221f8ce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap;
+import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde 
implements SerDe<RepositoryRecord> {
+    private static final Logger logger = 
LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
+    private static final int MAX_ENCODING_VERSION = 2;
+
+    private final RecordSchema writeSchema = 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2;
+    private final RecordSchema contentClaimSchema = 
ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
+
+    private final ResourceClaimManager resourceClaimManager;
+    private volatile RecordSchema recoverySchema;
+
+    public SchemaRepositoryRecordSerde(final ResourceClaimManager 
resourceClaimManager) {
+        this.resourceClaimManager = resourceClaimManager;
+    }
+
+    @Override
+    public void writeHeader(final DataOutputStream out) throws IOException {
+        writeSchema.writeTo(out);
+    }
+
+    @Override
+    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord newRecordState, final DataOutputStream out) throws 
IOException {
+        serializeRecord(newRecordState, out);
+    }
+
+    @Override
+    public void serializeRecord(final RepositoryRecord record, final 
DataOutputStream out) throws IOException {
+        final RecordSchema schema;
+        switch (record.getType()) {
+            case CREATE:
+            case UPDATE:
+                schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2;
+                break;
+            case CONTENTMISSING:
+            case DELETE:
+                schema = RepositoryRecordSchema.DELETE_SCHEMA_V2;
+                break;
+            case SWAP_IN:
+                schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2;
+                break;
+            case SWAP_OUT:
+                schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2;
+                break;
+            default:
+                throw new IllegalArgumentException("Received Repository Record 
with unknown Update Type: " + record.getType()); // won't happen.
+        }
+
+        serializeRecord(record, out, schema, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2);
+    }
+
+
+    protected void serializeRecord(final RepositoryRecord record, final 
DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) 
throws IOException {
+        final RepositoryRecordFieldMap fieldMap = new 
RepositoryRecordFieldMap(record, schema, contentClaimSchema);
+        final RepositoryRecordUpdate update = new 
RepositoryRecordUpdate(fieldMap, repositoryRecordSchema);
+        new SchemaRecordWriter().writeRecord(update, out);
+    }
+
+    @Override
+    public void readHeader(final DataInputStream in) throws IOException {
+        recoverySchema = RecordSchema.readFrom(in);
+    }
+
+    @Override
+    public RepositoryRecord deserializeEdit(final DataInputStream in, final 
Map<Object, RepositoryRecord> currentRecordStates, final int version) throws 
IOException {
+        return deserializeRecord(in, version);
+    }
+
+    @Override
+    public RepositoryRecord deserializeRecord(final DataInputStream in, final 
int version) throws IOException {
+        final SchemaRecordReader reader = 
SchemaRecordReader.fromSchema(recoverySchema);
+        final Record updateRecord = reader.readRecord(in);
+        if (updateRecord == null) {
+            // null may be returned by reader.readRecord() if it encounters 
end-of-stream
+            return null;
+        }
+
+        // Top level is always going to be a "Repository Record Update" record 
because we need a 'Union' type record at the
+        // top level that indicates which type of record we have.
+        final Record record = (Record) 
updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2);
+
+        final String actionType = (String) 
record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
+        final UpdateType updateType = UpdateType.valueOf(actionType);
+        switch (updateType) {
+            case CREATE:
+                return createRecord(record);
+            case DELETE:
+                return deleteRecord(record);
+            case SWAP_IN:
+                return swapInRecord(record);
+            case SWAP_OUT:
+                return swapOutRecord(record);
+            case UPDATE:
+                return updateRecord(record);
+            default:
+                throw new IOException("Found unrecognized Update Type '" + 
actionType + "'");
+        }
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private StandardRepositoryRecord createRecord(final Record record) {
+        final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
+        ffBuilder.id((Long) 
record.getFieldValue(RepositoryRecordSchema.RECORD_ID));
+        ffBuilder.entryDate((Long) 
record.getFieldValue(FlowFileSchema.ENTRY_DATE));
+
+        final Long lastQueueDate = (Long) 
record.getFieldValue(FlowFileSchema.QUEUE_DATE);
+        final Long queueDateIndex = (Long) 
record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX);
+        ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+
+        final Long lineageStartDate = (Long) 
record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE);
+        final Long lineageStartIndex = (Long) 
record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX);
+        ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+        populateContentClaim(ffBuilder, record);
+        ffBuilder.size((Long) 
record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE));
+
+        ffBuilder.addAttributes((Map<String, String>) 
record.getFieldValue(FlowFileSchema.ATTRIBUTES));
+
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final String queueId = (String) 
record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
+        final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+        final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord(queue, flowFileRecord);
+        requireFlowFileQueue(repoRecord, queueId);
+        return repoRecord;
+    }
+
+    private void requireFlowFileQueue(final StandardRepositoryRecord 
repoRecord, final String queueId) {
+        if (queueId == null || queueId.trim().isEmpty()) {
+            logger.warn("{} does not have a Queue associated with it; this 
record will be discarded", repoRecord.getCurrent());
+            repoRecord.markForAbort();
+        } else if (repoRecord.getOriginalQueue() == null) {
+            logger.warn("{} maps to unknown Queue {}; this record will be 
discarded", repoRecord.getCurrent(), queueId);
+            repoRecord.markForAbort();
+        }
+    }
+
+    private void populateContentClaim(final StandardFlowFileRecord.Builder 
ffBuilder, final Record record) {
+        final Object claimMap = 
record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
+        if (claimMap == null) {
+            return;
+        }
+
+        final Record claimRecord = (Record) claimMap;
+        final ContentClaim contentClaim = 
ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager);
+        final Long offset = 
ContentClaimFieldMap.getContentClaimOffset(claimRecord);
+
+        ffBuilder.contentClaim(contentClaim);
+        ffBuilder.contentClaimOffset(offset);
+    }
+
+    private RepositoryRecord updateRecord(final Record record) {
+        return createRecord(record);
+    }
+
+    private RepositoryRecord deleteRecord(final Record record) {
+        final Long recordId = (Long) 
record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
+        final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder().id(recordId);
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+        repoRecord.markForDelete();
+        return repoRecord;
+    }
+
+    private RepositoryRecord swapInRecord(final Record record) {
+        final StandardRepositoryRecord repoRecord = createRecord(record);
+        final String swapLocation = (String) record.getFieldValue(new 
SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, 
Repetition.EXACTLY_ONE));
+        repoRecord.setSwapLocation(swapLocation);
+
+        final String queueId = (String) 
record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
+        requireFlowFileQueue(repoRecord, queueId);
+        return repoRecord;
+    }
+
+    private RepositoryRecord swapOutRecord(final Record record) {
+        final Long recordId = (Long) 
record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
+        final String queueId = (String) record.getFieldValue(new 
SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, 
Repetition.EXACTLY_ONE));
+        final String swapLocation = (String) record.getFieldValue(new 
SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, 
Repetition.EXACTLY_ONE));
+        final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(recordId)
+            .build();
+
+        return new StandardRepositoryRecord(queue, flowFileRecord, 
swapLocation);
+    }
+
+    @Override
+    public int getVersion() {
+        return MAX_ENCODING_VERSION;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
new file mode 100644
index 0000000..e8ce44e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde 
implements SerDe<RepositoryRecord> {
+    private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
+
+    private static final int CURRENT_ENCODING_VERSION = 9;
+
+    public static final byte ACTION_CREATE = 0;
+    public static final byte ACTION_UPDATE = 1;
+    public static final byte ACTION_DELETE = 2;
+    public static final byte ACTION_SWAPPED_OUT = 3;
+    public static final byte ACTION_SWAPPED_IN = 4;
+
+    private long recordsRestored = 0L;
+    private final ResourceClaimManager claimManager;
+
+    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager 
claimManager) {
+        this.claimManager = claimManager;
+    }
+
+    @Override
+    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        serializeEdit(previousRecordState, record, out, false);
+    }
+
+    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord record, final DataOutputStream out, final boolean 
forceAttributesWritten) throws IOException {
+        if (record.isMarkedForAbort()) {
+            logger.warn("Repository Record {} is marked to be aborted; it will 
be persisted in the FlowFileRepository as a DELETE record", record);
+            out.write(ACTION_DELETE);
+            out.writeLong(getRecordIdentifier(record));
+            serializeContentClaim(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), out);
+            return;
+        }
+
+        final UpdateType updateType = getUpdateType(record);
+
+        if (updateType.equals(UpdateType.DELETE)) {
+            out.write(ACTION_DELETE);
+            out.writeLong(getRecordIdentifier(record));
+            serializeContentClaim(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), out);
+            return;
+        }
+
+        // If there's a Destination Connection, that's the one that we want to 
associated with this record.
+        // However, on restart, we will restore the FlowFile and set this 
connection to its "originalConnection".
+        // If we then serialize the FlowFile again before it's transferred, 
it's important to allow this to happen,
+        // so we use the originalConnection instead
+        FlowFileQueue associatedQueue = record.getDestination();
+        if (associatedQueue == null) {
+            associatedQueue = record.getOriginalQueue();
+        }
+
+        if (updateType.equals(UpdateType.SWAP_OUT)) {
+            out.write(ACTION_SWAPPED_OUT);
+            out.writeLong(getRecordIdentifier(record));
+            out.writeUTF(associatedQueue.getIdentifier());
+            out.writeUTF(getLocation(record));
+            return;
+        }
+
+        final FlowFile flowFile = record.getCurrent();
+        final ContentClaim claim = record.getCurrentClaim();
+
+        switch (updateType) {
+            case UPDATE:
+                out.write(ACTION_UPDATE);
+                break;
+            case CREATE:
+                out.write(ACTION_CREATE);
+                break;
+            case SWAP_IN:
+                out.write(ACTION_SWAPPED_IN);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        out.writeLong(getRecordIdentifier(record));
+        out.writeLong(flowFile.getEntryDate());
+        out.writeLong(flowFile.getLineageStartDate());
+        out.writeLong(flowFile.getLineageStartIndex());
+
+        final Long queueDate = flowFile.getLastQueueDate();
+        out.writeLong(queueDate == null ? System.currentTimeMillis() : 
queueDate);
+        out.writeLong(flowFile.getQueueDateIndex());
+        out.writeLong(flowFile.getSize());
+
+        if (associatedQueue == null) {
+            logger.warn("{} Repository Record {} has no Connection associated 
with it; it will be destroyed on restart",
+                new Object[] {this, record});
+            writeString("", out);
+        } else {
+            writeString(associatedQueue.getIdentifier(), out);
+        }
+
+        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
+
+        if (forceAttributesWritten || record.isAttributesChanged() || 
updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
+            out.write(1);   // indicate attributes changed
+            final Map<String, String> attributes = flowFile.getAttributes();
+            out.writeInt(attributes.size());
+            for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
+                writeString(entry.getKey(), out);
+                writeString(entry.getValue(), out);
+            }
+        } else {
+            out.write(0);   // indicate attributes did not change
+        }
+
+        if (updateType == UpdateType.SWAP_IN) {
+            out.writeUTF(record.getSwapLocation());
+        }
+    }
+
+    @Override
+    public RepositoryRecord deserializeEdit(final DataInputStream in, final 
Map<Object, RepositoryRecord> currentRecordStates, final int version) throws 
IOException {
+        final int action = in.read();
+        final long recordId = in.readLong();
+        if (action == ACTION_DELETE) {
+            final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder().id(recordId);
+
+            if (version > 4) {
+                deserializeClaim(in, version, ffBuilder);
+            }
+
+            final FlowFileRecord flowFileRecord = ffBuilder.build();
+            final StandardRepositoryRecord record = new 
StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+            record.markForDelete();
+
+            return record;
+        }
+
+        if (action == ACTION_SWAPPED_OUT) {
+            final String queueId = in.readUTF();
+            final String location = in.readUTF();
+            final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+            final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+                .id(recordId)
+                .build();
+
+            return new StandardRepositoryRecord(queue, flowFileRecord, 
location);
+        }
+
+        final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
+        final RepositoryRecord record = currentRecordStates.get(recordId);
+        ffBuilder.id(recordId);
+        if (record != null) {
+            ffBuilder.fromFlowFile(record.getCurrent());
+        }
+        ffBuilder.entryDate(in.readLong());
+
+        if (version > 1) {
+            // read the lineage identifiers and lineage start date, which were 
added in version 2.
+            if (version < 9) {
+                final int numLineageIds = in.readInt();
+                for (int i = 0; i < numLineageIds; i++) {
+                    in.readUTF(); //skip identifiers
+                }
+            }
+            final long lineageStartDate = in.readLong();
+            final long lineageStartIndex;
+            if (version > 7) {
+                lineageStartIndex = in.readLong();
+            } else {
+                lineageStartIndex = 0L;
+            }
+            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+            if (version > 5) {
+                final long lastQueueDate = in.readLong();
+                final long queueDateIndex;
+                if (version > 7) {
+                    queueDateIndex = in.readLong();
+                } else {
+                    queueDateIndex = 0L;
+                }
+
+                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+            }
+        }
+
+        ffBuilder.size(in.readLong());
+        final String connectionId = readString(in);
+
+        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
+
+        deserializeClaim(in, version, ffBuilder);
+
+        // recover new attributes, if they changed
+        final int attributesChanged = in.read();
+        if (attributesChanged == -1) {
+            throw new EOFException();
+        } else if (attributesChanged == 1) {
+            final int numAttributes = in.readInt();
+            final Map<String, String> attributes = new HashMap<>();
+            for (int i = 0; i < numAttributes; i++) {
+                final String key = readString(in);
+                final String value = readString(in);
+                attributes.put(key, value);
+            }
+
+            ffBuilder.addAttributes(attributes);
+        } else if (attributesChanged != 0) {
+            throw new IOException("Attribute Change Qualifier not found in 
stream; found value: "
+                + attributesChanged + " after successfully restoring " + 
recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
+        }
+
+        final FlowFileRecord flowFile = ffBuilder.build();
+        String swapLocation = null;
+        if (action == ACTION_SWAPPED_IN) {
+            swapLocation = in.readUTF();
+        }
+
+        final FlowFileQueue queue = getFlowFileQueue(connectionId);
+        final StandardRepositoryRecord standardRepoRecord = new 
StandardRepositoryRecord(queue, flowFile);
+        if (swapLocation != null) {
+            standardRepoRecord.setSwapLocation(swapLocation);
+        }
+
+        if (connectionId.isEmpty()) {
+            logger.warn("{} does not have a Queue associated with it; this 
record will be discarded", flowFile);
+            standardRepoRecord.markForAbort();
+        } else if (queue == null) {
+            logger.warn("{} maps to unknown Queue {}; this record will be 
discarded", flowFile, connectionId);
+            standardRepoRecord.markForAbort();
+        }
+
+        recordsRestored++;
+        return standardRepoRecord;
+    }
+
+    @Override
+    public StandardRepositoryRecord deserializeRecord(final DataInputStream 
in, final int version) throws IOException {
+        final int action = in.read();
+        if (action == -1) {
+            return null;
+        }
+
+        final long recordId = in.readLong();
+        if (action == ACTION_DELETE) {
+            final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder().id(recordId);
+
+            if (version > 4) {
+                deserializeClaim(in, version, ffBuilder);
+            }
+
+            final FlowFileRecord flowFileRecord = ffBuilder.build();
+            final StandardRepositoryRecord record = new 
StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+            record.markForDelete();
+            return record;
+        }
+
+        // if action was not delete, it must be create/swap in
+        final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
+        final long entryDate = in.readLong();
+
+        if (version > 1) {
+            // read the lineage identifiers and lineage start date, which were 
added in version 2.
+            if (version < 9) {
+                final int numLineageIds = in.readInt();
+                for (int i = 0; i < numLineageIds; i++) {
+                    in.readUTF(); //skip identifiers
+                }
+            }
+
+            final long lineageStartDate = in.readLong();
+            final long lineageStartIndex;
+            if (version > 7) {
+                lineageStartIndex = in.readLong();
+            } else {
+                lineageStartIndex = 0L;
+            }
+            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+            if (version > 5) {
+                final long lastQueueDate = in.readLong();
+                final long queueDateIndex;
+                if (version > 7) {
+                    queueDateIndex = in.readLong();
+                } else {
+                    queueDateIndex = 0L;
+                }
+
+                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+            }
+        }
+
+        final long size = in.readLong();
+        final String connectionId = readString(in);
+
+        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
+
+        ffBuilder.id(recordId);
+        ffBuilder.entryDate(entryDate);
+        ffBuilder.size(size);
+
+        deserializeClaim(in, version, ffBuilder);
+
+        final int attributesChanged = in.read();
+        if (attributesChanged == 1) {
+            final int numAttributes = in.readInt();
+            final Map<String, String> attributes = new HashMap<>();
+            for (int i = 0; i < numAttributes; i++) {
+                final String key = readString(in);
+                final String value = readString(in);
+                attributes.put(key, value);
+            }
+
+            ffBuilder.addAttributes(attributes);
+        } else if (attributesChanged == -1) {
+            throw new EOFException();
+        } else if (attributesChanged != 0) {
+            throw new IOException("Attribute Change Qualifier not found in 
stream; found value: "
+                + attributesChanged + " after successfully restoring " + 
recordsRestored + " records");
+        }
+
+        final FlowFileRecord flowFile = ffBuilder.build();
+        String swapLocation = null;
+        if (action == ACTION_SWAPPED_IN) {
+            swapLocation = in.readUTF();
+        }
+
+        final StandardRepositoryRecord record;
+        final FlowFileQueue queue = getFlowFileQueue(connectionId);
+        record = new StandardRepositoryRecord(queue, flowFile);
+        if (swapLocation != null) {
+            record.setSwapLocation(swapLocation);
+        }
+
+        if (connectionId.isEmpty()) {
+            logger.warn("{} does not have a FlowFile Queue associated with it; 
this record will be discarded", flowFile);
+            record.markForAbort();
+        } else if (queue == null) {
+            logger.warn("{} maps to unknown FlowFile Queue {}; this record 
will be discarded", flowFile, connectionId);
+            record.markForAbort();
+        }
+
+        recordsRestored++;
+        return record;
+    }
+
+    @Override
+    public void serializeRecord(final RepositoryRecord record, final 
DataOutputStream out) throws IOException {
+        serializeEdit(null, record, out, true);
+    }
+
+    private void serializeContentClaim(final ContentClaim claim, final long 
offset, final DataOutputStream out) throws IOException {
+        if (claim == null) {
+            out.write(0);
+        } else {
+            out.write(1);
+
+            final ResourceClaim resourceClaim = claim.getResourceClaim();
+            writeString(resourceClaim.getId(), out);
+            writeString(resourceClaim.getContainer(), out);
+            writeString(resourceClaim.getSection(), out);
+            out.writeLong(claim.getOffset());
+            out.writeLong(claim.getLength());
+
+            out.writeLong(offset);
+            out.writeBoolean(resourceClaim.isLossTolerant());
+        }
+    }
+
+    private void deserializeClaim(final DataInputStream in, final int 
serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws 
IOException {
+        // determine current Content Claim.
+        final int claimExists = in.read();
+        if (claimExists == 1) {
+            final String claimId;
+            if (serializationVersion < 4) {
+                claimId = String.valueOf(in.readLong());
+            } else {
+                claimId = readString(in);
+            }
+
+            final String container = readString(in);
+            final String section = readString(in);
+
+            final long resourceOffset;
+            final long resourceLength;
+            if (serializationVersion < 7) {
+                resourceOffset = 0L;
+                resourceLength = -1L;
+            } else {
+                resourceOffset = in.readLong();
+                resourceLength = in.readLong();
+            }
+
+            final long claimOffset = in.readLong();
+
+            final boolean lossTolerant;
+            if (serializationVersion >= 3) {
+                lossTolerant = in.readBoolean();
+            } else {
+                lossTolerant = false;
+            }
+
+            final ResourceClaim resourceClaim = 
claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
+            final StandardContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, resourceOffset);
+            contentClaim.setLength(resourceLength);
+
+            ffBuilder.contentClaim(contentClaim);
+            ffBuilder.contentClaimOffset(claimOffset);
+        } else if (claimExists == -1) {
+            throw new EOFException();
+        } else if (claimExists != 0) {
+            throw new IOException("Claim Existence Qualifier not found in 
stream; found value: "
+                + claimExists + " after successfully restoring " + 
recordsRestored + " records");
+        }
+    }
+
+    private void writeString(final String toWrite, final OutputStream out) 
throws IOException {
+        final byte[] bytes = toWrite.getBytes("UTF-8");
+        final int utflen = bytes.length;
+
+        if (utflen < 65535) {
+            out.write(utflen >>> 8);
+            out.write(utflen);
+            out.write(bytes);
+        } else {
+            out.write(255);
+            out.write(255);
+            out.write(utflen >>> 24);
+            out.write(utflen >>> 16);
+            out.write(utflen >>> 8);
+            out.write(utflen);
+            out.write(bytes);
+        }
+    }
+
+    private String readString(final InputStream in) throws IOException {
+        final Integer numBytes = readFieldLength(in);
+        if (numBytes == null) {
+            throw new EOFException();
+        }
+        final byte[] bytes = new byte[numBytes];
+        fillBuffer(in, bytes, numBytes);
+        return new String(bytes, "UTF-8");
+    }
+
+    private Integer readFieldLength(final InputStream in) throws IOException {
+        final int firstValue = in.read();
+        final int secondValue = in.read();
+        if (firstValue < 0) {
+            return null;
+        }
+        if (secondValue < 0) {
+            throw new EOFException();
+        }
+        if (firstValue == 0xff && secondValue == 0xff) {
+            final int ch1 = in.read();
+            final int ch2 = in.read();
+            final int ch3 = in.read();
+            final int ch4 = in.read();
+            if ((ch1 | ch2 | ch3 | ch4) < 0) {
+                throw new EOFException();
+            }
+            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
+        } else {
+            return (firstValue << 8) + secondValue;
+        }
+    }
+
+    private void fillBuffer(final InputStream in, final byte[] buffer, final 
int length) throws IOException {
+        int bytesRead;
+        int totalBytesRead = 0;
+        while ((bytesRead = in.read(buffer, totalBytesRead, length - 
totalBytesRead)) > 0) {
+            totalBytesRead += bytesRead;
+        }
+        if (totalBytesRead != length) {
+            throw new EOFException();
+        }
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_ENCODING_VERSION;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
new file mode 100644
index 0000000..b218ee6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.List;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ContentClaimFieldMap implements Record {
+    private final ContentClaim contentClaim;
+    private final long contentClaimOffset;
+    private final ResourceClaimFieldMap resourceClaimFieldMap;
+    private final RecordSchema schema;
+
+    public ContentClaimFieldMap(final ContentClaim contentClaim, final long 
contentClaimOffset, final RecordSchema schema) {
+        this.contentClaim = contentClaim;
+        this.contentClaimOffset = contentClaimOffset;
+        this.schema = schema;
+
+        final List<RecordField> resourceClaimFields = 
schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields();
+        final RecordSchema resourceClaimSchema = new 
RecordSchema(resourceClaimFields);
+        this.resourceClaimFieldMap = new 
ResourceClaimFieldMap(contentClaim.getResourceClaim(), resourceClaimSchema);
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case ContentClaimSchema.RESOURCE_CLAIM:
+                return resourceClaimFieldMap;
+            case ContentClaimSchema.CONTENT_CLAIM_LENGTH:
+                return contentClaim.getLength();
+            case ContentClaimSchema.CONTENT_CLAIM_OFFSET:
+                return contentClaimOffset;
+            case ContentClaimSchema.RESOURCE_CLAIM_OFFSET:
+                return contentClaim.getOffset();
+            default:
+                return null;
+        }
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) (31 + contentClaimOffset + 21 * 
resourceClaimFieldMap.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        ContentClaimFieldMap other = (ContentClaimFieldMap) obj;
+        if (contentClaimOffset != other.contentClaimOffset) {
+            return false;
+        }
+
+        if (resourceClaimFieldMap == null) {
+            if (other.resourceClaimFieldMap != null) {
+                return false;
+            }
+        } else if (!resourceClaimFieldMap.equals(other.resourceClaimFieldMap)) 
{
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "ContentClaimFieldMap[" + contentClaim + "]";
+    }
+
+    public static ContentClaim getContentClaim(final Record claimRecord, final 
ResourceClaimManager resourceClaimManager) {
+        final Record resourceClaimRecord = (Record) 
claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM);
+        final String container = (String) 
resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+        final String section = (String) 
resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+        final String identifier = (String) 
resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+        final Boolean lossTolerant = (Boolean) 
resourceClaimRecord.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+        final Long length = (Long) 
claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH);
+        final Long resourceOffset = (Long) 
claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET);
+
+        final ResourceClaim resourceClaim = 
resourceClaimManager.newResourceClaim(container, section, identifier, 
lossTolerant, false);
+        final StandardContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, resourceOffset);
+        contentClaim.setLength(length);
+
+        return contentClaim;
+    }
+
+    public static Long getContentClaimOffset(final Record claimRecord) {
+        return (Long) 
claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_OFFSET);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
new file mode 100644
index 0000000..c55c758
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class ContentClaimSchema {
+
+    // resource claim fields
+    public static final String CLAIM_CONTAINER = "Container";
+    public static final String CLAIM_SECTION = "Section";
+    public static final String CLAIM_IDENTIFIER = "Identifier";
+    public static final String LOSS_TOLERANT = "Loss Tolerant";
+    public static final String RESOURCE_CLAIM = "Resource Claim";
+
+    // content claim fields
+    public static final String RESOURCE_CLAIM_OFFSET = "Resource Claim 
Offset"; // offset into resource claim where the content claim begins
+    public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; 
// offset into the content claim where the flowfile begins
+    public static final String CONTENT_CLAIM_LENGTH = "Content Claim Length";
+
+    public static final RecordSchema CONTENT_CLAIM_SCHEMA_V1;
+    public static final RecordSchema RESOURCE_CLAIM_SCHEMA_V1;
+
+    static {
+        final List<RecordField> resourceClaimFields = new ArrayList<>();
+        resourceClaimFields.add(new SimpleRecordField(CLAIM_CONTAINER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        resourceClaimFields.add(new SimpleRecordField(CLAIM_SECTION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        resourceClaimFields.add(new SimpleRecordField(CLAIM_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        resourceClaimFields.add(new SimpleRecordField(LOSS_TOLERANT, 
FieldType.BOOLEAN, Repetition.EXACTLY_ONE));
+        RESOURCE_CLAIM_SCHEMA_V1 = new 
RecordSchema(Collections.unmodifiableList(resourceClaimFields));
+
+        final List<RecordField> contentClaimFields = new ArrayList<>();
+        contentClaimFields.add(new ComplexRecordField(RESOURCE_CLAIM, 
Repetition.EXACTLY_ONE, resourceClaimFields));
+        contentClaimFields.add(new SimpleRecordField(RESOURCE_CLAIM_OFFSET, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        contentClaimFields.add(new SimpleRecordField(CONTENT_CLAIM_OFFSET, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        contentClaimFields.add(new SimpleRecordField(CONTENT_CLAIM_LENGTH, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        CONTENT_CLAIM_SCHEMA_V1 = new 
RecordSchema(Collections.unmodifiableList(contentClaimFields));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
new file mode 100644
index 0000000..ff0615f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class FlowFileRecordFieldMap implements Record {
+    private final FlowFileRecord flowFile;
+    private final RecordSchema schema;
+    private final RecordSchema contentClaimSchema;
+    private final ContentClaimFieldMap contentClaim;
+
+    public FlowFileRecordFieldMap(final FlowFileRecord flowFile, final 
RecordSchema schema) {
+        this.flowFile = flowFile;
+        this.schema = schema;
+
+        final RecordField contentClaimField = 
schema.getField(FlowFileSchema.CONTENT_CLAIM);
+        contentClaimSchema = new 
RecordSchema(contentClaimField.getSubFields());
+        contentClaim = flowFile.getContentClaim() == null ? null : new 
ContentClaimFieldMap(flowFile.getContentClaim(), 
flowFile.getContentClaimOffset(), contentClaimSchema);
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case FlowFileSchema.ATTRIBUTES:
+                return flowFile.getAttributes();
+            case FlowFileSchema.CONTENT_CLAIM:
+                return contentClaim;
+            case FlowFileSchema.ENTRY_DATE:
+                return flowFile.getEntryDate();
+            case FlowFileSchema.FLOWFILE_SIZE:
+                return flowFile.getSize();
+            case FlowFileSchema.LINEAGE_START_DATE:
+                return flowFile.getLineageStartDate();
+            case FlowFileSchema.LINEAGE_START_INDEX:
+                return flowFile.getLineageStartIndex();
+            case FlowFileSchema.QUEUE_DATE:
+                return flowFile.getLastQueueDate();
+            case FlowFileSchema.QUEUE_DATE_INDEX:
+                return flowFile.getQueueDateIndex();
+            case FlowFileSchema.RECORD_ID:
+                return flowFile.getId();
+        }
+
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static FlowFileRecord getFlowFile(final Record record, final 
ResourceClaimManager claimManager) {
+        final StandardFlowFileRecord.Builder builder = new 
StandardFlowFileRecord.Builder();
+        builder.id((Long) record.getFieldValue(FlowFileSchema.RECORD_ID));
+        builder.entryDate((Long) 
record.getFieldValue(FlowFileSchema.ENTRY_DATE));
+        builder.size((Long) 
record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE));
+        builder.addAttributes((Map<String, String>) 
record.getFieldValue(FlowFileSchema.ATTRIBUTES));
+        builder.lineageStart((Long) 
record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE), (Long) 
record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX));
+        builder.lastQueued((Long) 
record.getFieldValue(FlowFileSchema.QUEUE_DATE), (Long) 
record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX));
+
+        final Record contentClaimRecord = (Record) 
record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
+        if (contentClaimRecord != null) {
+            final ContentClaim claim = 
ContentClaimFieldMap.getContentClaim(contentClaimRecord, claimManager);
+            builder.contentClaim(claim);
+
+            final Long offset = 
ContentClaimFieldMap.getContentClaimOffset(contentClaimRecord);
+            if (offset != null) {
+                builder.contentClaimOffset(offset);
+            }
+        }
+
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
new file mode 100644
index 0000000..6af3066
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class FlowFileSchema {
+
+    public static final String RECORD_ID = "Record ID";
+    public static final String ENTRY_DATE = "Entry Date";
+    public static final String LINEAGE_START_DATE = "Lineage Start Date";
+    public static final String LINEAGE_START_INDEX = "Lineage Start Index";
+    public static final String QUEUE_DATE = "Queued Date";
+    public static final String QUEUE_DATE_INDEX = "Queued Date Index";
+    public static final String FLOWFILE_SIZE = "FlowFile Size";
+    public static final String CONTENT_CLAIM = "Content Claim";
+    public static final String ATTRIBUTES = "Attributes";
+
+    // attribute fields
+    public static final String ATTRIBUTE_NAME = "Attribute Name";
+    public static final String ATTRIBUTE_VALUE = "Attribute Value";
+
+    public static final RecordSchema FLOWFILE_SCHEMA_V1;
+    public static final RecordSchema FLOWFILE_SCHEMA_V2;
+
+    static {
+        final List<RecordField> flowFileFields = new ArrayList<>();
+
+        final RecordField attributeNameField = new 
SimpleRecordField(ATTRIBUTE_NAME, FieldType.STRING, Repetition.EXACTLY_ONE);
+        final RecordField attributeValueField = new 
SimpleRecordField(ATTRIBUTE_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+
+        flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, 
Repetition.ZERO_OR_ONE, 
ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields()));
+        flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, 
attributeValueField, Repetition.ZERO_OR_ONE));
+
+        FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields);
+    }
+
+    static {
+        final List<RecordField> flowFileFields = new ArrayList<>();
+
+        final RecordField attributeNameField = new 
SimpleRecordField(ATTRIBUTE_NAME, FieldType.LONG_STRING, 
Repetition.EXACTLY_ONE);
+        final RecordField attributeValueField = new 
SimpleRecordField(ATTRIBUTE_VALUE, FieldType.LONG_STRING, 
Repetition.EXACTLY_ONE);
+
+        flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, 
Repetition.ZERO_OR_ONE, 
ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields()));
+        flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, 
attributeValueField, Repetition.ZERO_OR_ONE));
+
+        FLOWFILE_SCHEMA_V2 = new RecordSchema(flowFileFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
new file mode 100644
index 0000000..5fe4889
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class RepositoryRecordFieldMap implements Record {
+    private final RepositoryRecord record;
+    private final FlowFileRecord flowFile;
+    private final RecordSchema schema;
+    private final RecordSchema contentClaimSchema;
+
+    public RepositoryRecordFieldMap(final RepositoryRecord record, final 
RecordSchema repoRecordSchema, final RecordSchema contentClaimSchema) {
+        this.schema = repoRecordSchema;
+        this.contentClaimSchema = contentClaimSchema;
+        this.record = record;
+        this.flowFile = record.getCurrent();
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case RepositoryRecordSchema.ACTION_TYPE:
+                return record.getType().name();
+            case RepositoryRecordSchema.RECORD_ID:
+                return record.getCurrent().getId();
+            case RepositoryRecordSchema.SWAP_LOCATION:
+                return record.getSwapLocation();
+            case FlowFileSchema.ATTRIBUTES:
+                return flowFile.getAttributes();
+            case FlowFileSchema.ENTRY_DATE:
+                return flowFile.getEntryDate();
+            case FlowFileSchema.FLOWFILE_SIZE:
+                return flowFile.getSize();
+            case FlowFileSchema.LINEAGE_START_DATE:
+                return flowFile.getLineageStartDate();
+            case FlowFileSchema.LINEAGE_START_INDEX:
+                return flowFile.getLineageStartIndex();
+            case FlowFileSchema.QUEUE_DATE:
+                return flowFile.getLastQueueDate();
+            case FlowFileSchema.QUEUE_DATE_INDEX:
+                return flowFile.getQueueDateIndex();
+            case FlowFileSchema.CONTENT_CLAIM:
+                final ContentClaimFieldMap contentClaimFieldMap = 
record.getCurrentClaim() == null ? null
+                    : new ContentClaimFieldMap(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), contentClaimSchema);
+                return contentClaimFieldMap;
+            case RepositoryRecordSchema.QUEUE_IDENTIFIER:
+                final FlowFileQueue queue = record.getDestination() == null ? 
record.getOriginalQueue() : record.getDestination();
+                return queue == null ? null : queue.getIdentifier();
+            default:
+                return null;
+        }
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public String toString() {
+        return "RepositoryRecordFieldMap[" + record + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
new file mode 100644
index 0000000..db77c8b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.apache.nifi.repository.schema.UnionRecordField;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class RepositoryRecordSchema {
+    public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository 
Record Update";  // top level field name
+    public static final String REPOSITORY_RECORD_UPDATE_V2 = "Repository 
Record Update";  // top level field name
+
+    // repository record fields
+    public static final String ACTION_TYPE = "Action";
+    public static final String RECORD_ID = "Record ID";
+    public static final String QUEUE_IDENTIFIER = "Queue Identifier";
+    public static final String SWAP_LOCATION = "Swap Location";
+
+    // Update types
+    public static final String CREATE_OR_UPDATE_ACTION = "Create or Update";
+    public static final String DELETE_ACTION = "Delete";
+    public static final String SWAP_IN_ACTION = "Swap In";
+    public static final String SWAP_OUT_ACTION = "Swap Out";
+
+    public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V1;
+    public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V1;
+    public static final RecordSchema DELETE_SCHEMA_V1;
+    public static final RecordSchema SWAP_IN_SCHEMA_V1;
+    public static final RecordSchema SWAP_OUT_SCHEMA_V1;
+
+    public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V2;
+    public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V2;
+    public static final RecordSchema DELETE_SCHEMA_V2;
+    public static final RecordSchema SWAP_IN_SCHEMA_V2;
+    public static final RecordSchema SWAP_OUT_SCHEMA_V2;
+
+    public static final RecordField ACTION_TYPE_FIELD = new 
SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE);
+    public static final RecordField RECORD_ID_FIELD = new 
SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
+
+    static {
+        // Fields for "Create" or "Update" records
+        final List<RecordField> createOrUpdateFields = new ArrayList<>();
+        createOrUpdateFields.add(ACTION_TYPE_FIELD);
+        
createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+
+        createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.ZERO_OR_ONE));
+        final ComplexRecordField createOrUpdate = new 
ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, 
createOrUpdateFields);
+        CREATE_OR_UPDATE_SCHEMA_V1 = new RecordSchema(createOrUpdateFields);
+
+        // Fields for "Delete" records
+        final List<RecordField> deleteFields = new ArrayList<>();
+        deleteFields.add(ACTION_TYPE_FIELD);
+        deleteFields.add(RECORD_ID_FIELD);
+        final ComplexRecordField delete = new 
ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields);
+        DELETE_SCHEMA_V1 = new RecordSchema(deleteFields);
+
+        // Fields for "Swap Out" records
+        final List<RecordField> swapOutFields = new ArrayList<>();
+        swapOutFields.add(ACTION_TYPE_FIELD);
+        swapOutFields.add(RECORD_ID_FIELD);
+        swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapOut = new 
ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields);
+        SWAP_OUT_SCHEMA_V1 = new RecordSchema(swapOutFields);
+
+        // Fields for "Swap In" records
+        final List<RecordField> swapInFields = new 
ArrayList<>(createOrUpdateFields);
+        swapInFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapIn = new 
ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields);
+        SWAP_IN_SCHEMA_V1 = new RecordSchema(swapInFields);
+
+        // Union Field that creates the top-level field type
+        final UnionRecordField repoUpdateField = new 
UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, 
createOrUpdate, delete, swapOut, swapIn);
+        REPOSITORY_RECORD_SCHEMA_V1 = new 
RecordSchema(Collections.singletonList(repoUpdateField));
+    }
+
+    static {
+        // Fields for "Create" or "Update" records
+        final List<RecordField> createOrUpdateFields = new ArrayList<>();
+        createOrUpdateFields.add(ACTION_TYPE_FIELD);
+        
createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
+
+        createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.ZERO_OR_ONE));
+        final ComplexRecordField createOrUpdate = new 
ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, 
createOrUpdateFields);
+        CREATE_OR_UPDATE_SCHEMA_V2 = new RecordSchema(createOrUpdateFields);
+
+        // Fields for "Delete" records
+        final List<RecordField> deleteFields = new ArrayList<>();
+        deleteFields.add(ACTION_TYPE_FIELD);
+        deleteFields.add(RECORD_ID_FIELD);
+        final ComplexRecordField delete = new 
ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields);
+        DELETE_SCHEMA_V2 = new RecordSchema(deleteFields);
+
+        // Fields for "Swap Out" records
+        final List<RecordField> swapOutFields = new ArrayList<>();
+        swapOutFields.add(ACTION_TYPE_FIELD);
+        swapOutFields.add(RECORD_ID_FIELD);
+        swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapOut = new 
ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields);
+        SWAP_OUT_SCHEMA_V2 = new RecordSchema(swapOutFields);
+
+        // Fields for "Swap In" records
+        final List<RecordField> swapInFields = new 
ArrayList<>(createOrUpdateFields);
+        swapInFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapIn = new 
ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields);
+        SWAP_IN_SCHEMA_V2 = new RecordSchema(swapInFields);
+
+        // Union Field that creates the top-level field type
+        final UnionRecordField repoUpdateField = new 
UnionRecordField(REPOSITORY_RECORD_UPDATE_V2, Repetition.EXACTLY_ONE, 
createOrUpdate, delete, swapOut, swapIn);
+        REPOSITORY_RECORD_SCHEMA_V2 = new 
RecordSchema(Collections.singletonList(repoUpdateField));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
new file mode 100644
index 0000000..93fa4e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.repository.schema.NamedValue;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.wali.UpdateType;
+
+public class RepositoryRecordUpdate implements Record {
+    private final RecordSchema schema;
+    private final RepositoryRecordFieldMap fieldMap;
+
+    public RepositoryRecordUpdate(final RepositoryRecordFieldMap fieldMap, 
final RecordSchema schema) {
+        this.schema = schema;
+        this.fieldMap = fieldMap;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        if 
(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) {
+            String actionType = (String) 
fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
+            if (RepositoryRecordType.CONTENTMISSING.name().equals(actionType)) 
{
+                actionType = RepositoryRecordType.DELETE.name();
+            }
+            final UpdateType updateType = UpdateType.valueOf(actionType);
+
+            final String actionName;
+            switch (updateType) {
+                case CREATE:
+                case UPDATE:
+                    actionName = 
RepositoryRecordSchema.CREATE_OR_UPDATE_ACTION;
+                    break;
+                case DELETE:
+                    actionName = RepositoryRecordSchema.DELETE_ACTION;
+                    break;
+                case SWAP_IN:
+                    actionName = RepositoryRecordSchema.SWAP_IN_ACTION;
+                    break;
+                case SWAP_OUT:
+                    actionName = RepositoryRecordSchema.SWAP_OUT_ACTION;
+                    break;
+                default:
+                    return null;
+            }
+
+            return new NamedValue(actionName, fieldMap);
+        }
+        return null;
+    }
+
+}

Reply via email to