Repository: nifi
Updated Branches:
  refs/heads/master f742a3a6a -> 4700b8653


NIFI-5138: Bug fix to ensure that, when we have a CHOICE between two or more 
RECORD types, we choose the appropriate RECORD type when creating the 
Record in the JSON Reader.

Signed-off-by: Pierre Villard <[email protected]>

This closes #2670.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4700b865
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4700b865
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4700b865

Branch: refs/heads/master
Commit: 4700b8653dc980b1cf8985430683b79eb64922a4
Parents: f742a3a
Author: Mark Payne <[email protected]>
Authored: Wed May 2 09:13:52 2018 -0400
Committer: Pierre Villard <[email protected]>
Committed: Wed May 9 18:38:04 2018 +0200

----------------------------------------------------------------------
 .../nifi/record/path/StandardFieldValue.java    |  2 +-
 .../record/util/DataTypeUtils.java              | 36 +++++++++++-
 .../nifi-record-serialization-services/pom.xml  |  2 +
 .../nifi/json/AbstractJsonRowRecordReader.java  | 36 ++++++++++--
 .../nifi/json/JsonTreeRowRecordReader.java      |  2 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java  | 59 ++++++++++++++++++++
 .../json/elements-for-record-choice.json        | 11 ++++
 .../src/test/resources/json/record-choice.avsc  | 15 +++++
 8 files changed, 154 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
index 5897086..7526c0c 100644
--- 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
@@ -78,7 +78,7 @@ public class StandardFieldValue implements FieldValue {
             return Arrays.toString((Object[]) value);
         }
 
-        return value.toString();
+        return String.valueOf(value);
     }
 
     protected static FieldValue validateParentRecord(final FieldValue parent) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 477b02a..d15f379 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -52,8 +52,11 @@ import 
org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DataTypeUtils {
+    private static final Logger logger = 
LoggerFactory.getLogger(DataTypeUtils.class);
 
     // Regexes for parsing Floating-Point numbers
     private static final String OptionalSign  = "[\\-\\+]?";
@@ -192,8 +195,37 @@ public class DataTypeUtils {
                 return isIntegerTypeCompatible(value);
             case LONG:
                 return isLongTypeCompatible(value);
-            case RECORD:
-                return isRecordTypeCompatible(value);
+            case RECORD: {
+                if (value == null) {
+                    return false;
+                }
+                if (!(value instanceof Record)) {
+                    return false;
+                }
+
+                final RecordSchema schema = ((RecordDataType) 
dataType).getChildSchema();
+                if (schema == null) {
+                    return true;
+                }
+
+                final Record record = (Record) value;
+                for (final RecordField childField : schema.getFields()) {
+                    final Object childValue = record.getValue(childField);
+                    if (childValue == null && !childField.isNullable()) {
+                        logger.debug("Value is not compatible with schema 
because field {} has a null value, which is not allowed in the schema", 
childField.getFieldName());
+                        return false;
+                    }
+                    if (childValue == null) {
+                        continue; // consider compatible
+                    }
+
+                    if (!isCompatibleDataType(childValue, 
childField.getDataType())) {
+                        return false;
+                    }
+                }
+
+                return true;
+            }
             case SHORT:
                 return isShortTypeCompatible(value);
             case TIME:

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 0c54b7c..c57e07c 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -126,6 +126,8 @@
                         
<exclude>src/test/resources/json/single-element-nested-array.json</exclude>
                         
<exclude>src/test/resources/json/single-element-nested.json</exclude>
                         
<exclude>src/test/resources/json/output/dataTypes.json</exclude>
+                        
<exclude>src/test/resources/json/elements-for-record-choice.json</exclude>
+                        
<exclude>src/test/resources/json/record-choice.avsc</exclude>
                         <exclude>src/test/resources/xml/people.xml</exclude>
                         <exclude>src/test/resources/xml/people2.xml</exclude>
                         <exclude>src/test/resources/xml/people3.xml</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index cc08d34..4f9a791 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -35,7 +35,9 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
@@ -134,7 +136,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                 final ArrayDataType arrayDataType = (ArrayDataType) dataType;
                 elementDataType = arrayDataType.getElementType();
             } else {
-                elementDataType = null;
+                elementDataType = dataType;
             }
 
             for (final JsonNode node : arrayNode) {
@@ -146,12 +148,34 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         }
 
         if (fieldNode.isObject()) {
-            RecordSchema childSchema;
+            RecordSchema childSchema = null;
             if (dataType != null && RecordFieldType.RECORD == 
dataType.getFieldType()) {
                 final RecordDataType recordDataType = (RecordDataType) 
dataType;
                 childSchema = recordDataType.getChildSchema();
-            } else {
-                childSchema = null;
+            } else if (dataType != null && RecordFieldType.CHOICE == 
dataType.getFieldType()) {
+                final ChoiceDataType choiceDataType = (ChoiceDataType) 
dataType;
+
+                for (final DataType possibleDataType : 
choiceDataType.getPossibleSubTypes()) {
+                    if (possibleDataType.getFieldType() != 
RecordFieldType.RECORD) {
+                        continue;
+                    }
+
+                    final RecordSchema possibleSchema = ((RecordDataType) 
possibleDataType).getChildSchema();
+
+                    final Map<String, Object> childValues = new HashMap<>();
+                    final Iterator<String> fieldNames = 
fieldNode.getFieldNames();
+                    while (fieldNames.hasNext()) {
+                        final String childFieldName = fieldNames.next();
+
+                        final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), 
possibleSchema.getDataType(childFieldName).orElse(null));
+                        childValues.put(childFieldName, childValue);
+                    }
+
+                    final Record possibleRecord = new 
MapRecord(possibleSchema, childValues);
+                    if (DataTypeUtils.isCompatibleDataType(possibleRecord, 
possibleDataType)) {
+                        return possibleRecord;
+                    }
+                }
             }
 
             if (childSchema == null) {
@@ -162,7 +186,9 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
             final Map<String, Object> childValues = new HashMap<>();
             while (fieldNames.hasNext()) {
                 final String childFieldName = fieldNames.next();
-                final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), dataType);
+
+                final DataType childDataType = 
childSchema.getDataType(childFieldName).orElse(null);
+                final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), childDataType);
                 childValues.put(childFieldName, childValue);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 9e2c965..e53fcc0 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -222,7 +222,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                 }
             }
             case CHOICE: {
-                return DataTypeUtils.convertType(getRawNodeValue(fieldNode), 
desiredType, fieldName);
+                return DataTypeUtils.convertType(getRawNodeValue(fieldNode, 
desiredType), desiredType, fieldName);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 73abdff..d71fd32 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.json;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -36,6 +37,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -136,6 +139,62 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
+    public void testChoiceOfRecordTypes() throws IOException, 
MalformedRecordException {
+        final Schema avroSchema = new Schema.Parser().parse(new 
File("src/test/resources/json/record-choice.avsc"));
+        final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema);
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/elements-for-record-choice.json"));
+            final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), recordSchema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            // evaluate first record
+            final Record firstRecord = reader.nextRecord();
+            assertNotNull(firstRecord);
+            final RecordSchema firstOuterSchema = firstRecord.getSchema();
+            assertEquals(Arrays.asList("id", "child"), 
firstOuterSchema.getFieldNames());
+            assertEquals("1234", firstRecord.getValue("id"));
+
+            // record should have a schema that indicates that the 'child' is 
a CHOICE of 2 different record types
+            
assertTrue(firstOuterSchema.getDataType("child").get().getFieldType() == 
RecordFieldType.CHOICE);
+            final List<DataType> firstSubTypes = ((ChoiceDataType) 
firstOuterSchema.getDataType("child").get()).getPossibleSubTypes();
+            assertEquals(2, firstSubTypes.size());
+            assertEquals(2L, firstSubTypes.stream().filter(type -> 
type.getFieldType() == RecordFieldType.RECORD).count());
+
+            // child record should have a schema with "id" as the only field
+            final Object childObject = firstRecord.getValue("child");
+            assertTrue(childObject instanceof Record);
+            final Record firstChildRecord = (Record) childObject;
+            final RecordSchema firstChildSchema = firstChildRecord.getSchema();
+
+            assertEquals(Arrays.asList("id"), 
firstChildSchema.getFieldNames());
+
+            // evaluate second record
+            final Record secondRecord = reader.nextRecord();
+            assertNotNull(secondRecord);
+
+            final RecordSchema secondOuterSchema = secondRecord.getSchema();
+            assertEquals(Arrays.asList("id", "child"), 
secondOuterSchema.getFieldNames());
+            assertEquals("1234", secondRecord.getValue("id"));
+
+            // record should have a schema that indicates that the 'child' is 
a CHOICE of 2 different record types
+            
assertTrue(secondOuterSchema.getDataType("child").get().getFieldType() == 
RecordFieldType.CHOICE);
+            final List<DataType> secondSubTypes = ((ChoiceDataType) 
secondOuterSchema.getDataType("child").get()).getPossibleSubTypes();
+            assertEquals(2, secondSubTypes.size());
+            assertEquals(2L, secondSubTypes.stream().filter(type -> 
type.getFieldType() == RecordFieldType.RECORD).count());
+
+            // child record should have a schema with "name" as the only field
+            final Object secondChildObject = secondRecord.getValue("child");
+            assertTrue(secondChildObject instanceof Record);
+            final Record secondChildRecord = (Record) secondChildObject;
+            final RecordSchema secondChildSchema = 
secondChildRecord.getSchema();
+
+            assertEquals(Arrays.asList("name"), 
secondChildSchema.getFieldNames());
+
+            assertNull(reader.nextRecord());
+        }
+
+    }
+
+    @Test
     public void testReadArray() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json
new file mode 100644
index 0000000..f154528
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json
@@ -0,0 +1,11 @@
+[{
+  "id": "1234",
+  "child": {
+      "id": "4321"
+  }
+}, {
+  "id": "1234",
+  "child": {
+      "name": "child"
+  }
+}]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/4700b865/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc
new file mode 100644
index 0000000..51ea31e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc
@@ -0,0 +1,15 @@
+{
+  "name": "top", "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    { "name": "id", "type": "string" },
+    { "name": "child", "type": [{
+         "name": "first", "type": "record",
+         "fields": [{ "name": "name", "type": "string" }]
+       }, {
+         "name": "second", "type": "record",
+         "fields": [{ "name": "id", "type": "string" }]
+       }]
+     }
+  ]
+}
\ No newline at end of file

Reply via email to