Repository: nifi Updated Branches: refs/heads/master 600586d6b -> f772f2f09
NIFI-4671: This closes #2328. Ensure that Avro Schemas that are created properly denote fields as being nullable iff the schemas says they are, for non-top-level fields Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f772f2f0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f772f2f0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f772f2f0 Branch: refs/heads/master Commit: f772f2f093de38a97e6a71988628b5fd5aa4139c Parents: 600586d Author: Mark Payne <[email protected]> Authored: Wed Dec 6 11:40:05 2017 -0500 Committer: joewitt <[email protected]> Committed: Mon Dec 11 11:46:15 2017 -0500 ---------------------------------------------------------------------- .../nifi/serialization/record/ResultSetRecordSet.java | 11 ++++++++++- .../nifi/serialization/record/util/DataTypeUtils.java | 2 +- .../src/main/java/org/apache/nifi/avro/AvroTypeUtil.java | 7 ++++--- .../org/apache/nifi/csv/CSVHeaderSchemaStrategy.java | 2 +- .../src/main/java/org/apache/nifi/grok/GrokReader.java | 4 ++-- 5 files changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index b6daab7..ad26d79 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -127,7 +127,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final DataType dataType = getDataType(sqlType, rs, column); final String fieldName = metadata.getColumnLabel(column); - final RecordField field = new RecordField(fieldName, dataType); + + final int nullableFlag = metadata.isNullable(column); + final boolean nullable; + if (nullableFlag == ResultSetMetaData.columnNoNulls) { + nullable = false; + } else { + nullable = true; + } + + final RecordField field = new RecordField(fieldName, dataType, nullable); fields.add(field); } http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 55a4d69..ccd9270 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -985,7 +985,7 @@ public class DataTypeUtils { dataType = RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), otherField.getDataType()); } - return new RecordField(fieldName, dataType, defaultValue, aliases); + return new RecordField(fieldName, dataType, defaultValue, aliases, thisField.isNullable() || otherField.isNullable()); } public static boolean isScalarValue(final DataType dataType, final Object value) { http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index abc381f..c5256c4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -289,11 +289,12 @@ public class AvroTypeUtil { final String fieldName = field.name(); final Schema fieldSchema = field.schema(); final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes); + final boolean nullable = isNullable(fieldSchema); if (field.defaultVal() == JsonProperties.NULL_VALUE) { - recordFields.add(new RecordField(fieldName, fieldType, field.aliases())); + recordFields.add(new RecordField(fieldName, fieldType, field.aliases(), nullable)); } else { - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases(), nullable)); } } @@ -800,7 +801,7 @@ public class AvroTypeUtil { final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType()); final List<RecordField> mapFields = new ArrayList<>(); for (final String key : map.keySet()) { - mapFields.add(new RecordField(key, elementType)); + mapFields.add(new RecordField(key, elementType, true)); } final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); return new MapRecord(mapSchema, map); http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index 642f360..9c31cca 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -65,7 +65,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { final List<RecordField> fields = new ArrayList<>(); for (final String columnName : csvParser.getHeaderMap().keySet()) { - fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType(), true)); } return new SimpleRecordSchema(fields); http://git-wip-us.apache.org/repos/asf/nifi/blob/f772f2f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 4a26975..6eea8e3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -150,8 +150,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac String grokExpression = grok.getOriginalGrokPattern(); populateSchemaFieldNames(grok, grokExpression, fields); - fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); - fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true)); final RecordSchema schema = new SimpleRecordSchema(fields); return schema;
