NIFI-4142: This closes #2015. Refactored Record Reader/Writer to allow for 
reading/writing "raw records". Implemented ValidateRecord. Updated Record 
Reader to take two parameters for nextRecord: (boolean coerceTypes) and 
(boolean dropUnknownFields)

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/451f9cf1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/451f9cf1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/451f9cf1

Branch: refs/heads/master
Commit: 451f9cf12407c4f7abeb8e538e3f55ecbf40abab
Parents: 84935d4
Author: Mark Payne <[email protected]>
Authored: Fri Jun 30 08:32:01 2017 -0400
Committer: joewitt <[email protected]>
Committed: Fri Aug 11 22:01:46 2017 -0700

----------------------------------------------------------------------
 .../serialization/AbstractRecordSetWriter.java  |   4 +
 .../apache/nifi/serialization/RecordReader.java |  27 +-
 .../apache/nifi/serialization/RecordWriter.java |  12 +-
 .../SchemaValidationException.java              |  30 ++
 .../nifi/serialization/record/MapRecord.java    |  90 +++-
 .../serialization/record/RawRecordWriter.java   |  33 ++
 .../nifi/serialization/record/Record.java       |  32 +-
 .../nifi/serialization/record/RecordField.java  |  36 +-
 .../record/TypeMismatchException.java           |  28 --
 .../util/IllegalTypeConversionException.java    |   4 +-
 .../validation/RecordSchemaValidator.java       |  26 ++
 .../validation/SchemaValidationResult.java      |  26 ++
 .../record/validation/ValidationContext.java    |  26 ++
 .../record/validation/ValidationError.java      |  31 ++
 .../record/validation/ValidationErrorType.java  |  41 ++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  38 +-
 .../record/CommaSeparatedRecordReader.java      |   2 +-
 .../serialization/record/MockRecordParser.java  |   2 +-
 .../validation/SchemaValidationContext.java     |  44 ++
 .../StandardSchemaValidationResult.java         |  45 ++
 .../validation/StandardSchemaValidator.java     | 263 +++++++++++
 .../validation/StandardValidationError.java     | 129 ++++++
 .../validation/TestStandardSchemaValidator.java | 306 +++++++++++++
 .../kafka/pubsub/util/MockRecordParser.java     |   3 +-
 .../groovy/test_record_reader_inline.groovy     |   2 +-
 .../groovy/test_record_reader_xml.groovy        |   2 +-
 .../nifi-standard-processors/pom.xml            |  29 +-
 .../processors/standard/ValidateRecord.java     | 457 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../org/apache/nifi/avro/AvroRecordReader.java  |   2 +-
 .../java/org/apache/nifi/csv/CSVReader.java     |  18 +-
 .../org/apache/nifi/csv/CSVRecordReader.java    | 121 +++--
 .../main/java/org/apache/nifi/csv/CSVUtils.java |  16 +-
 .../org/apache/nifi/csv/WriteCSVResult.java     |  74 ++-
 .../java/org/apache/nifi/grok/GrokReader.java   |   7 +-
 .../org/apache/nifi/grok/GrokRecordReader.java  | 113 +++--
 .../nifi/json/AbstractJsonRowRecordReader.java  |  71 ++-
 .../nifi/json/JsonPathRowRecordReader.java      |  79 +++-
 .../nifi/json/JsonTreeRowRecordReader.java      |  83 ++--
 .../org/apache/nifi/json/WriteJsonResult.java   | 114 ++++-
 .../additionalDetails.html                      | 167 ++++++-
 .../avro/TestAvroReaderWithEmbeddedSchema.java  |  12 +-
 .../apache/nifi/csv/TestCSVRecordReader.java    | 172 ++++++-
 .../org/apache/nifi/csv/TestWriteCSVResult.java | 167 +++++++
 .../apache/nifi/grok/TestGrokRecordReader.java  |  16 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java  |  68 +++
 .../apache/nifi/json/TestWriteJsonResult.java   | 158 +++++++
 47 files changed, 2953 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 4de5ce3..1e653c1 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -92,6 +92,10 @@ public abstract class AbstractRecordSetWriter implements 
RecordSetWriter {
         return WriteResult.of(recordCount, attributes == null ? 
Collections.emptyMap() : attributes);
     }
 
+    protected int incrementRecordCount() {
+        return ++recordCount;
+    }
+
     /**
      * Method that is called as a result of {@link #beginRecordSet()} being 
called. This gives subclasses
      * the chance to react to a new RecordSet beginning but prevents the 
subclass from changing how this

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
index add248e..188f032 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -38,14 +38,37 @@ import org.apache.nifi.serialization.record.RecordSet;
 public interface RecordReader extends Closeable {
 
     /**
-     * Returns the next record in the stream or <code>null</code> if no more 
records are available.
+     * Returns the next record in the stream or <code>null</code> if no more 
records are available. Types will be coerced and any unknown fields will be 
dropped.
      *
      * @return the next record in the stream or <code>null</code> if no more 
records are available.
      *
      * @throws IOException if unable to read from the underlying data
      * @throws MalformedRecordException if an unrecoverable failure occurs 
when trying to parse a record
+     * @throws SchemaValidationException if a Record contains a field that 
violates the schema and cannot be coerced into the appropriate field type.
      */
-    Record nextRecord() throws IOException, MalformedRecordException;
+    default Record nextRecord() throws IOException, MalformedRecordException {
+        return nextRecord(true, true);
+    }
+
+    /**
+     * Reads the next record from the underlying stream. If type coercion is 
enabled, then any field in the Record whose type does not
+     * match the schema will be coerced to the correct type and a 
MalformedRecordException will be thrown if unable to coerce the data into
+     * the correct type. If type coercion is disabled, then no type coercion 
will occur. As a result, calling
+     * {@link 
Record#getValue(org.apache.nifi.serialization.record.RecordField)}
+     * may return any type of Object, such as a String or another Record, even 
though the schema indicates that the field must be an integer.
+     *
+     * @param coerceTypes whether or not fields in the Record should be 
validated against the schema and coerced when necessary
+     * @param dropUnknownFields if <code>true</code>, any field that is found 
in the data that is not present in the schema will be dropped. If 
<code>false</code>,
+     *            those fields will still be part of the Record (though their 
type cannot be coerced, since the schema does not provide a type for it).
+     *
+     * @return the next record in the stream or <code>null</code> if no more 
records are available
+     * @throws IOException if unable to read from the underlying data
+     * @throws MalformedRecordException if an unrecoverable failure occurs 
when trying to parse a record, or a Record contains a field
+     *             that violates the schema and cannot be coerced into the 
appropriate field type.
+     * @throws SchemaValidationException if a Record contains a field that 
violates the schema and cannot be coerced into the appropriate
+     *             field type and schema enforcement is enabled
+     */
+    Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws 
IOException, MalformedRecordException;
 
     /**
      * @return a RecordSchema that is appropriate for the records in the stream

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index 6c21a39..3e1c4ab 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -24,24 +24,24 @@ import org.apache.nifi.serialization.record.Record;
 
 public interface RecordWriter extends Closeable {
     /**
-     * Writes the given result set to the given output stream
+     * Writes the given record to the underlying stream
      *
-     * @param record the record set to serialize
+     * @param record the record to write
      * @return the results of writing the data
-     * @throws IOException if unable to write to the given OutputStream
+     * @throws IOException if unable to write to the underlying stream
      */
     WriteResult write(Record record) throws IOException;
 
     /**
-     * @return the MIME Type that the Result Set Writer produces. This will be 
added to FlowFiles using
+     * @return the MIME Type that the Record Writer produces. This will be 
added to FlowFiles using
      *         the mime.type attribute.
      */
     String getMimeType();
 
     /**
-     * Flushes any buffered data to the underlying storage mechanism
+     * Flushes any buffered data to the underlying stream
      *
-     * @throws IOException if unable to write to the underlying storage 
mechanism
+     * @throws IOException if unable to write to the underlying stream
      */
     void flush() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
new file mode 100644
index 0000000..796cc8f
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+public class SchemaValidationException extends RuntimeException {
+
+    public SchemaValidationException(final String message) {
+        super(message);
+    }
+
+    public SchemaValidationException(final String message, final Throwable 
cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index ca33e32..c3444ed 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,12 +17,16 @@
 
 package org.apache.nifi.serialization.record;
 
+import java.text.DateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
 
+import org.apache.nifi.serialization.SchemaValidationException;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -32,17 +36,67 @@ public class MapRecord implements Record {
     private RecordSchema schema;
     private final Map<String, Object> values;
     private Optional<SerializedForm> serializedForm;
+    private final boolean checkTypes;
+    private final boolean dropUnknownFields;
+
 
     public MapRecord(final RecordSchema schema, final Map<String, Object> 
values) {
+        this(schema, values, false, false);
+    }
+
+    public MapRecord(final RecordSchema schema, final Map<String, Object> 
values, final boolean checkTypes, final boolean dropUnknownFields) {
+        Objects.requireNonNull(values);
+
         this.schema = Objects.requireNonNull(schema);
-        this.values = Objects.requireNonNull(values);
+        this.values = checkTypes ? checkTypes(values, schema) : values;
         this.serializedForm = Optional.empty();
+        this.checkTypes = checkTypes;
+        this.dropUnknownFields = dropUnknownFields;
     }
 
     public MapRecord(final RecordSchema schema, final Map<String, Object> 
values, final SerializedForm serializedForm) {
+        this(schema, values, serializedForm, false, false);
+    }
+
+    public MapRecord(final RecordSchema schema, final Map<String, Object> 
values, final SerializedForm serializedForm, final boolean checkTypes, final 
boolean dropUnknownFields) {
+        Objects.requireNonNull(values);
+
         this.schema = Objects.requireNonNull(schema);
-        this.values = Objects.requireNonNull(values);
+        this.values = checkTypes ? checkTypes(values, schema) : values;
         this.serializedForm = Optional.ofNullable(serializedForm);
+        this.checkTypes = checkTypes;
+        this.dropUnknownFields = dropUnknownFields;
+    }
+
+    private Map<String, Object> checkTypes(final Map<String, Object> values, 
final RecordSchema schema) {
+        for (final RecordField field : schema.getFields()) {
+            final Object value = getExplicitValue(field, values);
+
+            if (value == null) {
+                if (field.isNullable()) {
+                    continue;
+                }
+
+                throw new SchemaValidationException("Field " + 
field.getFieldName() + " cannot be null");
+            }
+
+            if (!DataTypeUtils.isCompatibleDataType(value, 
field.getDataType())) {
+                throw new SchemaValidationException("Field " + 
field.getFieldName() + " has a value of " + value
+                    + ", which cannot be coerced into the appropriate data 
type of " + field.getDataType());
+            }
+        }
+
+        return values;
+    }
+
+    @Override
+    public boolean isDropUnknownFields() {
+        return dropUnknownFields;
+    }
+
+    @Override
+    public boolean isTypeChecked() {
+        return checkTypes;
     }
 
     @Override
@@ -67,7 +121,11 @@ public class MapRecord implements Record {
             return getValue(fieldOption.get());
         }
 
-        return null;
+        if (dropUnknownFields) {
+            return null;
+        }
+
+        return this.values.get(fieldName);
     }
 
     @Override
@@ -115,6 +173,10 @@ public class MapRecord implements Record {
     }
 
     private Object getExplicitValue(final RecordField field) {
+        return getExplicitValue(field, this.values);
+    }
+
+    private Object getExplicitValue(final RecordField field, final Map<String, 
Object> values) {
         final String canonicalFieldName = field.getFieldName();
 
         // We use containsKey here instead of just calling get() and checking 
for a null value
@@ -139,11 +201,11 @@ public class MapRecord implements Record {
     @Override
     public String getAsString(final String fieldName) {
         final Optional<DataType> dataTypeOption = 
schema.getDataType(fieldName);
-        if (!dataTypeOption.isPresent()) {
-            return null;
+        if (dataTypeOption.isPresent()) {
+            return convertToString(getValue(fieldName), 
dataTypeOption.get().getFormat());
         }
 
-        return convertToString(getValue(fieldName), 
dataTypeOption.get().getFormat());
+        return DataTypeUtils.toString(getValue(fieldName), 
(Supplier<DateFormat>) null);
     }
 
     @Override
@@ -239,11 +301,20 @@ public class MapRecord implements Record {
     public void setValue(final String fieldName, final Object value) {
         final Optional<RecordField> field = getSchema().getField(fieldName);
         if (!field.isPresent()) {
+            if (dropUnknownFields) {
+                return;
+            }
+
+            final Object previousValue = values.put(fieldName, value);
+            if (!Objects.equals(value, previousValue)) {
+                serializedForm = Optional.empty();
+            }
+
             return;
         }
 
         final RecordField recordField = field.get();
-        final Object coerced = DataTypeUtils.convertType(value, 
recordField.getDataType(), fieldName);
+        final Object coerced = isTypeChecked() ? 
DataTypeUtils.convertType(value, recordField.getDataType(), fieldName) : value;
         final Object previousValue = values.put(recordField.getFieldName(), 
coerced);
         if (!Objects.equals(coerced, previousValue)) {
             serializedForm = Optional.empty();
@@ -327,4 +398,9 @@ public class MapRecord implements Record {
     public void incorporateSchema(RecordSchema other) {
         this.schema = DataTypeUtils.merge(this.schema, other);
     }
+
+    @Override
+    public Set<String> getRawFieldNames() {
+        return values.keySet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java
new file mode 100644
index 0000000..b62c50e
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RawRecordWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+
+import org.apache.nifi.serialization.WriteResult;
+
+public interface RawRecordWriter {
+    /**
+     * Writes the given record to the underlying stream
+     *
+     * @param record the record to write
+     * @return the results of writing the data
+     * @throws IOException if unable to write to the underlying stream
+     */
+    WriteResult writeRawRecord(Record record) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
index 822352d..1a89225 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
 
 import java.util.Date;
 import java.util.Optional;
+import java.util.Set;
 
 import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
@@ -27,6 +28,24 @@ public interface Record {
     RecordSchema getSchema();
 
     /**
+     * Indicates whether or not field values for this record are expected to 
be coerced into the type designated by the schema.
+     * If <code>true</code>, then it is safe to assume that calling {@link 
#getValue(RecordField)} will return an Object of the appropriate
+     * type according to the schema, or an object that can be coerced into the 
appropriate type. If type checking
+     * is not enabled, then calling {@link #getValue(RecordField)} can return 
an object of any type.
+     *
+     * @return <code>true</code> if type checking is enabled, 
<code>false</code> otherwise.
+     */
+    boolean isTypeChecked();
+
+    /**
+     * If <code>true</code>, any field that is added to the record will be 
dropped unless the field is known by the schema
+     *
+     * @return <code>true</code> if fields that are unknown to the schema will 
be dropped, <code>false</code>
+     *         if all field values are retained.
+     */
+    boolean isDropUnknownFields();
+
+    /**
      * Updates the Record's schema to to incorporate all of the fields in the 
given schema. If both schemas have a
      * field with the same name but a different type, then the existing schema 
will be updated to have a
      * {@link RecordFieldType#CHOICE} field with both types as choices. If two 
fields have the same name but different
@@ -45,7 +64,9 @@ public interface Record {
 
     /**
      * <p>
-     * Returns a view of the the values of the fields in this Record.
+     * Returns a view of the values of the fields in this Record. Note 
that this method returns values only for
+     * those entries in the Record's schema. This allows the Record to 
guarantee that it will return the values in
+     * the order dictated by the schema.
      * </p>
      *
      * <b>NOTE:</b> The array that is returned may be an underlying array that 
is backing
@@ -134,4 +155,13 @@ public interface Record {
      *             name is not a Map
      */
     void setMapValue(String fieldName, String mapKey, Object value);
+
+    /**
+     * Returns a Set that contains the names of all of the fields that are 
present in the Record, regardless of
+     * whether or not those fields are contained in the schema. To determine 
which fields exist in the Schema, use
+     * {@link #getSchema()}.{@link RecordSchema#getFieldNames() 
getFieldNames()} instead.
+     *
+     * @return a Set that contains the names of all of the fields that are 
present in the Record
+     */
+    Set<String> getRawFieldNames();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
index c7cd290..41da6be 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -24,24 +24,43 @@ import java.util.Objects;
 import java.util.Set;
 
 public class RecordField {
+    private static final boolean DEFAULT_NULLABLE = true;
+
     private final String fieldName;
     private final DataType dataType;
     private final Set<String> aliases;
     private final Object defaultValue;
+    private final boolean nullable;
 
     public RecordField(final String fieldName, final DataType dataType) {
-        this(fieldName, dataType, null, Collections.emptySet());
+        this(fieldName, dataType, null, Collections.emptySet(), 
DEFAULT_NULLABLE);
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final 
boolean nullable) {
+        this(fieldName, dataType, null, Collections.emptySet(), nullable);
     }
 
     public RecordField(final String fieldName, final DataType dataType, final 
Object defaultValue) {
-        this(fieldName, dataType, defaultValue, Collections.emptySet());
+        this(fieldName, dataType, defaultValue, Collections.emptySet(), 
DEFAULT_NULLABLE);
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final 
Object defaultValue, final boolean nullable) {
+        this(fieldName, dataType, defaultValue, Collections.emptySet(), 
nullable);
     }
 
     public RecordField(final String fieldName, final DataType dataType, final 
Set<String> aliases) {
-        this(fieldName, dataType, null, aliases);
+        this(fieldName, dataType, null, aliases, DEFAULT_NULLABLE);
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final 
Set<String> aliases, final boolean nullable) {
+        this(fieldName, dataType, null, aliases, nullable);
     }
 
     public RecordField(final String fieldName, final DataType dataType, final 
Object defaultValue, final Set<String> aliases) {
+        this(fieldName, dataType, defaultValue, aliases, DEFAULT_NULLABLE);
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final 
Object defaultValue, final Set<String> aliases, final boolean nullable) {
         if (defaultValue != null && 
!DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) {
             throw new IllegalArgumentException("Cannot set the default value 
for field [" + fieldName + "] to [" + defaultValue
                 + "] because that is not a valid value for Data Type [" + 
dataType + "]");
@@ -51,6 +70,7 @@ public class RecordField {
         this.dataType = Objects.requireNonNull(dataType);
         this.aliases = 
Collections.unmodifiableSet(Objects.requireNonNull(aliases));
         this.defaultValue = defaultValue;
+        this.nullable = nullable;
     }
 
     public String getFieldName() {
@@ -69,6 +89,10 @@ public class RecordField {
         return defaultValue;
     }
 
+    public boolean isNullable() {
+        return nullable;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -77,6 +101,7 @@ public class RecordField {
         result = prime * result + fieldName.hashCode();
         result = prime * result + aliases.hashCode();
         result = prime * result + ((defaultValue == null) ? 0 : 
defaultValue.hashCode());
+        result = prime * result + Boolean.hashCode(nullable);
         return result;
     }
 
@@ -94,11 +119,12 @@ public class RecordField {
         }
 
         RecordField other = (RecordField) obj;
-        return dataType.equals(other.getDataType()) && 
fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && 
Objects.equals(defaultValue, other.defaultValue);
+        return dataType.equals(other.getDataType()) && 
fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && 
Objects.equals(defaultValue, other.defaultValue)
+            && nullable == other.nullable;
     }
 
     @Override
     public String toString() {
-        return "RecordField[name=" + fieldName + ", dataType=" + dataType + 
(aliases.isEmpty() ? "" : ", aliases=" + aliases) + "]";
+        return "RecordField[name=" + fieldName + ", dataType=" + dataType + 
(aliases.isEmpty() ? "" : ", aliases=" + aliases) + ", nullable=" + nullable + 
"]";
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
deleted file mode 100644
index af5f909..0000000
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-public class TypeMismatchException extends RuntimeException {
-    public TypeMismatchException(String message) {
-        super(message);
-    }
-
-    public TypeMismatchException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
index 38b5d20..32c5ea5 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.serialization.record.util;
 
-public class IllegalTypeConversionException extends RuntimeException {
+import org.apache.nifi.serialization.SchemaValidationException;
+
+public class IllegalTypeConversionException extends SchemaValidationException {
 
     public IllegalTypeConversionException(final String message) {
         super(message);

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java
new file mode 100644
index 0000000..f7dd880
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/RecordSchemaValidator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.validation;
+
+import org.apache.nifi.serialization.record.Record;
+
+public interface RecordSchemaValidator {
+
+    SchemaValidationResult validate(Record record);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java
new file mode 100644
index 0000000..423120a
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/SchemaValidationResult.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.validation;
+
+import java.util.Collection;
+
+public interface SchemaValidationResult {
+    boolean isValid();
+
+    Collection<ValidationError> getValidationErrors();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java
new file mode 100644
index 0000000..441dbb2
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationContext.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.validation;
+
+public interface ValidationContext {
+
+    boolean isExtraFieldAllowed();
+
+    boolean isTypeCoercionAllowed();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java
new file mode 100644
index 0000000..6f65bc5
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationError.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.validation;
+
+import java.util.Optional;
+
+public interface ValidationError {
+
+    Optional<String> getFieldName();
+
+    Optional<Object> getInputValue();
+
+    String getExplanation();
+
+    ValidationErrorType getType();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java
new file mode 100644
index 0000000..3f014eb
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/validation/ValidationErrorType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.validation;
+
+public enum ValidationErrorType {
+    /**
+     * A required field (i.e., a field that is not 'nullable') exists in the 
schema, but the record had no value for this field
+     * or the value for this field was <code>null</code>.
+     */
+    MISSING_FIELD,
+
+    /**
+     * The record had a field that is not present in the schema.
+     */
+    EXTRA_FIELD,
+
+    /**
+     * The record had a value for a field, but the value was not valid 
according to the schema.
+     */
+    INVALID_FIELD,
+
+    /**
+     * Some other sort of validation error occurred.
+     */
+    OTHER;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index a9f051e..abc381f 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -75,6 +75,7 @@ public class AvroTypeUtil {
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = 
"timestamp-micros";
     private static final String LOGICAL_TYPE_DECIMAL = "decimal";
 
+
     public static Schema extractAvroSchema(final RecordSchema recordSchema) {
         if (recordSchema == null) {
             throw new IllegalArgumentException("RecordSchema cannot be null");
@@ -110,7 +111,7 @@ public class AvroTypeUtil {
     }
 
     private static Field buildAvroField(final RecordField recordField) {
-        final Schema schema = buildAvroSchema(recordField.getDataType(), 
recordField.getFieldName());
+        final Schema schema = buildAvroSchema(recordField.getDataType(), 
recordField.getFieldName(), recordField.isNullable());
         final Field field = new Field(recordField.getFieldName(), schema, 
null, recordField.getDefaultValue());
         for (final String alias : recordField.getAliases()) {
             field.addAlias(alias);
@@ -119,7 +120,7 @@ public class AvroTypeUtil {
         return field;
     }
 
-    private static Schema buildAvroSchema(final DataType dataType, final 
String fieldName) {
+    private static Schema buildAvroSchema(final DataType dataType, final 
String fieldName, final boolean nullable) {
         final Schema schema;
 
         switch (dataType.getFieldType()) {
@@ -129,7 +130,7 @@ public class AvroTypeUtil {
                 if 
(RecordFieldType.BYTE.equals(elementDataType.getFieldType())) {
                     schema = Schema.create(Type.BYTES);
                 } else {
-                    final Schema elementType = 
buildAvroSchema(elementDataType, fieldName);
+                    final Schema elementType = 
buildAvroSchema(elementDataType, fieldName, false);
                     schema = Schema.createArray(elementType);
                 }
                 break;
@@ -151,7 +152,7 @@ public class AvroTypeUtil {
 
                 final List<Schema> unionTypes = new 
ArrayList<>(options.size());
                 for (final DataType option : options) {
-                    unionTypes.add(buildAvroSchema(option, fieldName));
+                    unionTypes.add(buildAvroSchema(option, fieldName, false));
                 }
 
                 schema = Schema.createUnion(unionTypes);
@@ -173,7 +174,7 @@ public class AvroTypeUtil {
                 schema = Schema.create(Type.LONG);
                 break;
             case MAP:
-                schema = Schema.createMap(buildAvroSchema(((MapDataType) 
dataType).getValueType(), fieldName));
+                schema = Schema.createMap(buildAvroSchema(((MapDataType) 
dataType).getValueType(), fieldName, false));
                 break;
             case RECORD:
                 final RecordDataType recordDataType = (RecordDataType) 
dataType;
@@ -204,7 +205,11 @@ public class AvroTypeUtil {
                 return null;
         }
 
-        return nullable(schema);
+        if (nullable) {
+            return nullable(schema);
+        } else {
+            return schema;
+        }
     }
 
     private static Schema nullable(final Schema schema) {
@@ -362,12 +367,14 @@ public class AvroTypeUtil {
         final List<RecordField> recordFields = new 
ArrayList<>(avroSchema.getFields().size());
         for (final Field field : avroSchema.getFields()) {
             final String fieldName = field.name();
-            final DataType dataType = 
AvroTypeUtil.determineDataType(field.schema(), knownRecords);
+            final Schema fieldSchema = field.schema();
+            final DataType dataType = 
AvroTypeUtil.determineDataType(fieldSchema, knownRecords);
+            final boolean nullable = isNullable(fieldSchema);
 
             if (field.defaultVal() == JsonProperties.NULL_VALUE) {
-               recordFields.add(new RecordField(fieldName, dataType, 
field.aliases()));
+                recordFields.add(new RecordField(fieldName, dataType, 
field.aliases(), nullable));
             } else {
-               recordFields.add(new RecordField(fieldName, dataType, 
field.defaultVal(), field.aliases()));
+                recordFields.add(new RecordField(fieldName, dataType, 
field.defaultVal(), field.aliases(), nullable));
             }
         }
 
@@ -375,6 +382,19 @@ public class AvroTypeUtil {
         return recordSchema;
     }
 
+    public static boolean isNullable(final Schema schema) {
+        final Type schemaType = schema.getType();
+        if (schemaType == Type.UNION) {
+            for (final Schema unionSchema : schema.getTypes()) {
+                if (isNullable(unionSchema)) {
+                    return true;
+                }
+            }
+        }
+
+        return schemaType == Type.NULL;
+    }
+
     public static Object[] convertByteArray(final byte[] bytes) {
         final Object[] array = new Object[bytes.length];
         for (int i = 0; i < bytes.length; i++) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
index 8973055..1c22687 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
@@ -70,7 +70,7 @@ public class CommaSeparatedRecordReader extends 
AbstractControllerService implem
             }
 
             @Override
-            public Record nextRecord() throws IOException, 
MalformedRecordException {
+            public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknown) throws IOException, MalformedRecordException {
                 if (failAfterN > -1 && recordCount >= failAfterN) {
                     throw new MalformedRecordException("Intentional Unit Test 
Exception because " + recordCount + " records have been read");
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
index 251eb46..b6606a8 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
@@ -76,7 +76,7 @@ public class MockRecordParser extends 
AbstractControllerService implements Recor
             }
 
             @Override
-            public Record nextRecord() throws IOException, 
MalformedRecordException {
+            public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknown) throws IOException, MalformedRecordException {
                 if (failAfterN >= recordCount) {
                     throw new MalformedRecordException("Intentional Unit Test 
Exception because " + recordCount + " records have been read");
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java
new file mode 100644
index 0000000..53711de
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/SchemaValidationContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.validation;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class SchemaValidationContext {
+    private final RecordSchema schema;
+    private final boolean allowExtraFields;
+    private final boolean strictTypeChecking;
+
+    public SchemaValidationContext(final RecordSchema schema, final boolean 
allowExtraFields, final boolean strictTypeChecking) {
+        this.schema = schema;
+        this.allowExtraFields = allowExtraFields;
+        this.strictTypeChecking = strictTypeChecking;
+    }
+
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    public boolean isExtraFieldAllowed() {
+        return allowExtraFields;
+    }
+
+    public boolean isStrictTypeChecking() {
+        return strictTypeChecking;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java
new file mode 100644
index 0000000..f304c9f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidationResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.validation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
+import org.apache.nifi.serialization.record.validation.ValidationError;
+
+public class StandardSchemaValidationResult implements SchemaValidationResult {
+
+    private final List<ValidationError> validationErrors = new ArrayList<>();
+
+    @Override
+    public boolean isValid() {
+        return validationErrors.isEmpty();
+    }
+
+    @Override
+    public Collection<ValidationError> getValidationErrors() {
+        return Collections.unmodifiableList(validationErrors);
+    }
+
+    public void addValidationError(final ValidationError validationError) {
+        this.validationErrors.add(validationError);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
new file mode 100644
index 0000000..d467962
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.validation;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
+import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
+import org.apache.nifi.serialization.record.validation.ValidationError;
+import org.apache.nifi.serialization.record.validation.ValidationErrorType;
+
+public class StandardSchemaValidator implements RecordSchemaValidator {
+    private final SchemaValidationContext validationContext;
+
+    public StandardSchemaValidator(final SchemaValidationContext 
validationContext) {
+        this.validationContext = validationContext;
+    }
+
+    @Override
+    public SchemaValidationResult validate(final Record record) {
+        return validate(record, validationContext.getSchema(), "");
+    }
+
+    private SchemaValidationResult validate(final Record record, final 
RecordSchema schema, final String fieldPrefix) {
+        // Ensure that for every field in the schema, the type is correct (if 
we care) and that
+        // a value is present (unless it is nullable).
+        final StandardSchemaValidationResult result = new 
StandardSchemaValidationResult();
+
+        for (final RecordField field : schema.getFields()) {
+            final Object rawValue = record.getValue(field);
+
+            // If there is no value, then it is always valid unless the field 
is required.
+            if (rawValue == null) {
+                if (!field.isNullable() && field.getDefaultValue() == null) {
+                    result.addValidationError(new 
StandardValidationError(concat(fieldPrefix, field), 
ValidationErrorType.MISSING_FIELD, "Field is required"));
+                }
+
+                continue;
+            }
+
+            // Check that the type is correct.
+            final DataType dataType = field.getDataType();
+            if (validationContext.isStrictTypeChecking()) {
+                if (!isTypeCorrect(rawValue, dataType)) {
+                    result.addValidationError(new 
StandardValidationError(concat(fieldPrefix, field), rawValue, 
ValidationErrorType.INVALID_FIELD,
+                        "Value is of type " + rawValue.getClass().getName() + 
" but was expected to be of type " + dataType));
+
+                    continue;
+                }
+            } else {
+                // Use a lenient type check. This will be true if, for 
instance, a value is the String "123" and should be an integer
+                // but will be false if the value is "123" and should be an 
Array or Record.
+                if (!DataTypeUtils.isCompatibleDataType(rawValue, dataType)) {
+                    result.addValidationError(new 
StandardValidationError(concat(fieldPrefix, field), rawValue, 
ValidationErrorType.INVALID_FIELD,
+                        "Value is of type " + rawValue.getClass().getName() + 
" but was expected to be of type " + dataType));
+
+                    continue;
+                }
+            }
+
+            // If the field type is RECORD, or if the field type is a CHOICE 
that allows for a RECORD and the value is a RECORD, then we
+            // need to dig into each of the sub-fields. To do this, we first 
need to determine the 'canonical data type'.
+            final DataType canonicalDataType = getCanonicalDataType(dataType, 
rawValue, result, fieldPrefix, field);
+            if (canonicalDataType == null) {
+                continue;
+            }
+
+            // Now that we have the 'canonical data type', we check if it is a 
Record. If so, we need to validate each sub-field.
+            verifyComplexType(dataType, rawValue, result, fieldPrefix, field);
+        }
+
+        if (!validationContext.isExtraFieldAllowed()) {
+            for (final String fieldName : record.getRawFieldNames()) {
+                if (!schema.getDataType(fieldName).isPresent()) {
+                    result.addValidationError(new 
StandardValidationError(fieldPrefix + "/" + fieldName, 
ValidationErrorType.EXTRA_FIELD, "Field is not present in the schema"));
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private void verifyComplexType(final DataType dataType, final Object 
rawValue, final StandardSchemaValidationResult result, final String 
fieldPrefix, final RecordField field) {
+        // If the field type is RECORD, or if the field type is a CHOICE that 
allows for a RECORD and the value is a RECORD, then we
+        // need to dig into each of the sub-fields. To do this, we first need 
to determine the 'canonical data type'.
+        final DataType canonicalDataType = getCanonicalDataType(dataType, 
rawValue, result, fieldPrefix, field);
+        if (canonicalDataType == null) {
+            return;
+        }
+
+        // Now that we have the 'canonical data type', we check if it is a 
Record. If so, we need to validate each sub-field.
+        if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) {
+            verifyChildRecord(canonicalDataType, rawValue, dataType, result, 
field, fieldPrefix);
+        }
+
+        if (canonicalDataType.getFieldType() == RecordFieldType.ARRAY) {
+            final ArrayDataType arrayDataType = (ArrayDataType) 
canonicalDataType;
+            final DataType elementType = arrayDataType.getElementType();
+            final Object[] arrayObject = (Object[]) rawValue;
+
+            int i=0;
+            for (final Object arrayValue : arrayObject) {
+                verifyComplexType(elementType, arrayValue, result, fieldPrefix 
+ "[" + i + "]", field);
+                i++;
+            }
+        }
+    }
+
+    private DataType getCanonicalDataType(final DataType dataType, final 
Object rawValue, final StandardSchemaValidationResult result, final String 
fieldPrefix, final RecordField field) {
+        final RecordFieldType fieldType = dataType.getFieldType();
+        final DataType canonicalDataType;
+        if (fieldType == RecordFieldType.CHOICE) {
+            canonicalDataType = DataTypeUtils.chooseDataType(rawValue, 
(ChoiceDataType) dataType);
+
+            if (canonicalDataType == null) {
+                result.addValidationError(new 
StandardValidationError(concat(fieldPrefix, field), rawValue, 
ValidationErrorType.INVALID_FIELD,
+                    "Value is of type " + rawValue.getClass().getName() + " 
but was expected to be of type " + dataType));
+
+                return null;
+            }
+        } else {
+            canonicalDataType = dataType;
+        }
+
+        return canonicalDataType;
+    }
+
+    private void verifyChildRecord(final DataType canonicalDataType, final 
Object rawValue, final DataType expectedDataType, final 
StandardSchemaValidationResult result,
+        final RecordField field, final String fieldPrefix) {
+        // Now that we have the 'canonical data type', we check if it is a 
Record. If so, we need to validate each sub-field.
+        if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) {
+            if (!(rawValue instanceof Record)) { // sanity check
+                result.addValidationError(new 
StandardValidationError(concat(fieldPrefix, field), rawValue, 
ValidationErrorType.INVALID_FIELD,
+                    "Value is of type " + rawValue.getClass().getName() + " 
but was expected to be of type " + expectedDataType));
+
+                return;
+            }
+
+            final RecordDataType recordDataType = (RecordDataType) 
canonicalDataType;
+            final RecordSchema childSchema = recordDataType.getChildSchema();
+
+            final String fullChildFieldName = concat(fieldPrefix, field);
+            final SchemaValidationResult childValidationResult = 
validate((Record) rawValue, childSchema, fullChildFieldName);
+            if (childValidationResult.isValid()) {
+                return;
+            }
+
+            for (final ValidationError validationError : 
childValidationResult.getValidationErrors()) {
+                result.addValidationError(validationError);
+            }
+        }
+    }
+
+    private boolean isTypeCorrect(final Object value, final DataType dataType) 
{
+        switch (dataType.getFieldType()) {
+            case ARRAY:
+                if (!(value instanceof Object[])) {
+                    return false;
+                }
+
+                final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                final DataType elementType = arrayDataType.getElementType();
+
+                final Object[] array = (Object[]) value;
+                for (final Object arrayVal : array) {
+                    if (!isTypeCorrect(arrayVal, elementType)) {
+                        return false;
+                    }
+                }
+
+                return true;
+            case MAP:
+                if (!(value instanceof Map)) {
+                    return false;
+                }
+
+                final MapDataType mapDataType = (MapDataType) dataType;
+                final DataType valueDataType = mapDataType.getValueType();
+                final Map<?, ?> map = (Map<?, ?>) value;
+
+                for (final Object mapValue : map.values()) {
+                    if (!isTypeCorrect(mapValue, valueDataType)) {
+                        return false;
+                    }
+                }
+
+                return true;
+            case RECORD:
+                return value instanceof Record;
+            case CHOICE:
+                final ChoiceDataType choiceDataType = (ChoiceDataType) 
dataType;
+                for (final DataType choice : 
choiceDataType.getPossibleSubTypes()) {
+                    if (isTypeCorrect(value, choice)) {
+                        return true;
+                    }
+                }
+
+                return false;
+            case BIGINT:
+                return value instanceof BigInteger;
+            case BOOLEAN:
+                return value instanceof Boolean;
+            case BYTE:
+                return value instanceof Byte;
+            case CHAR:
+                return value instanceof Character;
+            case DATE:
+                return value instanceof java.sql.Date;
+            case DOUBLE:
+                return value instanceof Double;
+            case FLOAT:
+                // Some readers do not provide float vs. double.
+                // We should consider if it makes sense to allow either a 
Float or a Double here or have
+                // a Reader indicate whether or not it supports higher 
precision, etc.
+                // Same goes for Short/Integer
+                return value instanceof Float;
+            case INT:
+                return value instanceof Integer;
+            case LONG:
+                return value instanceof Long;
+            case SHORT:
+                return value instanceof Short;
+            case STRING:
+                return value instanceof String;
+            case TIME:
+                return value instanceof java.sql.Time;
+            case TIMESTAMP:
+                return value instanceof java.sql.Timestamp;
+        }
+
+        return false;
+    }
+
+    private String concat(final String fieldPrefix, final RecordField field) {
+        return fieldPrefix + "/" + field.getFieldName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java
new file mode 100644
index 0000000..fdd44d4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardValidationError.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.validation;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.nifi.serialization.record.validation.ValidationError;
+import org.apache.nifi.serialization.record.validation.ValidationErrorType;
+
+public class StandardValidationError implements ValidationError {
+    private final Optional<String> fieldName;
+    private final Optional<Object> inputValue;
+    private final String explanation;
+    private final ValidationErrorType type;
+
+
+    public StandardValidationError(final String fieldName, final Object value, 
final ValidationErrorType type, final String explanation) {
+        this.fieldName = Optional.ofNullable(fieldName);
+        this.inputValue = Optional.ofNullable(value);
+        this.type = type;
+        this.explanation = explanation;
+    }
+
+    public StandardValidationError(final String fieldName, final 
ValidationErrorType type, final String explanation) {
+        this.fieldName = Optional.ofNullable(fieldName);
+        this.inputValue = Optional.empty();
+        this.type = type;
+        this.explanation = Objects.requireNonNull(explanation);
+    }
+
+    public StandardValidationError(final ValidationErrorType type, final 
String explanation) {
+        this.fieldName = Optional.empty();
+        this.inputValue = Optional.empty();
+        this.type = type;
+        this.explanation = Objects.requireNonNull(explanation);
+    }
+
+    @Override
+    public ValidationErrorType getType() {
+        return type;
+    }
+
+    @Override
+    public Optional<String> getFieldName() {
+        return fieldName;
+    }
+
+    @Override
+    public Optional<Object> getInputValue() {
+        return inputValue;
+    }
+
+    @Override
+    public String getExplanation() {
+        return explanation;
+    }
+
+    @Override
+    public String toString() {
+        if (fieldName.isPresent()) {
+            if (inputValue.isPresent()) {
+                final Object input = inputValue.get();
+                if (input instanceof Object[]) {
+                    final StringBuilder sb = new StringBuilder("[");
+                    final Object[] array = (Object[]) input;
+                    for (int i=0; i < array.length; i++) {
+
+                        final Object arrayValue = array[i];
+                        if (arrayValue instanceof String) {
+                            sb.append('"').append(array[i]).append('"');
+                        } else {
+                            sb.append(array[i]);
+                        }
+
+                        if (i < array.length - 1) {
+                            sb.append(", ");
+                        }
+                    }
+                    sb.append("]");
+
+                    return sb.toString() + " is not a valid value for " + 
fieldName.get() + ": " + explanation;
+                } else {
+                    return inputValue.get() + " is not a valid value for " + 
fieldName.get() + ": " + explanation;
+                }
+            } else {
+                return fieldName.get() + " is invalid due to: " + explanation;
+            }
+        }
+
+        return explanation;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 17 * fieldName.hashCode() + 17 * inputValue.hashCode() + 
17 * explanation.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof ValidationError)) {
+            return false;
+        }
+
+        final ValidationError other = (ValidationError) obj;
+        return getFieldName().equals(other.getFieldName()) && 
getInputValue().equals(other.getInputValue()) && 
getExplanation().equals(other.getExplanation());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
new file mode 100644
index 0000000..f323a03
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.validation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
+import org.apache.nifi.serialization.record.validation.ValidationError;
+import org.junit.Test;
+
+public class TestStandardSchemaValidator {
+
+    @Test
+    public void testValidateCorrectSimpleTypesStrictValidation() throws 
ParseException {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final RecordFieldType fieldType : RecordFieldType.values()) {
+            if (fieldType == RecordFieldType.CHOICE) {
+                final List<DataType> possibleTypes = new ArrayList<>();
+                possibleTypes.add(RecordFieldType.INT.getDataType());
+                possibleTypes.add(RecordFieldType.LONG.getDataType());
+
+                fields.add(new RecordField(fieldType.name().toLowerCase(), 
fieldType.getChoiceDataType(possibleTypes)));
+            } else if (fieldType == RecordFieldType.MAP) {
+                fields.add(new RecordField(fieldType.name().toLowerCase(), 
fieldType.getMapDataType(RecordFieldType.INT.getDataType())));
+            } else {
+                fields.add(new RecordField(fieldType.name().toLowerCase(), 
fieldType.getDataType()));
+            }
+        }
+
+        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+        df.setTimeZone(TimeZone.getTimeZone("gmt"));
+        final long time = df.parse("2017/01/01 17:00:00.000").getTime();
+
+        final Map<String, Object> intMap = new LinkedHashMap<>();
+        intMap.put("height", 48);
+        intMap.put("width", 96);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("string", "string");
+        valueMap.put("boolean", true);
+        valueMap.put("byte", (byte) 1);
+        valueMap.put("char", 'c');
+        valueMap.put("short", (short) 8);
+        valueMap.put("int", 9);
+        valueMap.put("bigint", BigInteger.valueOf(8L));
+        valueMap.put("long", 8L);
+        valueMap.put("float", 8.0F);
+        valueMap.put("double", 8.0D);
+        valueMap.put("date", new Date(time));
+        valueMap.put("time", new Time(time));
+        valueMap.put("timestamp", new Timestamp(time));
+        valueMap.put("record", null);
+        valueMap.put("array", null);
+        valueMap.put("choice", 48L);
+        valueMap.put("map", intMap);
+
+        final Record record = new MapRecord(schema, valueMap);
+
+        final SchemaValidationContext validationContext = new 
SchemaValidationContext(schema, false, true);
+        final StandardSchemaValidator validator = new 
StandardSchemaValidator(validationContext);
+
+        final SchemaValidationResult result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+    }
+
+
+    @Test
+    public void testValidateWrongButCoerceableType() throws ParseException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("id", 1);
+        Record record = new MapRecord(schema, valueMap);
+
+        final SchemaValidationContext strictValidationContext = new 
SchemaValidationContext(schema, false, true);
+        final SchemaValidationContext lenientValidationContext = new 
SchemaValidationContext(schema, false, false);
+
+        // Validate with correct type of int and a strict validation
+        StandardSchemaValidator validator = new 
StandardSchemaValidator(strictValidationContext);
+        SchemaValidationResult result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+
+        // Validate with correct type of int and a lenient validation
+        validator = new StandardSchemaValidator(lenientValidationContext);
+        result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+
+
+        // Update Map to set value to a String that is coerceable to an int
+        valueMap.put("id", "1");
+        record = new MapRecord(schema, valueMap);
+
+
+        // Validate with incorrect type of string and a strict validation
+        validator = new StandardSchemaValidator(strictValidationContext);
+        result = validator.validate(record);
+        assertFalse(result.isValid());
+        final Collection<ValidationError> validationErrors = 
result.getValidationErrors();
+        assertEquals(1, validationErrors.size());
+
+        final ValidationError validationError = 
validationErrors.iterator().next();
+        assertEquals("/id", validationError.getFieldName().get());
+
+        // Validate with incorrect type of string and a lenient validation
+        validator = new StandardSchemaValidator(lenientValidationContext);
+        result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+    }
+
+    @Test
+    public void testMissingRequiredField() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType(), false));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("id", 1);
+        final Record record = new MapRecord(schema, valueMap, false, false);
+
+        final SchemaValidationContext allowExtraFieldsContext = new 
SchemaValidationContext(schema, true, true);
+
+        StandardSchemaValidator validator = new 
StandardSchemaValidator(allowExtraFieldsContext);
+        SchemaValidationResult result = validator.validate(record);
+        assertFalse(result.isValid());
+        assertNotNull(result.getValidationErrors());
+
+        final ValidationError error = 
result.getValidationErrors().iterator().next();
+        assertEquals("/name", error.getFieldName().get());
+    }
+
+    @Test
+    public void testMissingNullableField() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("id", 1);
+        Record record = new MapRecord(schema, valueMap, false, false);
+
+        final SchemaValidationContext allowExtraFieldsContext = new 
SchemaValidationContext(schema, true, true);
+
+        StandardSchemaValidator validator = new 
StandardSchemaValidator(allowExtraFieldsContext);
+        SchemaValidationResult result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+    }
+
+    @Test
+    public void testExtraFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("id", 1);
+        valueMap.put("name", "John Doe");
+        Record record = new MapRecord(schema, valueMap, false, false);
+
+        final SchemaValidationContext allowExtraFieldsContext = new 
SchemaValidationContext(schema, true, true);
+        final SchemaValidationContext forbidExtraFieldsContext = new 
SchemaValidationContext(schema, false, false);
+
+        StandardSchemaValidator validator = new 
StandardSchemaValidator(allowExtraFieldsContext);
+        SchemaValidationResult result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+
+        validator = new StandardSchemaValidator(forbidExtraFieldsContext);
+        result = validator.validate(record);
+        assertFalse(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        final Collection<ValidationError> validationErrors = 
result.getValidationErrors();
+        assertEquals(1, validationErrors.size());
+        final ValidationError validationError = 
validationErrors.iterator().next();
+        assertEquals("/name", validationError.getFieldName().get());
+        System.out.println(validationError);
+    }
+
+
+    @Test
+    public void testInvalidEmbeddedField() {
+        final List<RecordField> accountFields = new ArrayList<>();
+        accountFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        accountFields.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema accountSchema = new 
SimpleRecordSchema(accountFields);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("account", 
RecordFieldType.RECORD.getRecordDataType(accountSchema)));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("name", "account-1");
+        accountValues.put("balance", "123.45");
+        final Record accountRecord = new MapRecord(accountSchema, 
accountValues);
+
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("id", 1);
+        valueMap.put("account", accountRecord);
+        Record record = new MapRecord(schema, valueMap, false, false);
+
+        final SchemaValidationContext strictValidationContext = new 
SchemaValidationContext(schema, false, true);
+        final SchemaValidationContext lenientValidationContext = new 
SchemaValidationContext(schema, false, false);
+
+        StandardSchemaValidator validator = new 
StandardSchemaValidator(strictValidationContext);
+        SchemaValidationResult result = validator.validate(record);
+        assertFalse(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertEquals(1, result.getValidationErrors().size());
+        final ValidationError validationError = 
result.getValidationErrors().iterator().next();
+        assertEquals("/account/balance", validationError.getFieldName().get());
+
+
+        validator = new StandardSchemaValidator(lenientValidationContext);
+        result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+    }
+
+
+    @Test
+    public void testInvalidArrayValue() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("numbers", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap = new LinkedHashMap<>();
+        valueMap.put("id", 1);
+        valueMap.put("numbers", new Object[] {1, "2", "3"});
+        Record record = new MapRecord(schema, valueMap, false, false);
+
+        final SchemaValidationContext strictValidationContext = new 
SchemaValidationContext(schema, false, true);
+        final SchemaValidationContext lenientValidationContext = new 
SchemaValidationContext(schema, false, false);
+
+        StandardSchemaValidator validator = new 
StandardSchemaValidator(strictValidationContext);
+        SchemaValidationResult result = validator.validate(record);
+        assertFalse(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertEquals(1, result.getValidationErrors().size());
+        final ValidationError validationError = 
result.getValidationErrors().iterator().next();
+        assertEquals("/numbers", validationError.getFieldName().get());
+
+        validator = new StandardSchemaValidator(lenientValidationContext);
+        result = validator.validate(record);
+        assertTrue(result.isValid());
+        assertNotNull(result.getValidationErrors());
+        assertTrue(result.getValidationErrors().isEmpty());
+    }
+}

Reply via email to