Repository: nifi
Updated Branches:
  refs/heads/master ca887308a -> 8ffa1703b


NIFI-3389 - Using long string type for attribute name and value in 
FlowFileSchema

This closes #1446.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8ffa1703
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8ffa1703
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8ffa1703

Branch: refs/heads/master
Commit: 8ffa1703ba0ff82aa2127c511adfaf2241fb1c8e
Parents: ca88730
Author: Bryan Rosander <[email protected]>
Authored: Wed Jan 25 11:40:05 2017 -0500
Committer: Mark Payne <[email protected]>
Committed: Tue Feb 14 13:04:09 2017 -0500

----------------------------------------------------------------------
 .../repository/SchemaRepositoryRecordSerde.java |  22 +-
 .../repository/schema/FlowFileSchema.java       |  20 ++
 .../schema/RepositoryRecordSchema.java          |  54 +++-
 .../schema/RepositoryRecordUpdate.java          |   2 +-
 .../controller/swap/SchemaSwapSerializer.java   |   4 +-
 .../apache/nifi/controller/swap/SwapSchema.java |  34 ++-
 .../SchemaRepositoryRecordSerdeTest.java        | 266 +++++++++++++++++++
 7 files changed, 383 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
index c0c9d18..75f6ff2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -45,9 +45,9 @@ import org.wali.UpdateType;
 
 public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde 
implements SerDe<RepositoryRecord> {
     private static final Logger logger = 
LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
-    private static final int MAX_ENCODING_VERSION = 1;
+    private static final int MAX_ENCODING_VERSION = 2;
 
-    private final RecordSchema writeSchema = 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1;
+    private final RecordSchema writeSchema = 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2;
     private final RecordSchema contentClaimSchema = 
ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
 
     private final ResourceClaimManager resourceClaimManager;
@@ -73,25 +73,29 @@ public class SchemaRepositoryRecordSerde extends 
RepositoryRecordSerde implement
         switch (record.getType()) {
             case CREATE:
             case UPDATE:
-                schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1;
+                schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2;
                 break;
             case CONTENTMISSING:
             case DELETE:
-                schema = RepositoryRecordSchema.DELETE_SCHEMA_V1;
+                schema = RepositoryRecordSchema.DELETE_SCHEMA_V2;
                 break;
             case SWAP_IN:
-                schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1;
+                schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2;
                 break;
             case SWAP_OUT:
-                schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1;
+                schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2;
                 break;
             default:
                 throw new IllegalArgumentException("Received Repository Record 
with unknown Update Type: " + record.getType()); // won't happen.
         }
 
-        final RepositoryRecordFieldMap fieldMap = new 
RepositoryRecordFieldMap(record, schema, contentClaimSchema);
-        final RepositoryRecordUpdate update = new 
RepositoryRecordUpdate(fieldMap, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+        serializeRecord(record, out, schema, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2);
+    }
 
+
+    protected void serializeRecord(final RepositoryRecord record, final 
DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) 
throws IOException {
+        final RepositoryRecordFieldMap fieldMap = new 
RepositoryRecordFieldMap(record, schema, contentClaimSchema);
+        final RepositoryRecordUpdate update = new 
RepositoryRecordUpdate(fieldMap, repositoryRecordSchema);
         new SchemaRecordWriter().writeRecord(update, out);
     }
 
@@ -112,7 +116,7 @@ public class SchemaRepositoryRecordSerde extends 
RepositoryRecordSerde implement
 
         // Top level is always going to be a "Repository Record Update" record 
because we need a 'Union' type record at the
         // top level that indicates which type of record we have.
-        final Record record = (Record) 
updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1);
+        final Record record = (Record) 
updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2);
 
         final String actionType = (String) 
record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
         final UpdateType updateType = UpdateType.valueOf(actionType);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
index 53eab70..6af3066 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
@@ -45,6 +45,7 @@ public class FlowFileSchema {
     public static final String ATTRIBUTE_VALUE = "Attribute Value";
 
     public static final RecordSchema FLOWFILE_SCHEMA_V1;
+    public static final RecordSchema FLOWFILE_SCHEMA_V2;
 
     static {
         final List<RecordField> flowFileFields = new ArrayList<>();
@@ -64,4 +65,23 @@ public class FlowFileSchema {
 
         FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields);
     }
+
+    static {
+        final List<RecordField> flowFileFields = new ArrayList<>();
+
+        final RecordField attributeNameField = new 
SimpleRecordField(ATTRIBUTE_NAME, FieldType.LONG_STRING, 
Repetition.EXACTLY_ONE);
+        final RecordField attributeValueField = new 
SimpleRecordField(ATTRIBUTE_VALUE, FieldType.LONG_STRING, 
Repetition.EXACTLY_ONE);
+
+        flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, 
Repetition.ZERO_OR_ONE, 
ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields()));
+        flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, 
attributeValueField, Repetition.ZERO_OR_ONE));
+
+        FLOWFILE_SCHEMA_V2 = new RecordSchema(flowFileFields);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
index f99b5d9..db77c8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
@@ -17,10 +17,6 @@
 
 package org.apache.nifi.controller.repository.schema;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.nifi.repository.schema.ComplexRecordField;
 import org.apache.nifi.repository.schema.FieldType;
 import org.apache.nifi.repository.schema.RecordField;
@@ -29,9 +25,13 @@ import org.apache.nifi.repository.schema.Repetition;
 import org.apache.nifi.repository.schema.SimpleRecordField;
 import org.apache.nifi.repository.schema.UnionRecordField;
 
-public class RepositoryRecordSchema {
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
+public class RepositoryRecordSchema {
     public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository 
Record Update";  // top level field name
+    public static final String REPOSITORY_RECORD_UPDATE_V2 = "Repository 
Record Update";  // top level field name
 
     // repository record fields
     public static final String ACTION_TYPE = "Action";
@@ -51,6 +51,12 @@ public class RepositoryRecordSchema {
     public static final RecordSchema SWAP_IN_SCHEMA_V1;
     public static final RecordSchema SWAP_OUT_SCHEMA_V1;
 
+    public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V2;
+    public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V2;
+    public static final RecordSchema DELETE_SCHEMA_V2;
+    public static final RecordSchema SWAP_IN_SCHEMA_V2;
+    public static final RecordSchema SWAP_OUT_SCHEMA_V2;
+
     public static final RecordField ACTION_TYPE_FIELD = new 
SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE);
     public static final RecordField RECORD_ID_FIELD = new 
SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
 
@@ -91,4 +97,42 @@ public class RepositoryRecordSchema {
         final UnionRecordField repoUpdateField = new 
UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, 
createOrUpdate, delete, swapOut, swapIn);
         REPOSITORY_RECORD_SCHEMA_V1 = new 
RecordSchema(Collections.singletonList(repoUpdateField));
     }
+
+    static {
+        // Fields for "Create" or "Update" records
+        final List<RecordField> createOrUpdateFields = new ArrayList<>();
+        createOrUpdateFields.add(ACTION_TYPE_FIELD);
+        
createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
+
+        createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.ZERO_OR_ONE));
+        final ComplexRecordField createOrUpdate = new 
ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, 
createOrUpdateFields);
+        CREATE_OR_UPDATE_SCHEMA_V2 = new RecordSchema(createOrUpdateFields);
+
+        // Fields for "Delete" records
+        final List<RecordField> deleteFields = new ArrayList<>();
+        deleteFields.add(ACTION_TYPE_FIELD);
+        deleteFields.add(RECORD_ID_FIELD);
+        final ComplexRecordField delete = new 
ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields);
+        DELETE_SCHEMA_V2 = new RecordSchema(deleteFields);
+
+        // Fields for "Swap Out" records
+        final List<RecordField> swapOutFields = new ArrayList<>();
+        swapOutFields.add(ACTION_TYPE_FIELD);
+        swapOutFields.add(RECORD_ID_FIELD);
+        swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapOut = new 
ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields);
+        SWAP_OUT_SCHEMA_V2 = new RecordSchema(swapOutFields);
+
+        // Fields for "Swap In" records
+        final List<RecordField> swapInFields = new 
ArrayList<>(createOrUpdateFields);
+        swapInFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapIn = new 
ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields);
+        SWAP_IN_SCHEMA_V2 = new RecordSchema(swapInFields);
+
+        // Union Field that creates the top-level field type
+        final UnionRecordField repoUpdateField = new 
UnionRecordField(REPOSITORY_RECORD_UPDATE_V2, Repetition.EXACTLY_ONE, 
createOrUpdate, delete, swapOut, swapIn);
+        REPOSITORY_RECORD_SCHEMA_V2 = new 
RecordSchema(Collections.singletonList(repoUpdateField));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
index ad51f4d..c11353b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
@@ -38,7 +38,7 @@ public class RepositoryRecordUpdate implements Record {
 
     @Override
     public Object getFieldValue(final String fieldName) {
-        if 
(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1.equals(fieldName)) {
+        if 
(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) {
             final String actionType = (String) 
fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
             final UpdateType updateType = UpdateType.valueOf(actionType);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
index 195f55a..96c7ddc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
@@ -46,7 +46,7 @@ import org.apache.nifi.repository.schema.SimpleRecordField;
 public class SchemaSwapSerializer implements SwapSerializer {
     static final String SERIALIZATION_NAME = "Schema Swap Serialization";
 
-    private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V1;
+    private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V2;
     private final RecordSchema flowFileSchema = new 
RecordSchema(schema.getField(SwapSchema.FLOWFILE_CONTENTS).getSubFields());
 
     @Override
@@ -78,7 +78,7 @@ public class SchemaSwapSerializer implements SwapSerializer {
 
         // Create a simple record to hold the summary and the flowfile contents
         final RecordField summaryField = new 
SimpleRecordField(SwapSchema.SWAP_SUMMARY, FieldType.COMPLEX, 
Repetition.EXACTLY_ONE);
-        final RecordField contentsField = new 
ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, 
FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+        final RecordField contentsField = new 
ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, 
FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
         final List<RecordField> fields = new ArrayList<>(2);
         fields.add(summaryField);
         fields.add(contentsField);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
index 70fb539..6908900 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
@@ -32,11 +32,14 @@ import org.apache.nifi.repository.schema.Repetition;
 import org.apache.nifi.repository.schema.SimpleRecordField;
 
 public class SwapSchema {
-
     public static final RecordSchema SWAP_SUMMARY_SCHEMA_V1;
     public static final RecordSchema SWAP_CONTENTS_SCHEMA_V1;
     public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V1;
 
+    public static final RecordSchema SWAP_SUMMARY_SCHEMA_V2;
+    public static final RecordSchema SWAP_CONTENTS_SCHEMA_V2;
+    public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V2;
+
     public static final String RESOURCE_CLAIMS = "Resource Claims";
     public static final String RESOURCE_CLAIM = "Resource Claim";
     public static final String RESOURCE_CLAIM_COUNT = "Claim Count";
@@ -48,7 +51,6 @@ public class SwapSchema {
     public static final String SWAP_SUMMARY = "Swap Summary";
     public static final String FLOWFILE_CONTENTS = "FlowFiles";
 
-
     static {
         final RecordField queueIdentifier = new 
SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
         final RecordField flowFileCount = new 
SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
@@ -76,4 +78,32 @@ public class SwapSchema {
         fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, 
Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()));
         FULL_SWAP_FILE_SCHEMA_V1 = new RecordSchema(fullSchemaFields);
     }
+
+    static {
+        final RecordField queueIdentifier = new 
SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
+        final RecordField flowFileCount = new 
SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
+        final RecordField flowFileSize = new SimpleRecordField(FLOWFILE_SIZE, 
FieldType.LONG, Repetition.EXACTLY_ONE);
+        final RecordField maxRecordId = new SimpleRecordField(MAX_RECORD_ID, 
FieldType.LONG, Repetition.EXACTLY_ONE);
+
+        final RecordField resourceClaimField = new 
ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, 
ContentClaimSchema.RESOURCE_CLAIM_SCHEMA_V1.getFields());
+        final RecordField claimCountField = new 
SimpleRecordField(RESOURCE_CLAIM_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
+        final RecordField resourceClaims = new MapRecordField(RESOURCE_CLAIMS, 
resourceClaimField, claimCountField, Repetition.EXACTLY_ONE);
+
+        final List<RecordField> summaryFields = new ArrayList<>();
+        summaryFields.add(queueIdentifier);
+        summaryFields.add(flowFileCount);
+        summaryFields.add(flowFileSize);
+        summaryFields.add(maxRecordId);
+        summaryFields.add(resourceClaims);
+        SWAP_SUMMARY_SCHEMA_V2 = new RecordSchema(summaryFields);
+
+        final RecordField flowFiles = new 
ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, 
FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
+        final List<RecordField> contentsFields = 
Collections.singletonList(flowFiles);
+        SWAP_CONTENTS_SCHEMA_V2 = new RecordSchema(contentsFields);
+
+        final List<RecordField> fullSchemaFields = new ArrayList<>();
+        fullSchemaFields.add(new ComplexRecordField(SWAP_SUMMARY, 
Repetition.EXACTLY_ONE, summaryFields));
+        fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, 
Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()));
+        FULL_SWAP_FILE_SCHEMA_V2 = new RecordSchema(fullSchemaFields);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8ffa1703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
new file mode 100644
index 0000000..59b0e7b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SchemaRepositoryRecordSerdeTest {
+    public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier";
+    private StandardResourceClaimManager resourceClaimManager;
+    private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde;
+    private Map<String, FlowFileQueue> queueMap;
+    private FlowFileQueue flowFileQueue;
+    private ByteArrayOutputStream byteArrayOutputStream;
+    private DataOutputStream dataOutputStream;
+
+    @Before
+    public void setup() {
+        resourceClaimManager = new StandardResourceClaimManager();
+        schemaRepositoryRecordSerde = new 
SchemaRepositoryRecordSerde(resourceClaimManager);
+        queueMap = new HashMap<>();
+        schemaRepositoryRecordSerde.setQueueMap(queueMap);
+        flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER);
+        byteArrayOutputStream = new ByteArrayOutputStream();
+        dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    }
+
+    @After
+    public void teardown() {
+        resourceClaimManager.purge();
+    }
+
+    @Test
+    public void testV1CreateCantHandleLongAttributeName() throws IOException {
+        
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes),
 dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1CreateCantHandleLongAttributeValue() throws IOException {
+        
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes),
 dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2CreateCanHandleLongAttributeName() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes),
 dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2CreateCanHandleLongAttributeValue() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes),
 dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testRoundTripCreateV1ToV2() throws IOException {
+        
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("testName", "testValue");
+        
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes),
 dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1SwapInCantHandleLongAttributeName() throws IOException {
+        
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        StandardRepositoryRecord record = 
createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, 
RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1SwapInCantHandleLongAttributeValue() throws IOException {
+        
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        StandardRepositoryRecord record = 
createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, 
RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2SwapInCanHandleLongAttributeName() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        StandardRepositoryRecord record = 
createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2SwapInCanHandleLongAttributeValue() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        StandardRepositoryRecord record = 
createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = 
schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, 
repositoryRecord.getCurrent().getAttributes());
+    }
+
+    /**
+     * Writes a SWAP_IN record with a short attribute using the V1 header and V1 swap-in
+     * schema, reads it back through the serde, and asserts that both the attributes and
+     * the SWAP_IN record type survive the round trip.
+     */
+    @Test
+    public void testRoundTripSwapInV1ToV2() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("testName", "testValue");
+
+        // Setting a swap location is what turns the record into a SWAP_IN record here.
+        final StandardRepositoryRecord swapInRecord = createCreateFlowFileRecord(attributes);
+        swapInRecord.setSwapLocation("fake");
+        assertEquals(SWAP_IN, swapInRecord.getType());
+        schemaRepositoryRecordSerde.serializeRecord(swapInRecord, dataOutputStream,
+                RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        final DataInputStream in = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(in);
+        final RepositoryRecord deserialized = schemaRepositoryRecordSerde.deserializeRecord(in, 2);
+        assertEquals(attributes, deserialized.getCurrent().getAttributes());
+        assertEquals(SWAP_IN, deserialized.getType());
+    }
+
+    private DataInputStream createDataInputStream() throws IOException {
+        dataOutputStream.flush();
+        return new DataInputStream(new 
ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+    }
+
+    private StandardRepositoryRecord createCreateFlowFileRecord(Map<String, 
String> attributes) {
+        StandardRepositoryRecord standardRepositoryRecord = new 
StandardRepositoryRecord(flowFileQueue);
+        StandardFlowFileRecord.Builder flowFileRecordBuilder = new 
StandardFlowFileRecord.Builder();
+        flowFileRecordBuilder.addAttributes(attributes);
+        standardRepositoryRecord.setWorking(flowFileRecordBuilder.build());
+        return standardRepositoryRecord;
+    }
+
+    private FlowFileQueue createMockQueue(String identifier) {
+        FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn(identifier);
+        queueMap.put(identifier, flowFileQueue);
+        return flowFileQueue;
+    }
+}

Reply via email to