Repository: nifi
Updated Branches:
  refs/heads/master 600586d6b -> f772f2f09


NIFI-4671: This closes #2328. Ensure that Avro Schemas that are created 
properly denote fields as being nullable iff the schema says they are, for 
non-top-level fields

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f772f2f0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f772f2f0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f772f2f0

Branch: refs/heads/master
Commit: f772f2f093de38a97e6a71988628b5fd5aa4139c
Parents: 600586d
Author: Mark Payne <[email protected]>
Authored: Wed Dec 6 11:40:05 2017 -0500
Committer: joewitt <[email protected]>
Committed: Mon Dec 11 11:46:15 2017 -0500

----------------------------------------------------------------------
 .../nifi/serialization/record/ResultSetRecordSet.java    | 11 ++++++++++-
 .../nifi/serialization/record/util/DataTypeUtils.java    |  2 +-
 .../src/main/java/org/apache/nifi/avro/AvroTypeUtil.java |  7 ++++---
 .../org/apache/nifi/csv/CSVHeaderSchemaStrategy.java     |  2 +-
 .../src/main/java/org/apache/nifi/grok/GrokReader.java   |  4 ++--
 5 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index b6daab7..ad26d79 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -127,7 +127,16 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
 
             final DataType dataType = getDataType(sqlType, rs, column);
             final String fieldName = metadata.getColumnLabel(column);
-            final RecordField field = new RecordField(fieldName, dataType);
+
+            final int nullableFlag = metadata.isNullable(column);
+            final boolean nullable;
+            if (nullableFlag == ResultSetMetaData.columnNoNulls) {
+                nullable = false;
+            } else {
+                nullable = true;
+            }
+
+            final RecordField field = new RecordField(fieldName, dataType, 
nullable);
             fields.add(field);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 55a4d69..ccd9270 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -985,7 +985,7 @@ public class DataTypeUtils {
             dataType = 
RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), 
otherField.getDataType());
         }
 
-        return new RecordField(fieldName, dataType, defaultValue, aliases);
+        return new RecordField(fieldName, dataType, defaultValue, aliases, 
thisField.isNullable() || otherField.isNullable());
     }
 
     public static boolean isScalarValue(final DataType dataType, final Object 
value) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index abc381f..c5256c4 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -289,11 +289,12 @@ public class AvroTypeUtil {
                         final String fieldName = field.name();
                         final Schema fieldSchema = field.schema();
                         final DataType fieldType = 
determineDataType(fieldSchema, knownRecordTypes);
+                        final boolean nullable = isNullable(fieldSchema);
 
                         if (field.defaultVal() == JsonProperties.NULL_VALUE) {
-                            recordFields.add(new RecordField(fieldName, 
fieldType, field.aliases()));
+                            recordFields.add(new RecordField(fieldName, 
fieldType, field.aliases(), nullable));
                         } else {
-                            recordFields.add(new RecordField(fieldName, 
fieldType, field.defaultVal(), field.aliases()));
+                            recordFields.add(new RecordField(fieldName, 
fieldType, field.defaultVal(), field.aliases(), nullable));
                         }
                     }
 
@@ -800,7 +801,7 @@ public class AvroTypeUtil {
                 final DataType elementType = 
AvroTypeUtil.determineDataType(avroSchema.getValueType());
                 final List<RecordField> mapFields = new ArrayList<>();
                 for (final String key : map.keySet()) {
-                    mapFields.add(new RecordField(key, elementType));
+                    mapFields.add(new RecordField(key, elementType, true));
                 }
                 final RecordSchema mapSchema = new 
SimpleRecordSchema(mapFields);
                 return new MapRecord(mapSchema, map);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index 642f360..9c31cca 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -65,7 +65,7 @@ public class CSVHeaderSchemaStrategy implements 
SchemaAccessStrategy {
 
                 final List<RecordField> fields = new ArrayList<>();
                 for (final String columnName : 
csvParser.getHeaderMap().keySet()) {
-                    fields.add(new RecordField(columnName, 
RecordFieldType.STRING.getDataType()));
+                    fields.add(new RecordField(columnName, 
RecordFieldType.STRING.getDataType(), true));
                 }
 
                 return new SimpleRecordSchema(fields);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 4a26975..6eea8e3 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -150,8 +150,8 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
         String grokExpression = grok.getOriginalGrokPattern();
         populateSchemaFieldNames(grok, grokExpression, fields);
 
-        fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, 
RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, 
RecordFieldType.STRING.getDataType(), true));
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
         return schema;

Reply via email to