NIFI-4185: Add XML Record Reader

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d21bd387
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d21bd387
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d21bd387

Branch: refs/heads/master
Commit: d21bd3870b1edd107bc6ef12bc5f12820a10bebb
Parents: 8e4aa6b
Author: JohannesDaniel <[email protected]>
Authored: Wed Mar 7 00:02:43 2018 +0100
Committer: Mark Payne <[email protected]>
Committed: Mon Apr 23 14:39:18 2018 -0400

----------------------------------------------------------------------
 .../nifi-record-serialization-services/pom.xml  |   20 +
 .../java/org/apache/nifi/xml/XMLReader.java     |  129 ++
 .../org/apache/nifi/xml/XMLRecordReader.java    |  568 +++++++
 ...org.apache.nifi.controller.ControllerService |    4 +-
 .../additionalDetails.html                      |  433 ++++++
 .../java/org/apache/nifi/xml/TestXMLReader.java |  170 +++
 .../apache/nifi/xml/TestXMLReaderProcessor.java |   79 +
 .../apache/nifi/xml/TestXMLRecordReader.java    | 1436 ++++++++++++++++++
 .../src/test/resources/xml/people.xml           |   22 +
 .../src/test/resources/xml/people2.xml          |   12 +
 .../src/test/resources/xml/people3.xml          |   12 +
 .../src/test/resources/xml/people_array.xml     |   37 +
 .../test/resources/xml/people_array_simple.xml  |   28 +
 .../src/test/resources/xml/people_cdata.xml     |   22 +
 .../src/test/resources/xml/people_complex1.xml  |   33 +
 .../src/test/resources/xml/people_complex2.xml  |   73 +
 .../src/test/resources/xml/people_empty.xml     |   12 +
 .../src/test/resources/xml/people_invalid.xml   |   21 +
 .../src/test/resources/xml/people_map.xml       |   18 +
 .../src/test/resources/xml/people_map2.xml      |   32 +
 .../src/test/resources/xml/people_namespace.xml |   22 +
 .../src/test/resources/xml/people_nested.xml    |   38 +
 .../test/resources/xml/people_no_attributes.xml |   22 +
 .../resources/xml/people_tag_in_characters.xml  |   23 +
 .../xml/people_with_header_and_comments.xml     |   29 +
 .../src/test/resources/xml/person.xml           |    5 +
 .../src/test/resources/xml/testschema           |   11 +
 .../src/test/resources/xml/testschema2          |   19 +
 28 files changed, 3329 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
old mode 100644
new mode 100755
index 4c958f5..4d47701
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -122,6 +122,26 @@
                         
<exclude>src/test/resources/json/single-element-nested-array.json</exclude>
                         
<exclude>src/test/resources/json/single-element-nested.json</exclude>
                         
<exclude>src/test/resources/json/output/dataTypes.json</exclude>
+                        <exclude>src/test/resources/xml/people.xml</exclude>
+                        <exclude>src/test/resources/xml/people2.xml</exclude>
+                        <exclude>src/test/resources/xml/people3.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_array.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_array_simple.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_cdata.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_complex1.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_complex2.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_empty.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_invalid.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_map.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_map2.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_namespace.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_nested.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_no_attributes.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_tag_in_characters.xml</exclude>
+                        
<exclude>src/test/resources/xml/people_with_header_and_comments.xml</exclude>
+                        <exclude>src/test/resources/xml/person.xml</exclude>
+                        <exclude>src/test/resources/xml/testschema</exclude>
+                        <exclude>src/test/resources/xml/testschema2</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
new file mode 100755
index 0000000..73ee75d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.xml;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Controller service that builds {@link XMLRecordReader} instances for XML content.
+ * <p>
+ * In addition to the properties inherited from {@link SchemaRegistryService}, the service exposes whether the content
+ * is a single record or a wrapped array of records, an optional prefix for fields created from XML attributes, the
+ * name of the field that receives the text content of record-typed tags, and the date, time and timestamp formats.
+ */
+@Tags({"xml", "record", "reader", "parser"})
+@CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of " +
+        "XML data, embedded in an enclosing root tag.")
+public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {
+
+    /** The content of a FlowFile is one single record without a wrapper element. */
+    public static final AllowableValue RECORD_SINGLE = new AllowableValue("false");
+    /** The content of a FlowFile is an array of records enclosed by a wrapper element. */
+    public static final AllowableValue RECORD_ARRAY = new AllowableValue("true");
+    /** The decision is taken per FlowFile by evaluating the attribute {@code xml.stream.is.array}. */
+    public static final AllowableValue RECORD_EVALUATE = new AllowableValue("${xml.stream.is.array}","Use attribute xml.stream.is.array");
+
+    public static final PropertyDescriptor RECORD_FORMAT = new PropertyDescriptor.Builder()
+            .name("record_format")
+            .displayName("Expect Records as Array")
+            .description("This property defines whether the reader expects a single record or an array of records. If the property is " +
+                    "set to \"true\", the reader expects an array of records and the outer element of the XML will be treated as a " +
+                    "wrapper for the records. If the property is set to \"false\", the reader expects a single record for each FlowFile " +
+                    "(without wrapper-element). If the property is set to \"Use attribute xml.stream.is.array\", the attribute " +
+                    "\"xml.stream.is.array\" will be evaluated for each FlowFile whether to treat its content as array " +
+                    "of records (in the case of \"true\") or as single record (in the case of \"false\").")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(RECORD_SINGLE, RECORD_ARRAY, RECORD_EVALUATE)
+            .defaultValue(RECORD_SINGLE.getValue())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder()
+            .name("attribute_prefix")
+            .displayName("Attribute Prefix")
+            .description("If this property is set, the name of attributes will be prepended with a prefix when they are added to a record.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CONTENT_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("content_field_name")
+            .displayName("Field Name for Content")
+            .description("If tags with content (e. g. <field>content</field>) are defined as nested records in the schema, " +
+                    "the name of the tag will be used as name for the record and the value of this property will be used as name for the field. " +
+                    "If tags with content shall be parsed together with attributes (e. g. <field attribute=\"123\">content</field>), " +
+                    "they have to be defined as records. For additional information, see the section of processor usage.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    // Format patterns are read once when the service is enabled and reused for every reader that is created.
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
+
+    /**
+     * Caches the configured date, time and timestamp format patterns.
+     *
+     * @param context the configuration the service is enabled with
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+        this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+        this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+    }
+
+    /**
+     * Returns the inherited property descriptors followed by the XML-specific and the date/time format descriptors.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RECORD_FORMAT);
+        properties.add(ATTRIBUTE_PREFIX);
+        properties.add(CONTENT_FIELD_NAME);
+        properties.add(DateTimeUtils.DATE_FORMAT);
+        properties.add(DateTimeUtils.TIME_FORMAT);
+        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+        return properties;
+    }
+
+    /**
+     * Creates an {@link XMLRecordReader} for the given stream.
+     *
+     * @param variables values the expression language of the properties is evaluated against
+     * @param in        the XML content to read
+     * @param logger    logger handed over to the reader
+     * @return a reader configured from the properties of this service
+     * @throws IOException              declared by the factory contract
+     * @throws SchemaNotFoundException  declared by the factory contract; schema resolution is delegated to {@code getSchema}
+     * @throws MalformedRecordException if the reader cannot start parsing the XML
+     */
+    @Override
+    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final ConfigurationContext context = getConfigurationContext();
+
+        final RecordSchema schema = getSchema(variables, in, null);
+
+        // Both optional properties stay null when they are not configured.
+        final String attributePrefix = context.getProperty(ATTRIBUTE_PREFIX).isSet()
+                ? context.getProperty(ATTRIBUTE_PREFIX).evaluateAttributeExpressions(variables).getValue().trim() : null;
+
+        final String contentFieldName = context.getProperty(CONTENT_FIELD_NAME).isSet()
+                ? context.getProperty(CONTENT_FIELD_NAME).evaluateAttributeExpressions(variables).getValue().trim() : null;
+
+        // Boolean.parseBoolean yields false for everything except "true" (ignoring case), including an unset attribute.
+        final boolean isArray = Boolean.parseBoolean(context.getProperty(RECORD_FORMAT).evaluateAttributeExpressions(variables).getValue());
+
+        return new XMLRecordReader(in, schema, isArray, attributePrefix, contentFieldName, dateFormat, timeFormat, timestampFormat, logger);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
new file mode 100755
index 0000000..e819b92
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.xml;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.Attribute;
+import javax.xml.stream.events.Characters;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class XMLRecordReader implements RecordReader {
+
+    private final ComponentLog logger;
+    private final RecordSchema schema;
+    private final String attributePrefix;
+    private final String contentFieldName;
+
+    private StartElement currentRecordStartTag;
+
+    private final XMLEventReader xmlEventReader;
+
+    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+
+    /**
+     * Creates a reader on the given XML stream and positions it on the start tag of the first record.
+     *
+     * @param in               the XML content to read
+     * @param schema           the schema of the records
+     * @param isArray          if {@code true}, the first start tag is treated as a wrapper and skipped
+     * @param attributePrefix  prefix for fields created from XML attributes, may be {@code null}
+     * @param contentFieldName name of the field receiving the text content of record-typed tags, may be {@code null}
+     * @param dateFormat       pattern for date fields, may be {@code null}
+     * @param timeFormat       pattern for time fields, may be {@code null}
+     * @param timestampFormat  pattern for timestamp fields, may be {@code null}
+     * @param logger           logger used while parsing
+     * @throws MalformedRecordException if the XML cannot be parsed up to the first record
+     */
+    public XMLRecordReader(InputStream in, RecordSchema schema, boolean isArray, String attributePrefix, String contentFieldName,
+                           final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException {
+        this.schema = schema;
+        this.attributePrefix = attributePrefix;
+        this.contentFieldName = contentFieldName;
+        this.logger = logger;
+
+        // The formats are created once here; the suppliers only hand out the already created instances.
+        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
+
+        LAZY_DATE_FORMAT = () -> df;
+        LAZY_TIME_FORMAT = () -> tf;
+        LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
+        try {
+            final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+
+            // The content is untrusted: disable DTD processing and external entities to avoid XXE attacks.
+            xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+            xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+
+            xmlEventReader = xmlInputFactory.createXMLEventReader(in);
+
+            if (isArray) {
+                // Consume the wrapper element so that the next start tag belongs to the first record.
+                skipNextStartTag();
+            }
+
+            setNextRecordStartTag();
+        } catch (XMLStreamException e) {
+            throw new MalformedRecordException("Could not parse XML", e);
+        }
+    }
+
+    /**
+     * Consumes events up to and including the next start tag, or up to the end of the stream if there is none.
+     *
+     * @throws XMLStreamException if the underlying reader fails
+     */
+    private void skipNextStartTag() throws XMLStreamException {
+        boolean startTagConsumed = false;
+        while (!startTagConsumed && xmlEventReader.hasNext()) {
+            startTagConsumed = xmlEventReader.nextEvent().isStartElement();
+        }
+    }
+
+    /**
+     * Advances to the next start tag and stores it as the start tag of the current record.
+     * The stored tag becomes {@code null} when the stream ends without another start tag.
+     *
+     * @throws XMLStreamException if the underlying reader fails
+     */
+    private void setNextRecordStartTag() throws XMLStreamException {
+        StartElement nextStartTag = null;
+        while (nextStartTag == null && xmlEventReader.hasNext()) {
+            final XMLEvent event = xmlEventReader.nextEvent();
+            if (event.isStartElement()) {
+                nextStartTag = event.asStartElement();
+            }
+        }
+        currentRecordStartTag = nextStartTag;
+    }
+
+    /**
+     * Parses the record that starts at the current record start tag and moves on to the following one.
+     *
+     * @param coerceTypes       whether values are converted to the types declared in the schema
+     * @param dropUnknownFields whether fields that are not part of the schema are dropped
+     * @return the parsed record, a record without values if parsing produced none, or {@code null} if no record is left
+     * @throws IOException              declared by the reader contract
+     * @throws MalformedRecordException if the XML cannot be parsed
+     */
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        if (currentRecordStartTag == null) {
+            return null;
+        }
+        try {
+            final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields);
+            // Position the reader on the following record before handing out the current one.
+            setNextRecordStartTag();
+            // Type-safe emptyMap() instead of the raw EMPTY_MAP constant avoids an unchecked conversion.
+            return record != null ? record : new MapRecord(this.schema, Collections.emptyMap());
+        } catch (XMLStreamException e) {
+            throw new MalformedRecordException("Could not parse XML", e);
+        }
+    }
+
+    /**
+     * Parses the element that was just opened by {@code startElement} according to the given data type.
+     * <p>
+     * Simple types collect the non-whitespace text up to the next end tag and convert it. Arrays parse one element
+     * and merge it with the value already stored under {@code fieldName}. Records are delegated to
+     * {@code parseRecord}, maps collect their child elements by tag name, and choices are parsed as unknown fields.
+     *
+     * @param startElement the start tag of the field
+     * @param fieldName    the name of the field
+     * @param dataType     the type the field shall be parsed as
+     * @param recordValues the values collected so far for the enclosing record or map
+     * @param dropUnknown  whether fields that are not part of the schema are dropped
+     * @return the parsed value, or {@code null} if there is none
+     * @throws XMLStreamException       if the underlying reader fails
+     * @throws MalformedRecordException declared for the delegation to {@code parseRecord}
+     */
+    private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType, Map<String, Object> recordValues,
+                                     boolean dropUnknown) throws XMLStreamException, MalformedRecordException {
+        switch (dataType.getFieldType()) {
+            case BOOLEAN:
+            case BYTE:
+            case CHAR:
+            case DOUBLE:
+            case FLOAT:
+            case INT:
+            case LONG:
+            case SHORT:
+            case STRING:
+            case DATE:
+            case TIME:
+            case TIMESTAMP: {
+                final StringBuilder text = new StringBuilder();
+
+                while (xmlEventReader.hasNext()) {
+                    final XMLEvent event = xmlEventReader.nextEvent();
+
+                    if (event.isEndElement()) {
+                        // The end tag closes the field: convert what has been collected, blank text counts as no value.
+                        final String value = text.toString();
+                        if (StringUtils.isBlank(value)) {
+                            return null;
+                        }
+                        return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                    }
+
+                    if (event.isStartElement()) {
+                        // A simple type cannot hold nested tags; presumably skipElement() consumes the nested element.
+                        this.skipElement();
+                    } else if (event.isCharacters()) {
+                        final Characters characters = event.asCharacters();
+                        if (!characters.isWhiteSpace()) {
+                            text.append(characters.getData());
+                        }
+                    }
+                }
+                // The stream ended before the field was closed.
+                return null;
+            }
+
+            case ARRAY: {
+                final DataType elementType = ((ArrayDataType) dataType).getElementType();
+
+                final Object parsed = parseFieldForType(startElement, fieldName, elementType, recordValues, dropUnknown);
+                final Object existing = recordValues.get(fieldName);
+
+                if (parsed == null) {
+                    return existing;
+                }
+                if (existing instanceof List) {
+                    // Repeated tag: extend the list that is already stored for this field.
+                    ((List) existing).add(parsed);
+                    return existing;
+                }
+
+                // First or second occurrence: start a new list, keeping a previously stored single value in front.
+                final List<Object> values = new ArrayList<>();
+                if (existing != null) {
+                    values.add(existing);
+                }
+                values.add(parsed);
+                return values;
+            }
+
+            case RECORD: {
+                if (!(dataType instanceof RecordDataType)) {
+                    return null;
+                }
+                final RecordSchema childSchema = ((RecordDataType) dataType).getChildSchema();
+                return parseRecord(startElement, childSchema, true, dropUnknown);
+            }
+
+            case MAP: {
+                final DataType valueType = ((MapDataType) dataType).getValueType();
+                final Map<String, Object> entries = new HashMap<>();
+
+                while (xmlEventReader.hasNext()) {
+                    final XMLEvent event = xmlEventReader.nextEvent();
+
+                    if (event.isEndElement()) {
+                        break;
+                    }
+                    if (event.isStartElement()) {
+                        // Every child tag becomes one entry, keyed by its local name.
+                        final StartElement child = event.asStartElement();
+                        final String key = child.getName().getLocalPart();
+                        final Object value = parseFieldForType(child, key, valueType, entries, dropUnknown);
+                        entries.put(key, value);
+                    }
+                }
+
+                return entries.isEmpty() ? null : entries;
+            }
+
+            case CHOICE: {
+                // field choice will parse the entire tree of a field
+                return parseUnknownField(startElement, false, null);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Parses the element opened by {@code startElement} without coercing its values to schema types.
+     * <p>
+     * Attributes and child elements are collected as fields; if {@code dropUnknown} is set, only those with a
+     * corresponding field in {@code schema} are kept and all other child elements are skipped.
+     *
+     * @param startElement the start tag of the field
+     * @param dropUnknown  whether attributes and elements without a corresponding schema field are dropped
+     * @param schema       the schema to look up fields in, may be {@code null}
+     * @return the text content if the element has no fields, a record if it has fields, or {@code null} if it has neither
+     * @throws XMLStreamException if the underlying reader fails
+     */
+    private Object parseUnknownField(StartElement startElement, boolean dropUnknown, RecordSchema schema) throws XMLStreamException {
+        // parse attributes
+        final Map<String, Object> recordValues = new HashMap<>();
+        final Iterator iterator = startElement.getAttributes();
+        while (iterator.hasNext()) {
+            final Attribute attribute = (Attribute) iterator.next();
+            final String attributeName = attribute.getName().toString();
+
+            // The schema is consulted with the plain attribute name; the prefix only applies to the stored field name.
+            final boolean keepAttribute = !dropUnknown || (schema != null && schema.getField(attributeName).isPresent());
+            if (keepAttribute) {
+                recordValues.put(attributePrefix == null ? attributeName : attributePrefix + attributeName, attribute.getValue());
+            }
+        }
+
+        // parse fields
+        final StringBuilder content = new StringBuilder();
+
+        while (xmlEventReader.hasNext()) {
+            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
+            if (xmlEvent.isCharacters()) {
+                final Characters characters = xmlEvent.asCharacters();
+                if (!characters.isWhiteSpace()) {
+                    content.append(characters.getData());
+                }
+            } else if (xmlEvent.isStartElement()) {
+                final StartElement subStartElement = xmlEvent.asStartElement();
+                final String subFieldName = subStartElement.getName().getLocalPart();
+
+                if (!dropUnknown) {
+                    final Object value = parseUnknownField(subStartElement, false, schema);
+                    if (value != null) {
+                        putUnknownTypeInMap(recordValues, subFieldName, value);
+                    }
+                    continue;
+                }
+
+                final Optional<RecordField> field = schema == null ? Optional.<RecordField>empty() : schema.getField(subFieldName);
+                if (!field.isPresent()) {
+                    skipElement();
+                    continue;
+                }
+
+                // subElements of subStartElement can only be known if there is a corresponding field in the schema defined as record
+                final DataType dataType = field.get().getDataType();
+                RecordSchema childSchema = null;
+
+                if (dataType instanceof RecordDataType) {
+                    childSchema = ((RecordDataType) dataType).getChildSchema();
+                } else if (dataType instanceof ArrayDataType) {
+                    final DataType typeOfArray = ((ArrayDataType) dataType).getElementType();
+                    if (typeOfArray instanceof RecordDataType) {
+                        childSchema = ((RecordDataType) typeOfArray).getChildSchema();
+                    }
+                }
+
+                final Object value = parseUnknownField(subStartElement, true, childSchema);
+                if (value != null) {
+                    putUnknownTypeInMap(recordValues, subFieldName, value);
+                }
+            } else if (xmlEvent.isEndElement()) {
+                break;
+            }
+        }
+
+        // Lists are replaced by arrays; setValue updates the entry in place instead of calling put on the map
+        // that is being iterated.
+        for (final Map.Entry<String, Object> entry : recordValues.entrySet()) {
+            if (entry.getValue() instanceof List) {
+                entry.setValue(((List<?>) entry.getValue()).toArray());
+            }
+        }
+
+        final boolean hasContent = content.length() > 0;
+        final boolean hasFields = !recordValues.isEmpty();
+
+        if (!hasFields) {
+            // A tag without attributes and child elements is either plain text or nothing at all.
+            return hasContent ? content.toString() : null;
+        }
+
+        if (hasContent) {
+            if (contentFieldName != null) {
+                recordValues.put(contentFieldName, content.toString());
+            } else {
+                logger.debug("Found content for field that has to be parsed as record but property \"Field Name for Content\" is not set. " +
+                        "The content will not be added to the record.");
+            }
+        }
+
+        return new MapRecord(new SimpleRecordSchema(Collections.emptyList()), recordValues);
+    }
+
+    private Record parseRecord(StartElement startElement, RecordSchema schema, 
boolean coerceTypes, boolean dropUnknown) throws XMLStreamException, 
MalformedRecordException {
+        final Map<String, Object> recordValues = new HashMap<>();
+
+        // parse attributes
+        final Iterator iterator = startElement.getAttributes();
+        while (iterator.hasNext()) {
+            final Attribute attribute = (Attribute) iterator.next();
+            final String attributeName = attribute.getName().toString();
+
+            final String targetFieldName = attributePrefix == null ? 
attributeName : attributePrefix + attributeName;
+
+            if (dropUnknown) {
+                final Optional<RecordField> field = 
schema.getField(attributeName);
+                if (field.isPresent()){
+
+                    // dropUnknown == true && coerceTypes == true
+                    if (coerceTypes) {
+                        final Object value;
+                        final DataType dataType = field.get().getDataType();
+                        if ((value = parseStringForType(attribute.getValue(), 
attributeName, dataType)) != null) {
+                            recordValues.put(targetFieldName, value);
+                        }
+
+                    // dropUnknown == true && coerceTypes == false
+                    } else {
+                        recordValues.put(targetFieldName, 
attribute.getValue());
+                    }
+                }
+            } else {
+
+                // dropUnknown == false && coerceTypes == true
+                if (coerceTypes) {
+                    final Object value;
+                    final Optional<RecordField> field = 
schema.getField(attributeName);
+                    if (field.isPresent()){
+                        if ((value = parseStringForType(attribute.getValue(), 
attributeName, field.get().getDataType())) != null) {
+                            recordValues.put(targetFieldName, value);
+                        }
+                    } else {
+                        recordValues.put(targetFieldName, 
attribute.getValue());
+                    }
+
+                    // dropUnknown == false && coerceTypes == false
+                } else {
+                    recordValues.put(targetFieldName, attribute.getValue());
+                }
+            }
+        }
+
+        // parse fields
+        StringBuilder content = new StringBuilder();
+        while(xmlEventReader.hasNext()){
+            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
+
+            if (xmlEvent.isStartElement()) {
+                final StartElement subStartElement = xmlEvent.asStartElement();
+                final String fieldName = 
subStartElement.getName().getLocalPart();
+
+                final Optional<RecordField> field = schema.getField(fieldName);
+
+                if (dropUnknown) {
+                    if (field.isPresent()) {
+                        // dropUnknown == true && coerceTypes == true
+                        if (coerceTypes) {
+                            final Object value = 
parseFieldForType(subStartElement, fieldName, field.get().getDataType(), 
recordValues, true);
+                            if (value != null) {
+                                recordValues.put(fieldName, value);
+                            }
+
+                        // dropUnknown == true && coerceTypes == false
+                        // subElements of subStartElement can only be known if 
there is a corresponding field in the schema defined as record
+                        } else {
+                            final DataType dataType = 
field.get().getDataType();
+                            RecordSchema childSchema = null;
+
+                            if (dataType instanceof RecordDataType) {
+                                childSchema = ((RecordDataType) 
dataType).getChildSchema();
+                            } else if (dataType instanceof ArrayDataType) {
+                                DataType typeOfArray = ((ArrayDataType) 
dataType).getElementType();
+                                if (typeOfArray instanceof RecordDataType) {
+                                    childSchema = ((RecordDataType) 
typeOfArray).getChildSchema();
+                                }
+                            }
+
+                            final Object value = 
parseUnknownField(subStartElement, true, childSchema);
+                            if (value != null) {
+                                putUnknownTypeInMap(recordValues, fieldName, 
value);
+                            }
+                        }
+
+                    } else {
+                        skipElement();
+                    }
+                } else {
+                    // dropUnknown == false && coerceTypes == true
+                    if (coerceTypes) {
+                        if (field.isPresent()) {
+                            final Object value = 
parseFieldForType(subStartElement, fieldName, field.get().getDataType(), 
recordValues, false);
+                            if (value != null) {
+                                recordValues.put(fieldName, value);
+                            }
+                        } else {
+                            final Object value = 
parseUnknownField(subStartElement, false, null);
+                            if (value != null) {
+                                putUnknownTypeInMap(recordValues, fieldName, 
value);
+                            }
+                        }
+
+                    // dropUnknown == false && coerceTypes == false
+                    } else {
+                        final Object value = 
parseUnknownField(subStartElement, false, null);
+                        if (value != null) {
+                            putUnknownTypeInMap(recordValues, fieldName, 
value);
+                        }
+                    }
+                }
+            } else if (xmlEvent.isEndElement()) {
+                break;
+            } else if (xmlEvent.isCharacters()) {
+                final Characters characters = xmlEvent.asCharacters();
+                if (!characters.isWhiteSpace()) {
+                    content.append(characters.getData());
+                }
+            }
+        }
+
+        if (content.length() > 0) {
+            if (contentFieldName != null) {
+                final Optional<RecordField> field = 
schema.getField(contentFieldName);
+                if (field.isPresent()) {
+                    Object value = parseStringForType(content.toString(), 
contentFieldName, field.get().getDataType());
+                    recordValues.put(contentFieldName, value);
+                }
+            } else {
+                logger.debug("Found content for field that is defined as 
record but property \"Field Name for Content\" is not set. " +
+                        "The content will not be added to record.");
+            }
+        }
+
+        for (final Map.Entry<String,Object> entry : recordValues.entrySet()) {
+            if (entry.getValue() instanceof List) {
+                recordValues.put(entry.getKey(), ((List) 
entry.getValue()).toArray());
+            }
+        }
+
+        if (recordValues.size() > 0) {
+            return new MapRecord(schema, recordValues);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Stores a value under a field name, collecting repeated occurrences of the same name.
+     * The first value for a name is stored as is. When another value arrives for a name that
+     * already holds a non-list value, both are moved into a new List; when the name already
+     * holds a List, the new value is added to it.
+     *
+     * @param values     map of field names to the values gathered so far; modified in place
+     * @param fieldName  name of the field the value belongs to
+     * @param fieldValue value to store for the field
+     */
+    private void putUnknownTypeInMap(Map<String, Object> values, String fieldName, Object fieldValue) {
+        final Object existing = values.get(fieldName);
+
+        // first occurrence of this field name (or the name is mapped to null)
+        if (existing == null) {
+            values.put(fieldName, fieldValue);
+            return;
+        }
+
+        // third or later occurrence: extend the list that is already in place
+        if (existing instanceof List) {
+            ((List) existing).add(fieldValue);
+            return;
+        }
+
+        // second occurrence: promote the single value to a list of two
+        final List<Object> collected = new ArrayList<>();
+        collected.add(existing);
+        collected.add(fieldValue);
+        values.put(fieldName, collected);
+    }
+
+    /**
+     * Converts a string to the given data type when that type is one of the simple field
+     * types listed in the switch below. The conversion is delegated to
+     * {@code DataTypeUtils.convertType} together with LAZY_DATE_FORMAT, LAZY_TIME_FORMAT and
+     * LAZY_TIMESTAMP_FORMAT.
+     *
+     * @param data      text to convert
+     * @param fieldName name of the field, passed on to the conversion
+     * @param dataType  target type
+     * @return the converted value, or {@code null} when the field type is not one of the listed types
+     */
+    private Object parseStringForType(String data, String fieldName, DataType dataType) {
+        switch (dataType.getFieldType()) {
+            case BOOLEAN:
+            case BYTE:
+            case CHAR:
+            case DOUBLE:
+            case FLOAT:
+            case INT:
+            case LONG:
+            case SHORT:
+            case STRING:
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return DataTypeUtils.convertType(data, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+            default:
+                // any other field type cannot be built from plain text here
+                return null;
+        }
+    }
+
+    /**
+     * Skips the element whose start tag has just been read, including all of its nested
+     * elements. Events are consumed up to and including the end tag that closes that element,
+     * or until the event reader has no more events.
+     *
+     * Nesting is tracked with a counter rather than by recursion so that very deeply nested
+     * input cannot exhaust the call stack.
+     *
+     * @throws XMLStreamException if the underlying event reader fails to deliver the next event
+     */
+    private void skipElement() throws XMLStreamException {
+        // the caller has already consumed the start tag of the element to skip
+        int depth = 1;
+
+        while (xmlEventReader.hasNext()) {
+            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
+
+            if (xmlEvent.isStartElement()) {
+                depth++;
+            } else if (xmlEvent.isEndElement()) {
+                depth--;
+                if (depth == 0) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the schema held by this reader
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return this.schema;
+    }
+
+    /**
+     * Closes the XML event reader. A failure to close is logged, together with its cause,
+     * and is not propagated to the caller.
+     *
+     * @throws IOException declared by the overridden method; not thrown by this implementation
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            xmlEventReader.close();
+        } catch (XMLStreamException e) {
+            // best effort: keep the exception in the log so the failure can be diagnosed
+            logger.error("Unable to close XMLEventReader", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
old mode 100644
new mode 100755
index 628dbe5..76c89c2
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -25,4 +25,6 @@ org.apache.nifi.csv.CSVRecordSetWriter
 
 org.apache.nifi.grok.GrokReader
 
-org.apache.nifi.text.FreeFormTextRecordSetWriter
\ No newline at end of file
+org.apache.nifi.text.FreeFormTextRecordSetWriter
+
+org.apache.nifi.xml.XMLReader
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html
new file mode 100755
index 0000000..46cd259
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html
@@ -0,0 +1,433 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8"/>
+        <title>XMLReader</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+    </head>
+
+    <body>
+    <p>
+        The XMLReader Controller Service reads XML content and creates Record 
objects. The Controller Service
+        must be configured with a schema that describes the structure of the 
XML data. Fields in the XML data
+        that are not defined in the schema will be skipped. Depending on 
whether the property "Expect Records as Array"
+        is set to "false" or "true", the reader either expects a single record 
or an array of records for each FlowFile.
+    </p>
+    <p>
+        Example: Single record
+    </p>
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;field1&gt;content&lt;/field1&gt;
+                  &lt;field2&gt;content&lt;/field2&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        An array of records has to be enclosed by a root tag.
+        Example: Array of records
+    </p>
+
+    <code>
+            <pre>
+                &lt;root&gt;
+                  &lt;record&gt;
+                    &lt;field1&gt;content&lt;/field1&gt;
+                    &lt;field2&gt;content&lt;/field2&gt;
+                  &lt;/record&gt;
+                  &lt;record&gt;
+                    &lt;field1&gt;content&lt;/field1&gt;
+                    &lt;field2&gt;content&lt;/field2&gt;
+                  &lt;/record&gt;
+                &lt;/root&gt;
+            </pre>
+    </code>
+
+    <h2>Example: Simple Fields</h2>
+
+    <p>
+        The simplest kind of data within XML data are tags / fields only 
containing content (no attributes, no embedded tags).
+        They can be described in the schema by simple types (e. g. INT, 
STRING, ...).
+    </p>
+
+    <code>
+            <pre>
+                &lt;root&gt;
+                  &lt;record&gt;
+                    &lt;simple_field&gt;content&lt;/simple_field&gt;
+                  &lt;/record&gt;
+                &lt;/root&gt;
+            </pre>
+    </code>
+
+    <p>
+        This record can be described by a schema containing one field (e. g. 
of type string). By providing this schema,
+        the reader expects zero or one occurrences of "simple_field" in the 
record.
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "namespace": "nifi",
+                  "name": "test",
+                  "type": "record",
+                  "fields": [
+                    { "name": "simple_field", "type": "string" }
+                  ]
+                }
+            </pre>
+    </code>
+
+    <h2>Example: Arrays with Simple Fields</h2>
+
+    <p>
+        Arrays are considered as repetitive tags / fields in XML data. For the 
following XML data, "array_field" is considered
+        to be an array enclosing simple fields, whereas "simple_field" is 
considered to be a simple field not enclosed in
+        an array.
+    </p>
+
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;array_field&gt;content&lt;/array_field&gt;
+                  &lt;array_field&gt;content&lt;/array_field&gt;
+                  &lt;simple_field&gt;content&lt;/simple_field&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        This record can be described by the following schema:
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "namespace": "nifi",
+                  "name": "test",
+                  "type": "record",
+                  "fields": [
+                    { "name": "array_field", "type":
+                      { "type": "array", "items": "string" }
+                    },
+                    { "name": "simple_field", "type": "string" }
+                  ]
+                }
+            </pre>
+    </code>
+
+    <p>
+        If a field in a schema is embedded in an array, the reader expects zero, one or more occurrences of the field
+        in a record. In principle, the field "array_field" could also be defined as a simple field, but then the second occurrence
+        of this field would replace the first in the record object. Moreover, the field "simple_field" could also be defined
+        as an array. In this case, the reader would put it into the record object as an array with one element.
+    </p>
+
+    <h2>Example: Tags with Attributes</h2>
+
+    <p>
+        XML fields frequently not only contain content, but also attributes. 
The following record contains a field with
+        an attribute "attr" and content:
+    </p>
+
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;field_with_attribute attr="attr_content"&gt;content of 
field&lt;/field_with_attribute&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        To parse the content of the field "field_with_attribute" together with 
the attribute "attr", two requirements have
+        to be fulfilled:
+    </p>
+
+    <ul>
+        <li>In the schema, the field has to be defined as record.</li>
+        <li>The property "Field Name for Content" has to be set.</li>
+        <li>Optionally (this is not a requirement), the property "Attribute Prefix" can also be set.</li>
+    </ul>
+
+    <p>
+        For the example above, the following property settings are assumed:
+    </p>
+
+    <table>
+        <tr>
+            <th>Property Name</th>
+            <th>Property Value</th>
+        </tr>
+        <tr>
+            <td>Field Name for Content</td>
+            <td><code>field_name_for_content</code></td>
+        </tr>
+        <tr>
+            <td>Attribute Prefix</td>
+            <td><code>prefix_</code></td>
+        </tr>
+    </table>
+
+    <p>
+        The schema can be defined as follows:
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "name": "test",
+                  "namespace": "nifi",
+                  "type": "record",
+                  "fields": [
+                    {
+                      "name": "field_with_attribute",
+                      "type": {
+                        "name": "RecordForTag",
+                        "type": "record",
+                        "fields" : [
+                          {"name": "attr", "type": "string"},
+                          {"name": "field_name_for_content", "type": "string"}
+                        ]
+                      }
+                    }
+                  ]
+                }
+            </pre>
+    </code>
+
+    <p>
+        Note that the field "field_name_for_content" not only has to be defined in the property section, but also in the
+        schema, whereas the prefix for attributes is not part of the schema. It will be prepended to the attribute name when an
+        attribute named "attr" is found at the respective position in the XML data and added to the record. The record object of the above
+        example will be structured as follows:
+
+    <code>
+            <pre>
+                Record (
+                    Record "field_with_attribute" (
+                        RecordField "prefix_attr" = "attr_content",
+                        RecordField "field_name_for_content" = "content of 
field"
+                    )
+                )
+            </pre>
+    </code>
+
+    <p>
+        In principle, the field "field_with_attribute" could also be defined as a simple field. In this case, the attributes
+        would simply be ignored. Vice versa, the simple field in example 1 ("Simple Fields") above could also be defined as a record (assuming that
+        the property "Field Name for Content" is set).
+    </p>
+
+    <h2>Example: Tags within tags</h2>
+
+    <p>
+        XML data is frequently nested. In this case, tags enclose other tags:
+    </p>
+
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;field_with_embedded_fields 
attr=&quot;attr_content&quot;&gt;
+                    &lt;embedded_field&gt;embedded 
content&lt;/embedded_field&gt;
+                    &lt;another_embedded_field&gt;another embedded 
content&lt;/another_embedded_field&gt;
+                  &lt;/field_with_embedded_fields&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        The enclosing fields always have to be defined as records, 
irrespective whether they include attributes to be
+        parsed or not. In this example, the tag "field_with_embedded_fields" 
encloses the fields "embedded_field" and
+        "another_embedded_field", which are both simple fields. The schema can 
be defined as follows:
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "name": "test",
+                  "namespace": "nifi",
+                  "type": "record",
+                  "fields": [
+                    {
+                      "name": "field_with_embedded_fields",
+                      "type": {
+                        "name": "RecordForEmbedded",
+                        "type": "record",
+                        "fields" : [
+                          {"name": "attr", "type": "string"},
+                          {"name": "embedded_field", "type": "string"},
+                          {"name": "another_embedded_field", "type": "string"}
+                        ]
+                      }
+                    }
+                  ]
+                }
+            </pre>
+    </code>
+
+    <p>
+        Notice that this case does not require the property "Field Name for 
Content" to be set as this is only required
+        for tags containing attributes and content.
+    </p>
+
+    <h2>Example: Array of records</h2>
+
+    <p>
+        For further explanation of the logic of this reader, an example of an 
array of records shall be demonstrated.
+        The following record contains the field "array_field", which 
repeatedly occurs. The field contains two
+        embedded fields.
+    </p>
+
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;array_field&gt;
+                    &lt;embedded_field&gt;embedded content 
1&lt;/embedded_field&gt;
+                    &lt;another_embedded_field&gt;another embedded content 
1&lt;/another_embedded_field&gt;
+                  &lt;/array_field&gt;
+                  &lt;array_field&gt;
+                    &lt;embedded_field&gt;embedded content 
2&lt;/embedded_field&gt;
+                    &lt;another_embedded_field&gt;another embedded content 
2&lt;/another_embedded_field&gt;
+                  &lt;/array_field&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        This XML data can be parsed similarly to the data in example 4 ("Tags within tags"). However, the record defined in the schema of
+        example 4 has to be embedded in an array.
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "namespace": "nifi",
+                  "name": "test",
+                  "type": "record",
+                  "fields": [
+                    { "name": "array_field",
+                      "type": {
+                        "type": "array",
+                        "items": {
+                          "name": "RecordInArray",
+                          "type": "record",
+                          "fields" : [
+                            {"name": "embedded_field", "type": "string"},
+                            {"name": "another_embedded_field", "type": 
"string"}
+                          ]
+                        }
+                      }
+                    }
+                  ]
+                }
+            </pre>
+    </code>
+
+    <h2>Example: Array in record</h2>
+
+    <p>
+        In XML data, arrays are frequently enclosed by tags:
+    </p>
+
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;field_enclosing_array&gt;
+                    &lt;element&gt;content 1&lt;/element&gt;
+                    &lt;element&gt;content 2&lt;/element&gt;
+                  &lt;/field_enclosing_array&gt;
+                  &lt;field_without_array&gt; content 
3&lt;/field_without_array&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        For the schema, embedded tags have to be described by records. 
Therefore, the field "field_enclosing_array"
+        is a record that embeds an array with elements of type string:
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "namespace": "nifi",
+                  "name": "test",
+                  "type": "record",
+                  "fields": [
+                    { "name": "field_enclosing_array",
+                      "type": {
+                        "name": "EmbeddedRecord",
+                        "type": "record",
+                        "fields" : [
+                          {
+                            "name": "element",
+                            "type": {
+                              "type": "array",
+                              "items": "string"
+                            }
+                          }
+                        ]
+                      }
+                    },
+                    { "name": "field_without_array", "type": "string" }
+                  ]
+                }
+            </pre>
+    </code>
+
+
+    <h2>Example: Maps</h2>
+
+    <p>
+        A map is a field embedding fields with different names:
+    </p>
+
+    <code>
+            <pre>
+                &lt;record&gt;
+                  &lt;map_field&gt;
+                    &lt;field1&gt;content&lt;/field1&gt;
+                    &lt;field2&gt;content&lt;/field2&gt;
+                    ...
+                  &lt;/map_field&gt;
+                  &lt;simple_field&gt;content&lt;/simple_field&gt;
+                &lt;/record&gt;
+            </pre>
+    </code>
+
+    <p>
+        This data can be processed using the following schema:
+    </p>
+
+    <code>
+            <pre>
+                {
+                  "namespace": "nifi",
+                  "name": "test",
+                  "type": "record",
+                  "fields": [
+                    { "name": "map_field", "type":
+                      { "type": "map", "values": "string" }
+                    },
+                    { "name": "simple_field", "type": "string" }
+                  ]
+                }
+            </pre>
+    </code>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java
new file mode 100755
index 0000000..e1c767e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.xml;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Tests the XMLReader controller service through TestXMLReaderProcessor: each test configures
+ * the reader, enqueues an XML file and checks the records found in the first FlowFile routed
+ * to the processor's SUCCESS relationship.
+ */
+public class TestXMLReader {
+
+    private static final String ATTRIBUTE_PREFIX = "attribute_prefix";
+    private static final String CONTENT_NAME = "content_field";
+    private static final String EVALUATE_IS_ARRAY = "xml.stream.is.array";
+
+    private XMLReader reader;
+
+    /**
+     * Creates a runner for TestXMLReaderProcessor, registers a new XMLReader under the
+     * identifier "xml_reader" and configures it with the schema text read from the given file.
+     * The reader is not enabled here, so a test can set further properties before enabling it.
+     *
+     * @param filePath path of the file containing the schema text
+     * @return the configured runner
+     * @throws InitializationException if the controller service cannot be added
+     * @throws IOException if the schema file cannot be read
+     */
+    public TestRunner setup(String filePath) throws InitializationException, IOException {
+
+        TestRunner runner = TestRunners.newTestRunner(TestXMLReaderProcessor.class);
+        reader = new XMLReader();
+        runner.addControllerService("xml_reader", reader);
+        runner.setProperty(TestXMLReaderProcessor.XML_READER, "xml_reader");
+
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get(filePath)));
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+
+        return runner;
+    }
+
+    /**
+     * Enqueues the given XML file with a single FlowFile attribute, runs the processor and
+     * returns the content of the first FlowFile in SUCCESS, split into lines.
+     *
+     * @param runner         runner whose reader has already been enabled
+     * @param xmlPath        path of the XML file to enqueue
+     * @param attributeName  name of the FlowFile attribute to set
+     * @param attributeValue value of the FlowFile attribute to set
+     * @return the lines of the resulting FlowFile content
+     * @throws IOException if the XML file cannot be opened or closed
+     */
+    private List<String> runAndReadRecords(TestRunner runner, String xmlPath, String attributeName, String attributeValue)
+            throws IOException {
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put(attributeName, attributeValue);
+
+        // close the file once it has been handed to the runner instead of leaking the handle
+        try (InputStream is = new FileInputStream(xmlPath)) {
+            runner.enqueue(is, attributes);
+        }
+        runner.run();
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
+        final String content = new String(runner.getContentAsByteArray(flowFiles.get(0)));
+        return Arrays.asList(content.split("\n"));
+    }
+
+    @Test
+    public void testRecordFormat() throws IOException, InitializationException {
+        TestRunner runner = setup("src/test/resources/xml/testschema");
+
+        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_EVALUATE);
+
+        runner.enableControllerService(reader);
+
+        List<String> records = runAndReadRecords(runner, "src/test/resources/xml/people.xml", EVALUATE_IS_ARRAY, "true");
+
+        assertEquals(4, records.size());
+    }
+
+    @Test
+    public void testRecordFormat2() throws IOException, InitializationException {
+        TestRunner runner = setup("src/test/resources/xml/testschema");
+
+        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
+
+        runner.enableControllerService(reader);
+
+        List<String> records = runAndReadRecords(runner, "src/test/resources/xml/people.xml", EVALUATE_IS_ARRAY, "true");
+
+        assertEquals(4, records.size());
+    }
+
+    @Test
+    public void testRecordFormat3() throws IOException, InitializationException {
+        TestRunner runner = setup("src/test/resources/xml/testschema");
+
+        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE);
+
+        runner.enableControllerService(reader);
+
+        List<String> records = runAndReadRecords(runner, "src/test/resources/xml/person.xml", EVALUATE_IS_ARRAY, "true");
+
+        assertEquals(1, records.size());
+    }
+
+    @Test
+    public void testAttributePrefix() throws IOException, InitializationException {
+        TestRunner runner = setup("src/test/resources/xml/testschema");
+
+        runner.setProperty(reader, XMLReader.ATTRIBUTE_PREFIX, "${" + ATTRIBUTE_PREFIX + "}");
+        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
+
+        runner.enableControllerService(reader);
+
+        List<String> records = runAndReadRecords(runner, "src/test/resources/xml/people.xml", ATTRIBUTE_PREFIX, "ATTR_");
+
+        assertEquals(4, records.size());
+        assertEquals("MapRecord[{COUNTRY=USA, ATTR_ID=P1, NAME=Cleve Butler, AGE=42}]", records.get(0));
+        assertEquals("MapRecord[{COUNTRY=UK, ATTR_ID=P2, NAME=Ainslie Fletcher, AGE=33}]", records.get(1));
+        assertEquals("MapRecord[{COUNTRY=FR, ATTR_ID=P3, NAME=Amélie Bonfils, AGE=74}]", records.get(2));
+        assertEquals("MapRecord[{COUNTRY=USA, ATTR_ID=P4, NAME=Elenora Scrivens, AGE=16}]", records.get(3));
+    }
+
+    @Test
+    public void testContentField() throws IOException, InitializationException {
+        TestRunner runner = setup("src/test/resources/xml/testschema2");
+
+        runner.setProperty(reader, XMLReader.CONTENT_FIELD_NAME, "${" + CONTENT_NAME + "}");
+        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
+
+        runner.enableControllerService(reader);
+
+        List<String> records = runAndReadRecords(runner, "src/test/resources/xml/people_tag_in_characters.xml", CONTENT_NAME, "CONTENT");
+
+        assertEquals(5, records.size());
+        assertEquals("MapRecord[{ID=P1, NAME=MapRecord[{CONTENT=Cleve Butler, ATTR=attr content, INNER=inner content}], AGE=42}]",
+                records.get(0));
+        assertEquals("MapRecord[{ID=P2, NAME=MapRecord[{CONTENT=Ainslie Fletcher, ATTR=attr content, INNER=inner content}], AGE=33}]",
+                records.get(1));
+        assertEquals("MapRecord[{ID=P3, NAME=MapRecord[{CONTENT=Amélie Bonfils, ATTR=attr content, INNER=inner content}], AGE=74}]",
+                records.get(2));
+        assertEquals("MapRecord[{ID=P4, NAME=MapRecord[{CONTENT=Elenora Scrivens, ATTR=attr content, INNER=inner content}], AGE=16}]",
+                records.get(3));
+        assertEquals("MapRecord[{ID=P5, NAME=MapRecord[{INNER=inner content}]}]", records.get(4));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java
new file mode 100755
index 0000000..b4ee4ab
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.xml;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class TestXMLReaderProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor XML_READER = new PropertyDescriptor.Builder()
+            .name("xml_reader")
+            .description("xml_reader")
+            .identifiesControllerService(XMLReader.class)
+            .required(true)
+            .build();
+
+    public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("success").build();
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        final RecordReaderFactory readerFactory = context.getProperty(XML_READER).asControllerService(RecordReaderFactory.class);
+
+        final List<String> records = new ArrayList<>();
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                records.add(record.toString());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        flowFile = session.write(flowFile, (out) -> out.write(StringUtils.join(records, "\n").getBytes()));
+        session.transfer(flowFile, SUCCESS);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return new ArrayList<PropertyDescriptor>() {{ add(XML_READER); }};
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<Relationship>() {{ add(SUCCESS); }};
+    }
+}

Reply via email to