NIFI-4142: This closes #2015. Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord. Updated Record Reader to take two parameters for nextRecord: (boolean coerceTypes) and (boolean dropUnknownFields)
Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/451f9cf1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/451f9cf1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/451f9cf1 Branch: refs/heads/master Commit: 451f9cf12407c4f7abeb8e538e3f55ecbf40abab Parents: 84935d4 Author: Mark Payne <[email protected]> Authored: Fri Jun 30 08:32:01 2017 -0400 Committer: joewitt <[email protected]> Committed: Fri Aug 11 22:01:46 2017 -0700 ---------------------------------------------------------------------- .../serialization/AbstractRecordSetWriter.java | 4 + .../apache/nifi/serialization/RecordReader.java | 27 +- .../apache/nifi/serialization/RecordWriter.java | 12 +- .../SchemaValidationException.java | 30 ++ .../nifi/serialization/record/MapRecord.java | 90 +++- .../serialization/record/RawRecordWriter.java | 33 ++ .../nifi/serialization/record/Record.java | 32 +- .../nifi/serialization/record/RecordField.java | 36 +- .../record/TypeMismatchException.java | 28 -- .../util/IllegalTypeConversionException.java | 4 +- .../validation/RecordSchemaValidator.java | 26 ++ .../validation/SchemaValidationResult.java | 26 ++ .../record/validation/ValidationContext.java | 26 ++ .../record/validation/ValidationError.java | 31 ++ .../record/validation/ValidationErrorType.java | 41 ++ .../java/org/apache/nifi/avro/AvroTypeUtil.java | 38 +- .../record/CommaSeparatedRecordReader.java | 2 +- .../serialization/record/MockRecordParser.java | 2 +- .../validation/SchemaValidationContext.java | 44 ++ .../StandardSchemaValidationResult.java | 45 ++ .../validation/StandardSchemaValidator.java | 263 +++++++++++ .../validation/StandardValidationError.java | 129 ++++++ .../validation/TestStandardSchemaValidator.java | 306 +++++++++++++ .../kafka/pubsub/util/MockRecordParser.java | 3 +- .../groovy/test_record_reader_inline.groovy | 2 +- .../groovy/test_record_reader_xml.groovy | 2 +- 
.../nifi-standard-processors/pom.xml | 29 +- .../processors/standard/ValidateRecord.java | 457 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../org/apache/nifi/avro/AvroRecordReader.java | 2 +- .../java/org/apache/nifi/csv/CSVReader.java | 18 +- .../org/apache/nifi/csv/CSVRecordReader.java | 121 +++-- .../main/java/org/apache/nifi/csv/CSVUtils.java | 16 +- .../org/apache/nifi/csv/WriteCSVResult.java | 74 ++- .../java/org/apache/nifi/grok/GrokReader.java | 7 +- .../org/apache/nifi/grok/GrokRecordReader.java | 113 +++-- .../nifi/json/AbstractJsonRowRecordReader.java | 71 ++- .../nifi/json/JsonPathRowRecordReader.java | 79 +++- .../nifi/json/JsonTreeRowRecordReader.java | 83 ++-- .../org/apache/nifi/json/WriteJsonResult.java | 114 ++++- .../additionalDetails.html | 167 ++++++- .../avro/TestAvroReaderWithEmbeddedSchema.java | 12 +- .../apache/nifi/csv/TestCSVRecordReader.java | 172 ++++++- .../org/apache/nifi/csv/TestWriteCSVResult.java | 167 +++++++ .../apache/nifi/grok/TestGrokRecordReader.java | 16 +- .../nifi/json/TestJsonTreeRowRecordReader.java | 68 +++ .../apache/nifi/json/TestWriteJsonResult.java | 158 +++++++ 47 files changed, 2953 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java index 4de5ce3..1e653c1 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -92,6 +92,10 @@ public abstract class 
AbstractRecordSetWriter implements RecordSetWriter { return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes); } + protected int incrementRecordCount() { + return ++recordCount; + } + /** * Method that is called as a result of {@link #beginRecordSet()} being called. This gives subclasses * the chance to react to a new RecordSet beginning but prevents the subclass from changing how this http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java index add248e..188f032 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java @@ -38,14 +38,37 @@ import org.apache.nifi.serialization.record.RecordSet; public interface RecordReader extends Closeable { /** - * Returns the next record in the stream or <code>null</code> if no more records are available. + * Returns the next record in the stream or <code>null</code> if no more records are available. Types will be coerced and any unknown fields will be dropped. * * @return the next record in the stream or <code>null</code> if no more records are available. * * @throws IOException if unable to read from the underlying data * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record + * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type. 
*/ - Record nextRecord() throws IOException, MalformedRecordException; + default Record nextRecord() throws IOException, MalformedRecordException { + return nextRecord(true, true); + } + + /** + * Reads the next record from the underlying stream. If type coercion is enabled, then any field in the Record whose type does not + * match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into + * the correct type. If type coercion is disabled, then no type coercion will occur. As a result, calling + * {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)} + * may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer. + * + * @param coerceTypes whether or not fields in the Record should be validated against the schema and coerced when necessary + * @param dropUnknownFields if <code>true</code>, any field that is found in the data that is not present in the schema will be dropped. If <code>false</code>, + * those fields will still be part of the Record (though their type cannot be coerced, since the schema does not provide a type for it). + * + * @return the next record in the stream or <code>null</code> if no more records are available + * @throws IOException if unable to read from the underlying data + * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field + * that violates the schema and cannot be coerced into the appropriate field type. 
+ * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate + * field type and schema enforcement is enabled + */ + Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException; /** * @return a RecordSchema that is appropriate for the records in the stream http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java index 6c21a39..3e1c4ab 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java @@ -24,24 +24,24 @@ import org.apache.nifi.serialization.record.Record; public interface RecordWriter extends Closeable { /** - * Writes the given result set to the given output stream + * Writes the given record to the underlying stream * - * @param record the record set to serialize + * @param record the record to write * @return the results of writing the data - * @throws IOException if unable to write to the given OutputStream + * @throws IOException if unable to write to the underlying stream */ WriteResult write(Record record) throws IOException; /** - * @return the MIME Type that the Result Set Writer produces. This will be added to FlowFiles using + * @return the MIME Type that the Record Writer produces. This will be added to FlowFiles using * the mime.type attribute. 
*/ String getMimeType(); /** - * Flushes any buffered data to the underlying storage mechanism + * Flushes any buffered data to the underlying stream * - * @throws IOException if unable to write to the underlying storage mechanism + * @throws IOException if unable to write to the underlying stream */ void flush() throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java new file mode 100644 index 0000000..796cc8f --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization; + +public class SchemaValidationException extends RuntimeException { + + public SchemaValidationException(final String message) { + super(message); + } + + public SchemaValidationException(final String message, final Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index ca33e32..c3444ed 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -17,12 +17,16 @@ package org.apache.nifi.serialization.record; +import java.text.DateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.nifi.serialization.SchemaValidationException; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; @@ -32,17 +36,67 @@ public class MapRecord implements Record { private RecordSchema schema; private final Map<String, Object> values; private Optional<SerializedForm> serializedForm; + private final boolean checkTypes; + private final boolean dropUnknownFields; + public MapRecord(final RecordSchema schema, final Map<String, Object> values) { + this(schema, values, false, false); + } + + public MapRecord(final RecordSchema schema, final Map<String, Object> values, final boolean checkTypes, final 
boolean dropUnknownFields) { + Objects.requireNonNull(values); + this.schema = Objects.requireNonNull(schema); - this.values = Objects.requireNonNull(values); + this.values = checkTypes ? checkTypes(values, schema) : values; this.serializedForm = Optional.empty(); + this.checkTypes = checkTypes; + this.dropUnknownFields = dropUnknownFields; } public MapRecord(final RecordSchema schema, final Map<String, Object> values, final SerializedForm serializedForm) { + this(schema, values, serializedForm, false, false); + } + + public MapRecord(final RecordSchema schema, final Map<String, Object> values, final SerializedForm serializedForm, final boolean checkTypes, final boolean dropUnknownFields) { + Objects.requireNonNull(values); + this.schema = Objects.requireNonNull(schema); - this.values = Objects.requireNonNull(values); + this.values = checkTypes ? checkTypes(values, schema) : values; this.serializedForm = Optional.ofNullable(serializedForm); + this.checkTypes = checkTypes; + this.dropUnknownFields = dropUnknownFields; + } + + private Map<String, Object> checkTypes(final Map<String, Object> values, final RecordSchema schema) { + for (final RecordField field : schema.getFields()) { + final Object value = getExplicitValue(field, values); + + if (value == null) { + if (field.isNullable()) { + continue; + } + + throw new SchemaValidationException("Field " + field.getFieldName() + " cannot be null"); + } + + if (!DataTypeUtils.isCompatibleDataType(value, field.getDataType())) { + throw new SchemaValidationException("Field " + field.getFieldName() + " has a value of " + value + + ", which cannot be coerced into the appropriate data type of " + field.getDataType()); + } + } + + return values; + } + + @Override + public boolean isDropUnknownFields() { + return dropUnknownFields; + } + + @Override + public boolean isTypeChecked() { + return checkTypes; } @Override @@ -67,7 +121,11 @@ public class MapRecord implements Record { return getValue(fieldOption.get()); } - return 
null; + if (dropUnknownFields) { + return null; + } + + return this.values.get(fieldName); } @Override @@ -115,6 +173,10 @@ public class MapRecord implements Record { } private Object getExplicitValue(final RecordField field) { + return getExplicitValue(field, this.values); + } + + private Object getExplicitValue(final RecordField field, final Map<String, Object> values) { final String canonicalFieldName = field.getFieldName(); // We use containsKey here instead of just calling get() and checking for a null value @@ -139,11 +201,11 @@ public class MapRecord implements Record { @Override public String getAsString(final String fieldName) { final Optional<DataType> dataTypeOption = schema.getDataType(fieldName); - if (!dataTypeOption.isPresent()) { - return null; + if (dataTypeOption.isPresent()) { + return convertToString(getValue(fieldName), dataTypeOption.get().getFormat()); } - return convertToString(getValue(fieldName), dataTypeOption.get().getFormat()); + return DataTypeUtils.toString(getValue(fieldName), (Supplier<DateFormat>) null); } @Override @@ -239,11 +301,20 @@ public class MapRecord implements Record { public void setValue(final String fieldName, final Object value) { final Optional<RecordField> field = getSchema().getField(fieldName); if (!field.isPresent()) { + if (dropUnknownFields) { + return; + } + + final Object previousValue = values.put(fieldName, value); + if (!Objects.equals(value, previousValue)) { + serializedForm = Optional.empty(); + } + return; } final RecordField recordField = field.get(); - final Object coerced = DataTypeUtils.convertType(value, recordField.getDataType(), fieldName); + final Object coerced = isTypeChecked() ? 
DataTypeUtils.convertType(value, recordField.getDataType(), fieldName) : value; final Object previousValue = values.put(recordField.getFieldName(), coerced); if (!Objects.equals(coerced, previousValue)) { serializedForm = Optional.empty(); @@ -327,4 +398,9 @@ public class MapRecord implements Record { public void incorporateSchema(RecordSchema other) { this.schema = DataTypeUtils.merge(this.schema, other); } + + @Override + public Set<String> getRawFieldNames() { + return values.keySet(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java new file mode 100644 index 0000000..b62c50e --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record; + +import java.io.IOException; + +import org.apache.nifi.serialization.WriteResult; + +public interface RawRecordWriter { + /** + * Writes the given record to the underlying stream + * + * @param record the record to write + * @return the results of writing the data + * @throws IOException if unable to write to the underlying stream + */ + WriteResult writeRawRecord(Record record) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java index 822352d..1a89225 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java @@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record; import java.util.Date; import java.util.Optional; +import java.util.Set; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; @@ -27,6 +28,24 @@ public interface Record { RecordSchema getSchema(); /** + * Indicates whether or not field values for this record are expected to be coerced into the type designated by the schema. + * If <code>true</code>, then it is safe to assume that calling {@link #getValue(RecordField)} will return an Object of the appropriate + * type according to the schema, or an object that can be coerced into the appropriate type. If type checking + * is not enabled, then calling {@link #getValue(RecordField)} can return an object of any type. + * + * @return <code>true</code> if type checking is enabled, <code>false</code> otherwise. 
+ */ + boolean isTypeChecked(); + + /** + * If <code>true</code>, any field that is added to the record will be dropped unless the field is known by the schema + * + * @return <code>true</code> if fields that are unknown to the schema will be dropped, <code>false</code> + * if all field values are retained. + */ + boolean isDropUnknownFields(); + + /** * Updates the Record's schema to to incorporate all of the fields in the given schema. If both schemas have a * field with the same name but a different type, then the existing schema will be updated to have a * {@link RecordFieldType#CHOICE} field with both types as choices. If two fields have the same name but different @@ -45,7 +64,9 @@ public interface Record { /** * <p> - * Returns a view of the the values of the fields in this Record. + * Returns a view of the values of the fields in this Record. Note that this method returns values only for + * those entries in the Record's schema. This allows the Record to guarantee that it will return the values in + * the order dictated by the schema. * </p> * * <b>NOTE:</b> The array that is returned may be an underlying array that is backing @@ -134,4 +155,13 @@ public interface Record { * name is not a Map */ void setMapValue(String fieldName, String mapKey, Object value); + + /** + * Returns a Set that contains the names of all of the fields that are present in the Record, regardless of + * whether or not those fields are contained in the schema. To determine which fields exist in the Schema, use + * {@link #getSchema()}.{@link RecordSchema#getFieldNames() getFieldNames()} instead. 
+ * + * @return a Set that contains the names of all of the fields that are present in the Record + */ + Set<String> getRawFieldNames(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java index c7cd290..41da6be 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java @@ -24,24 +24,43 @@ import java.util.Objects; import java.util.Set; public class RecordField { + private static final boolean DEFAULT_NULLABLE = true; + private final String fieldName; private final DataType dataType; private final Set<String> aliases; private final Object defaultValue; + private final boolean nullable; public RecordField(final String fieldName, final DataType dataType) { - this(fieldName, dataType, null, Collections.emptySet()); + this(fieldName, dataType, null, Collections.emptySet(), DEFAULT_NULLABLE); + } + + public RecordField(final String fieldName, final DataType dataType, final boolean nullable) { + this(fieldName, dataType, null, Collections.emptySet(), nullable); } public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) { - this(fieldName, dataType, defaultValue, Collections.emptySet()); + this(fieldName, dataType, defaultValue, Collections.emptySet(), DEFAULT_NULLABLE); + } + + public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final boolean nullable) { + this(fieldName, dataType, defaultValue, Collections.emptySet(), nullable); } public RecordField(final String fieldName, final 
DataType dataType, final Set<String> aliases) { - this(fieldName, dataType, null, aliases); + this(fieldName, dataType, null, aliases, DEFAULT_NULLABLE); + } + + public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases, final boolean nullable) { + this(fieldName, dataType, null, aliases, nullable); } public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) { + this(fieldName, dataType, defaultValue, aliases, DEFAULT_NULLABLE); + } + + public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases, final boolean nullable) { if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) { throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue + "] because that is not a valid value for Data Type [" + dataType + "]"); @@ -51,6 +70,7 @@ public class RecordField { this.dataType = Objects.requireNonNull(dataType); this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases)); this.defaultValue = defaultValue; + this.nullable = nullable; } public String getFieldName() { @@ -69,6 +89,10 @@ public class RecordField { return defaultValue; } + public boolean isNullable() { + return nullable; + } + @Override public int hashCode() { final int prime = 31; @@ -77,6 +101,7 @@ public class RecordField { result = prime * result + fieldName.hashCode(); result = prime * result + aliases.hashCode(); result = prime * result + ((defaultValue == null) ? 
0 : defaultValue.hashCode()); + result = prime * result + Boolean.hashCode(nullable); return result; } @@ -94,11 +119,12 @@ public class RecordField { } RecordField other = (RecordField) obj; - return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue); + return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue) + && nullable == other.nullable; } @Override public String toString() { - return "RecordField[name=" + fieldName + ", dataType=" + dataType + (aliases.isEmpty() ? "" : ", aliases=" + aliases) + "]"; + return "RecordField[name=" + fieldName + ", dataType=" + dataType + (aliases.isEmpty() ? "" : ", aliases=" + aliases) + ", nullable=" + nullable + "]"; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java deleted file mode 100644 index af5f909..0000000 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record; - -public class TypeMismatchException extends RuntimeException { - public TypeMismatchException(String message) { - super(message); - } - - public TypeMismatchException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java index 38b5d20..32c5ea5 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java @@ -17,7 +17,9 @@ package org.apache.nifi.serialization.record.util; -public class IllegalTypeConversionException extends RuntimeException { +import org.apache.nifi.serialization.SchemaValidationException; + +public class IllegalTypeConversionException extends SchemaValidationException { public IllegalTypeConversionException(final String message) { super(message); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java new file mode 100644 index 0000000..f7dd880 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record.validation; + +import org.apache.nifi.serialization.record.Record; + +public interface RecordSchemaValidator { + + SchemaValidationResult validate(Record record); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java new file mode 100644 index 0000000..423120a --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record.validation; + +import java.util.Collection; + +public interface SchemaValidationResult { + boolean isValid(); + + Collection<ValidationError> getValidationErrors(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java new file mode 100644 index 0000000..441dbb2 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record.validation; + +public interface ValidationContext { + + boolean isExtraFieldAllowed(); + + boolean isTypeCoercionAllowed(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java new file mode 100644 index 0000000..6f65bc5 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record.validation; + +import java.util.Optional; + +public interface ValidationError { + + Optional<String> getFieldName(); + + Optional<Object> getInputValue(); + + String getExplanation(); + + ValidationErrorType getType(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java new file mode 100644 index 0000000..3f014eb --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record.validation; + +public enum ValidationErrorType { + /** + * A required field (i.e., a field that is not 'nullable') exists in the schema, but the record had no value for this field + * or the value for this field was <code>null</code>. + */ + MISSING_FIELD, + + /** + * The record had a field that was not valid according to the schema. + */ + EXTRA_FIELD, + + /** + * The record had a value for a field, but the value was not valid according to the schema. + */ + INVALID_FIELD, + + /** + * Some other sort of validation error occurred. + */ + OTHER; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index a9f051e..abc381f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -75,6 +75,7 @@ public class AvroTypeUtil { private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; private static final String LOGICAL_TYPE_DECIMAL = "decimal"; + public static Schema extractAvroSchema(final RecordSchema recordSchema) { if (recordSchema == null) { throw new IllegalArgumentException("RecordSchema cannot be null"); @@ -110,7 +111,7 @@ public class AvroTypeUtil { } private static Field buildAvroField(final RecordField recordField) { - final Schema schema = buildAvroSchema(recordField.getDataType(), 
recordField.getFieldName()); + final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable()); final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue()); for (final String alias : recordField.getAliases()) { field.addAlias(alias); @@ -119,7 +120,7 @@ public class AvroTypeUtil { return field; } - private static Schema buildAvroSchema(final DataType dataType, final String fieldName) { + private static Schema buildAvroSchema(final DataType dataType, final String fieldName, final boolean nullable) { final Schema schema; switch (dataType.getFieldType()) { @@ -129,7 +130,7 @@ public class AvroTypeUtil { if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) { schema = Schema.create(Type.BYTES); } else { - final Schema elementType = buildAvroSchema(elementDataType, fieldName); + final Schema elementType = buildAvroSchema(elementDataType, fieldName, false); schema = Schema.createArray(elementType); } break; @@ -151,7 +152,7 @@ public class AvroTypeUtil { final List<Schema> unionTypes = new ArrayList<>(options.size()); for (final DataType option : options) { - unionTypes.add(buildAvroSchema(option, fieldName)); + unionTypes.add(buildAvroSchema(option, fieldName, false)); } schema = Schema.createUnion(unionTypes); @@ -173,7 +174,7 @@ public class AvroTypeUtil { schema = Schema.create(Type.LONG); break; case MAP: - schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName)); + schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, false)); break; case RECORD: final RecordDataType recordDataType = (RecordDataType) dataType; @@ -204,7 +205,11 @@ public class AvroTypeUtil { return null; } - return nullable(schema); + if (nullable) { + return nullable(schema); + } else { + return schema; + } } private static Schema nullable(final Schema schema) { @@ -362,12 +367,14 @@ public class AvroTypeUtil { 
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); for (final Field field : avroSchema.getFields()) { final String fieldName = field.name(); - final DataType dataType = AvroTypeUtil.determineDataType(field.schema(), knownRecords); + final Schema fieldSchema = field.schema(); + final DataType dataType = AvroTypeUtil.determineDataType(fieldSchema, knownRecords); + final boolean nullable = isNullable(fieldSchema); if (field.defaultVal() == JsonProperties.NULL_VALUE) { - recordFields.add(new RecordField(fieldName, dataType, field.aliases())); + recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable)); } else { - recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); + recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases(), nullable)); } } @@ -375,6 +382,19 @@ public class AvroTypeUtil { return recordSchema; } + public static boolean isNullable(final Schema schema) { + final Type schemaType = schema.getType(); + if (schemaType == Type.UNION) { + for (final Schema unionSchema : schema.getTypes()) { + if (isNullable(unionSchema)) { + return true; + } + } + } + + return schemaType == Type.NULL; + } + public static Object[] convertByteArray(final byte[] bytes) { final Object[] array = new Object[bytes.length]; for (int i = 0; i < bytes.length; i++) { http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java index 8973055..1c22687 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java @@ -70,7 +70,7 @@ public class CommaSeparatedRecordReader extends AbstractControllerService implem } @Override - public Record nextRecord() throws IOException, MalformedRecordException { + public Record nextRecord(final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { if (failAfterN > -1 && recordCount >= failAfterN) { throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java index 251eb46..b6606a8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java @@ -76,7 +76,7 @@ public class 
MockRecordParser extends AbstractControllerService implements Recor } @Override - public Record nextRecord() throws IOException, MalformedRecordException { + public Record nextRecord(final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { if (failAfterN >= recordCount) { throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java new file mode 100644 index 0000000..53711de --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.validation; + +import org.apache.nifi.serialization.record.RecordSchema; + +public class SchemaValidationContext { + private final RecordSchema schema; + private final boolean allowExtraFields; + private final boolean strictTypeChecking; + + public SchemaValidationContext(final RecordSchema schema, final boolean allowExtraFields, final boolean strictTypeChecking) { + this.schema = schema; + this.allowExtraFields = allowExtraFields; + this.strictTypeChecking = strictTypeChecking; + } + + public RecordSchema getSchema() { + return schema; + } + + public boolean isExtraFieldAllowed() { + return allowExtraFields; + } + + public boolean isStrictTypeChecking() { + return strictTypeChecking; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java new file mode 100644 index 0000000..f304c9f --- /dev/null +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.validation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.serialization.record.validation.SchemaValidationResult; +import org.apache.nifi.serialization.record.validation.ValidationError; + +public class StandardSchemaValidationResult implements SchemaValidationResult { + + private final List<ValidationError> validationErrors = new ArrayList<>(); + + @Override + public boolean isValid() { + return validationErrors.isEmpty(); + } + + @Override + public Collection<ValidationError> getValidationErrors() { + return Collections.unmodifiableList(validationErrors); + } + + public void addValidationError(final ValidationError validationError) { + this.validationErrors.add(validationError); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java new file mode 100644 index 0000000..d467962 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.schema.validation; + +import java.math.BigInteger; +import java.util.Map; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.validation.RecordSchemaValidator; +import org.apache.nifi.serialization.record.validation.SchemaValidationResult; +import org.apache.nifi.serialization.record.validation.ValidationError; +import org.apache.nifi.serialization.record.validation.ValidationErrorType; + +public class StandardSchemaValidator implements RecordSchemaValidator { + private final SchemaValidationContext validationContext; + + public StandardSchemaValidator(final SchemaValidationContext validationContext) { + this.validationContext = validationContext; + } + + @Override + public SchemaValidationResult validate(final Record record) { + return validate(record, validationContext.getSchema(), ""); + } + + private SchemaValidationResult validate(final Record record, final RecordSchema schema, final String fieldPrefix) { + // Ensure that for every field in the schema, the type is correct (if we care) and that + // a value is present (unless it is nullable). + final StandardSchemaValidationResult result = new StandardSchemaValidationResult(); + + for (final RecordField field : schema.getFields()) { + final Object rawValue = record.getValue(field); + + // If there is no value, then it is always valid unless the field is required. 
+ if (rawValue == null) { + if (!field.isNullable() && field.getDefaultValue() == null) { + result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), ValidationErrorType.MISSING_FIELD, "Field is required")); + } + + continue; + } + + // Check that the type is correct. + final DataType dataType = field.getDataType(); + if (validationContext.isStrictTypeChecking()) { + if (!isTypeCorrect(rawValue, dataType)) { + result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD, + "Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType)); + + continue; + } + } else { + // Use a lenient type check. This will be true if, for instance, a value is the String "123" and should be an integer + // but will be false if the value is "123" and should be an Array or Record. + if (!DataTypeUtils.isCompatibleDataType(rawValue, dataType)) { + result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD, + "Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType)); + + continue; + } + } + + // If the field type is RECORD, or if the field type is a CHOICE that allows for a RECORD and the value is a RECORD, then we + // need to dig into each of the sub-fields. To do this, we first need to determine the 'canonical data type'. + final DataType canonicalDataType = getCanonicalDataType(dataType, rawValue, result, fieldPrefix, field); + if (canonicalDataType == null) { + continue; + } + + // Now that we have the 'canonical data type', we check if it is a Record. If so, we need to validate each sub-field. 
+ verifyComplexType(dataType, rawValue, result, fieldPrefix, field); + } + + if (!validationContext.isExtraFieldAllowed()) { + for (final String fieldName : record.getRawFieldNames()) { + if (!schema.getDataType(fieldName).isPresent()) { + result.addValidationError(new StandardValidationError(fieldPrefix + "/" + fieldName, ValidationErrorType.EXTRA_FIELD, "Field is not present in the schema")); + } + } + } + + return result; + } + + private void verifyComplexType(final DataType dataType, final Object rawValue, final StandardSchemaValidationResult result, final String fieldPrefix, final RecordField field) { + // If the field type is RECORD, or if the field type is a CHOICE that allows for a RECORD and the value is a RECORD, then we + // need to dig into each of the sub-fields. To do this, we first need to determine the 'canonical data type'. + final DataType canonicalDataType = getCanonicalDataType(dataType, rawValue, result, fieldPrefix, field); + if (canonicalDataType == null) { + return; + } + + // Now that we have the 'canonical data type', we check if it is a Record. If so, we need to validate each sub-field. 
+ if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) { + verifyChildRecord(canonicalDataType, rawValue, dataType, result, field, fieldPrefix); + } + + if (canonicalDataType.getFieldType() == RecordFieldType.ARRAY) { + final ArrayDataType arrayDataType = (ArrayDataType) canonicalDataType; + final DataType elementType = arrayDataType.getElementType(); + final Object[] arrayObject = (Object[]) rawValue; + + int i=0; + for (final Object arrayValue : arrayObject) { + verifyComplexType(elementType, arrayValue, result, fieldPrefix + "[" + i + "]", field); + i++; + } + } + } + + private DataType getCanonicalDataType(final DataType dataType, final Object rawValue, final StandardSchemaValidationResult result, final String fieldPrefix, final RecordField field) { + final RecordFieldType fieldType = dataType.getFieldType(); + final DataType canonicalDataType; + if (fieldType == RecordFieldType.CHOICE) { + canonicalDataType = DataTypeUtils.chooseDataType(rawValue, (ChoiceDataType) dataType); + + if (canonicalDataType == null) { + result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD, + "Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType)); + + return null; + } + } else { + canonicalDataType = dataType; + } + + return canonicalDataType; + } + + private void verifyChildRecord(final DataType canonicalDataType, final Object rawValue, final DataType expectedDataType, final StandardSchemaValidationResult result, + final RecordField field, final String fieldPrefix) { + // Now that we have the 'canonical data type', we check if it is a Record. If so, we need to validate each sub-field. 
+ if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) { + if (!(rawValue instanceof Record)) { // sanity check + result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD, + "Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + expectedDataType)); + + return; + } + + final RecordDataType recordDataType = (RecordDataType) canonicalDataType; + final RecordSchema childSchema = recordDataType.getChildSchema(); + + final String fullChildFieldName = concat(fieldPrefix, field); + final SchemaValidationResult childValidationResult = validate((Record) rawValue, childSchema, fullChildFieldName); + if (childValidationResult.isValid()) { + return; + } + + for (final ValidationError validationError : childValidationResult.getValidationErrors()) { + result.addValidationError(validationError); + } + } + } + + private boolean isTypeCorrect(final Object value, final DataType dataType) { + switch (dataType.getFieldType()) { + case ARRAY: + if (!(value instanceof Object[])) { + return false; + } + + final ArrayDataType arrayDataType = (ArrayDataType) dataType; + final DataType elementType = arrayDataType.getElementType(); + + final Object[] array = (Object[]) value; + for (final Object arrayVal : array) { + if (!isTypeCorrect(arrayVal, elementType)) { + return false; + } + } + + return true; + case MAP: + if (!(value instanceof Map)) { + return false; + } + + final MapDataType mapDataType = (MapDataType) dataType; + final DataType valueDataType = mapDataType.getValueType(); + final Map<?, ?> map = (Map<?, ?>) value; + + for (final Object mapValue : map.values()) { + if (!isTypeCorrect(mapValue, valueDataType)) { + return false; + } + } + + return true; + case RECORD: + return value instanceof Record; + case CHOICE: + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + for (final DataType choice : choiceDataType.getPossibleSubTypes()) { + if 
(isTypeCorrect(value, choice)) { + return true; + } + } + + return false; + case BIGINT: + return value instanceof BigInteger; + case BOOLEAN: + return value instanceof Boolean; + case BYTE: + return value instanceof Byte; + case CHAR: + return value instanceof Character; + case DATE: + return value instanceof java.sql.Date; + case DOUBLE: + return value instanceof Double; + case FLOAT: + // Some readers do not provide float vs. double. + // We should consider if it makes sense to allow either a Float or a Double here or have + // a Reader indicate whether or not it supports higher precision, etc. + // Same goes for Short/Integer + return value instanceof Float; + case INT: + return value instanceof Integer; + case LONG: + return value instanceof Long; + case SHORT: + return value instanceof Short; + case STRING: + return value instanceof String; + case TIME: + return value instanceof java.sql.Time; + case TIMESTAMP: + return value instanceof java.sql.Timestamp; + } + + return false; + } + + private String concat(final String fieldPrefix, final RecordField field) { + return fieldPrefix + "/" + field.getFieldName(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java new file mode 100644 index 0000000..fdd44d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java @@ -0,0 
+1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.validation; + +import java.util.Objects; +import java.util.Optional; + +import org.apache.nifi.serialization.record.validation.ValidationError; +import org.apache.nifi.serialization.record.validation.ValidationErrorType; + +public class StandardValidationError implements ValidationError { + private final Optional<String> fieldName; + private final Optional<Object> inputValue; + private final String explanation; + private final ValidationErrorType type; + + + public StandardValidationError(final String fieldName, final Object value, final ValidationErrorType type, final String explanation) { + this.fieldName = Optional.ofNullable(fieldName); + this.inputValue = Optional.ofNullable(value); + this.type = type; + this.explanation = explanation; + } + + public StandardValidationError(final String fieldName, final ValidationErrorType type, final String explanation) { + this.fieldName = Optional.ofNullable(fieldName); + this.inputValue = Optional.empty(); + this.type = type; + this.explanation = Objects.requireNonNull(explanation); + } + + public StandardValidationError(final ValidationErrorType type, final String explanation) { + 
this.fieldName = Optional.empty(); + this.inputValue = Optional.empty(); + this.type = type; + this.explanation = Objects.requireNonNull(explanation); + } + + @Override + public ValidationErrorType getType() { + return type; + } + + @Override + public Optional<String> getFieldName() { + return fieldName; + } + + @Override + public Optional<Object> getInputValue() { + return inputValue; + } + + @Override + public String getExplanation() { + return explanation; + } + + @Override + public String toString() { + if (fieldName.isPresent()) { + if (inputValue.isPresent()) { + final Object input = inputValue.get(); + if (input instanceof Object[]) { + final StringBuilder sb = new StringBuilder("["); + final Object[] array = (Object[]) input; + for (int i=0; i < array.length; i++) { + + final Object arrayValue = array[i]; + if (arrayValue instanceof String) { + sb.append('"').append(array[i]).append('"'); + } else { + sb.append(array[i]); + } + + if (i < array.length - 1) { + sb.append(", "); + } + } + sb.append("]"); + + return sb.toString() + " is not a valid value for " + fieldName.get() + ": " + explanation; + } else { + return inputValue.get() + " is not a valid value for " + fieldName.get() + ": " + explanation; + } + } else { + return fieldName.get() + " is invalid due to: " + explanation; + } + } + + return explanation; + } + + @Override + public int hashCode() { + return 31 + 17 * fieldName.hashCode() + 17 * inputValue.hashCode() + 17 * explanation.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof ValidationError)) { + return false; + } + + final ValidationError other = (ValidationError) obj; + return getFieldName().equals(other.getFieldName()) && getInputValue().equals(other.getInputValue()) && getExplanation().equals(other.getExplanation()); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java new file mode 100644 index 0000000..f323a03 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.schema.validation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.validation.SchemaValidationResult; +import org.apache.nifi.serialization.record.validation.ValidationError; +import org.junit.Test; + +public class TestStandardSchemaValidator { + + @Test + public void testValidateCorrectSimpleTypesStrictValidation() throws ParseException { + final List<RecordField> fields = new ArrayList<>(); + for (final RecordFieldType fieldType : RecordFieldType.values()) { + if (fieldType == RecordFieldType.CHOICE) { + final List<DataType> possibleTypes = new ArrayList<>(); + possibleTypes.add(RecordFieldType.INT.getDataType()); + possibleTypes.add(RecordFieldType.LONG.getDataType()); + + fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes))); + } else if (fieldType == RecordFieldType.MAP) { + fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getMapDataType(RecordFieldType.INT.getDataType()))); + } else { + 
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType())); + } + } + + final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); + df.setTimeZone(TimeZone.getTimeZone("gmt")); + final long time = df.parse("2017/01/01 17:00:00.000").getTime(); + + final Map<String, Object> intMap = new LinkedHashMap<>(); + intMap.put("height", 48); + intMap.put("width", 96); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("string", "string"); + valueMap.put("boolean", true); + valueMap.put("byte", (byte) 1); + valueMap.put("char", 'c'); + valueMap.put("short", (short) 8); + valueMap.put("int", 9); + valueMap.put("bigint", BigInteger.valueOf(8L)); + valueMap.put("long", 8L); + valueMap.put("float", 8.0F); + valueMap.put("double", 8.0D); + valueMap.put("date", new Date(time)); + valueMap.put("time", new Time(time)); + valueMap.put("timestamp", new Timestamp(time)); + valueMap.put("record", null); + valueMap.put("array", null); + valueMap.put("choice", 48L); + valueMap.put("map", intMap); + + final Record record = new MapRecord(schema, valueMap); + + final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true); + final StandardSchemaValidator validator = new StandardSchemaValidator(validationContext); + + final SchemaValidationResult result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + } + + + @Test + public void testValidateWrongButCoerceableType() throws ParseException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("id", 1); + Record record = new MapRecord(schema, valueMap); + + 
final SchemaValidationContext strictValidationContext = new SchemaValidationContext(schema, false, true); + final SchemaValidationContext lenientValidationContext = new SchemaValidationContext(schema, false, false); + + // Validate with correct type of int and a strict validation + StandardSchemaValidator validator = new StandardSchemaValidator(strictValidationContext); + SchemaValidationResult result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + + // Validate with correct type of int and a lenient validation + validator = new StandardSchemaValidator(lenientValidationContext); + result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + + + // Update Map to set value to a String that is coerceable to an int + valueMap.put("id", "1"); + record = new MapRecord(schema, valueMap); + + + // Validate with incorrect type of string and a strict validation + validator = new StandardSchemaValidator(strictValidationContext); + result = validator.validate(record); + assertFalse(result.isValid()); + final Collection<ValidationError> validationErrors = result.getValidationErrors(); + assertEquals(1, validationErrors.size()); + + final ValidationError validationError = validationErrors.iterator().next(); + assertEquals("/id", validationError.getFieldName().get()); + + // Validate with incorrect type of string and a lenient validation + validator = new StandardSchemaValidator(lenientValidationContext); + result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + } + + @Test + public void testMissingRequiredField() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + 
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType(), false)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("id", 1); + final Record record = new MapRecord(schema, valueMap, false, false); + + final SchemaValidationContext allowExtraFieldsContext = new SchemaValidationContext(schema, true, true); + + StandardSchemaValidator validator = new StandardSchemaValidator(allowExtraFieldsContext); + SchemaValidationResult result = validator.validate(record); + assertFalse(result.isValid()); + assertNotNull(result.getValidationErrors()); + + final ValidationError error = result.getValidationErrors().iterator().next(); + assertEquals("/name", error.getFieldName().get()); + } + + @Test + public void testMissingNullableField() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("id", 1); + Record record = new MapRecord(schema, valueMap, false, false); + + final SchemaValidationContext allowExtraFieldsContext = new SchemaValidationContext(schema, true, true); + + StandardSchemaValidator validator = new StandardSchemaValidator(allowExtraFieldsContext); + SchemaValidationResult result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + } + + @Test + public void testExtraFields() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("id", 1); + 
valueMap.put("name", "John Doe"); + Record record = new MapRecord(schema, valueMap, false, false); + + final SchemaValidationContext allowExtraFieldsContext = new SchemaValidationContext(schema, true, true); + final SchemaValidationContext forbidExtraFieldsContext = new SchemaValidationContext(schema, false, false); + + StandardSchemaValidator validator = new StandardSchemaValidator(allowExtraFieldsContext); + SchemaValidationResult result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + + validator = new StandardSchemaValidator(forbidExtraFieldsContext); + result = validator.validate(record); + assertFalse(result.isValid()); + assertNotNull(result.getValidationErrors()); + final Collection<ValidationError> validationErrors = result.getValidationErrors(); + assertEquals(1, validationErrors.size()); + final ValidationError validationError = validationErrors.iterator().next(); + assertEquals("/name", validationError.getFieldName().get()); + System.out.println(validationError); + } + + + @Test + public void testInvalidEmbeddedField() { + final List<RecordField> accountFields = new ArrayList<>(); + accountFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema accountSchema = new SimpleRecordSchema(accountFields); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema))); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> accountValues = new HashMap<>(); + accountValues.put("name", "account-1"); + accountValues.put("balance", "123.45"); + final Record accountRecord = new MapRecord(accountSchema, accountValues); + + final 
Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("id", 1); + valueMap.put("account", accountRecord); + Record record = new MapRecord(schema, valueMap, false, false); + + final SchemaValidationContext strictValidationContext = new SchemaValidationContext(schema, false, true); + final SchemaValidationContext lenientValidationContext = new SchemaValidationContext(schema, false, false); + + StandardSchemaValidator validator = new StandardSchemaValidator(strictValidationContext); + SchemaValidationResult result = validator.validate(record); + assertFalse(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertEquals(1, result.getValidationErrors().size()); + final ValidationError validationError = result.getValidationErrors().iterator().next(); + assertEquals("/account/balance", validationError.getFieldName().get()); + + + validator = new StandardSchemaValidator(lenientValidationContext); + result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + } + + + @Test + public void testInvalidArrayValue() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("numbers", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> valueMap = new LinkedHashMap<>(); + valueMap.put("id", 1); + valueMap.put("numbers", new Object[] {1, "2", "3"}); + Record record = new MapRecord(schema, valueMap, false, false); + + final SchemaValidationContext strictValidationContext = new SchemaValidationContext(schema, false, true); + final SchemaValidationContext lenientValidationContext = new SchemaValidationContext(schema, false, false); + + StandardSchemaValidator validator = new StandardSchemaValidator(strictValidationContext); + 
SchemaValidationResult result = validator.validate(record); + assertFalse(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertEquals(1, result.getValidationErrors().size()); + final ValidationError validationError = result.getValidationErrors().iterator().next(); + assertEquals("/numbers", validationError.getFieldName().get()); + + validator = new StandardSchemaValidator(lenientValidationContext); + result = validator.validate(record); + assertTrue(result.isValid()); + assertNotNull(result.getValidationErrors()); + assertTrue(result.getValidationErrors().isEmpty()); + } +}
