NIFI-4185: Add XML Record Reader
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d21bd387 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d21bd387 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d21bd387 Branch: refs/heads/master Commit: d21bd3870b1edd107bc6ef12bc5f12820a10bebb Parents: 8e4aa6b Author: JohannesDaniel <[email protected]> Authored: Wed Mar 7 00:02:43 2018 +0100 Committer: Mark Payne <[email protected]> Committed: Mon Apr 23 14:39:18 2018 -0400 ---------------------------------------------------------------------- .../nifi-record-serialization-services/pom.xml | 20 + .../java/org/apache/nifi/xml/XMLReader.java | 129 ++ .../org/apache/nifi/xml/XMLRecordReader.java | 568 +++++++ ...org.apache.nifi.controller.ControllerService | 4 +- .../additionalDetails.html | 433 ++++++ .../java/org/apache/nifi/xml/TestXMLReader.java | 170 +++ .../apache/nifi/xml/TestXMLReaderProcessor.java | 79 + .../apache/nifi/xml/TestXMLRecordReader.java | 1436 ++++++++++++++++++ .../src/test/resources/xml/people.xml | 22 + .../src/test/resources/xml/people2.xml | 12 + .../src/test/resources/xml/people3.xml | 12 + .../src/test/resources/xml/people_array.xml | 37 + .../test/resources/xml/people_array_simple.xml | 28 + .../src/test/resources/xml/people_cdata.xml | 22 + .../src/test/resources/xml/people_complex1.xml | 33 + .../src/test/resources/xml/people_complex2.xml | 73 + .../src/test/resources/xml/people_empty.xml | 12 + .../src/test/resources/xml/people_invalid.xml | 21 + .../src/test/resources/xml/people_map.xml | 18 + .../src/test/resources/xml/people_map2.xml | 32 + .../src/test/resources/xml/people_namespace.xml | 22 + .../src/test/resources/xml/people_nested.xml | 38 + .../test/resources/xml/people_no_attributes.xml | 22 + .../resources/xml/people_tag_in_characters.xml | 23 + .../xml/people_with_header_and_comments.xml | 29 + .../src/test/resources/xml/person.xml | 5 + .../src/test/resources/xml/testschema | 11 + 
.../src/test/resources/xml/testschema2 | 19 + 28 files changed, 3329 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml old mode 100644 new mode 100755 index 4c958f5..4d47701 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -122,6 +122,26 @@ <exclude>src/test/resources/json/single-element-nested-array.json</exclude> <exclude>src/test/resources/json/single-element-nested.json</exclude> <exclude>src/test/resources/json/output/dataTypes.json</exclude> + <exclude>src/test/resources/xml/people.xml</exclude> + <exclude>src/test/resources/xml/people2.xml</exclude> + <exclude>src/test/resources/xml/people3.xml</exclude> + <exclude>src/test/resources/xml/people_array.xml</exclude> + <exclude>src/test/resources/xml/people_array_simple.xml</exclude> + <exclude>src/test/resources/xml/people_cdata.xml</exclude> + <exclude>src/test/resources/xml/people_complex1.xml</exclude> + <exclude>src/test/resources/xml/people_complex2.xml</exclude> + <exclude>src/test/resources/xml/people_empty.xml</exclude> + <exclude>src/test/resources/xml/people_invalid.xml</exclude> + <exclude>src/test/resources/xml/people_map.xml</exclude> + <exclude>src/test/resources/xml/people_map2.xml</exclude> + 
<exclude>src/test/resources/xml/people_namespace.xml</exclude> + <exclude>src/test/resources/xml/people_nested.xml</exclude> + <exclude>src/test/resources/xml/people_no_attributes.xml</exclude> + <exclude>src/test/resources/xml/people_tag_in_characters.xml</exclude> + <exclude>src/test/resources/xml/people_with_header_and_comments.xml</exclude> + <exclude>src/test/resources/xml/person.xml</exclude> + <exclude>src/test/resources/xml/testschema</exclude> + <exclude>src/test/resources/xml/testschema2</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java new file mode 100755 index 0000000..73ee75d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.xml;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Controller service that creates {@link XMLRecordReader} instances.
 *
 * <p>The schema comes from the inherited {@link SchemaRegistryService} configuration. Whether the
 * content is treated as a single record or as a wrapper element around many records is decided
 * per FlowFile by evaluating {@link #RECORD_FORMAT} against the FlowFile's attributes.</p>
 */
@Tags({"xml", "record", "reader", "parser"})
@CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of "
        + "XML data, embedded in an enclosing root tag.")
public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {

    public static final AllowableValue RECORD_SINGLE = new AllowableValue("false");
    public static final AllowableValue RECORD_ARRAY = new AllowableValue("true");
    public static final AllowableValue RECORD_EVALUATE = new AllowableValue("${xml.stream.is.array}","Use attribute xml.stream.is.array");

    public static final PropertyDescriptor RECORD_FORMAT = new PropertyDescriptor.Builder()
            .name("record_format")
            .displayName("Expect Records as Array")
            .description("This property defines whether the reader expects a single record or an array of records. If the property is "
                    + "set to \"true\", the reader expects an array of records and the outer element of the XML will be treated as a "
                    + "wrapper for the records. If the property is set to \"false\", the reader expects a single record for each FlowFile "
                    + "(without wrapper-element). If the property is set to \"Use attribute xml.stream.is.array\", the attribute "
                    + "\"xml.stream.is.array\" will be evaluated for each FlowFile whether to treat its content as array "
                    + "of records (in the case of \"true\") or as single record (in the case of \"false\").")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues(RECORD_SINGLE, RECORD_ARRAY, RECORD_EVALUATE)
            .defaultValue(RECORD_SINGLE.getValue())
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    public static final PropertyDescriptor ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder()
            .name("attribute_prefix")
            .displayName("Attribute Prefix")
            .description("If this property is set, the name of attributes will be prepended with a prefix when they are added to a record.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    public static final PropertyDescriptor CONTENT_FIELD_NAME = new PropertyDescriptor.Builder()
            .name("content_field_name")
            .displayName("Field Name for Content")
            .description("If tags with content (e. g. <field>content</field>) are defined as nested records in the schema, "
                    + "the name of the tag will be used as name for the record and the value of this property will be used as name for the field. "
                    + "If tags with content shall be parsed together with attributes (e. g. <field attribute=\"123\">content</field>), "
                    + "they have to be defined as records. For additional information, see the section of processor usage.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;

    /**
     * Captures the configured date, time and timestamp formats so that each reader created later can use them.
     *
     * @param context the configuration context of this controller service
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
        this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
        this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
        properties.add(RECORD_FORMAT);
        properties.add(ATTRIBUTE_PREFIX);
        properties.add(CONTENT_FIELD_NAME);
        properties.add(DateTimeUtils.DATE_FORMAT);
        properties.add(DateTimeUtils.TIME_FORMAT);
        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
        return properties;
    }

    /**
     * Creates a reader for one FlowFile's content.
     *
     * @param variables FlowFile attributes used to resolve the schema and evaluate Expression Language
     * @param in        the XML content
     * @param logger    logger handed to the created reader
     * @return a new {@link XMLRecordReader}
     * @throws IOException              if the schema cannot be read
     * @throws SchemaNotFoundException  if no schema can be resolved
     * @throws MalformedRecordException if the start of the XML cannot be parsed
     */
    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger)
            throws IOException, SchemaNotFoundException, MalformedRecordException {
        final ConfigurationContext context = getConfigurationContext();

        final RecordSchema schema = getSchema(variables, in, null);

        final String attributePrefix = getTrimmedValueOrNull(context, ATTRIBUTE_PREFIX, variables);
        final String contentFieldName = getTrimmedValueOrNull(context, CONTENT_FIELD_NAME, variables);

        // Boolean.parseBoolean treats null or anything other than "true" (case-insensitive) as false
        final boolean isArray = Boolean.parseBoolean(context.getProperty(RECORD_FORMAT).evaluateAttributeExpressions(variables).getValue());

        return new XMLRecordReader(in, schema, isArray, attributePrefix, contentFieldName, dateFormat, timeFormat, timestampFormat, logger);
    }

    /**
     * Evaluates an optional property against the FlowFile attributes and trims the result.
     *
     * <p>Returns null when the property is not set or when it evaluates to null or blank. The reader
     * treats null as "not configured", whereas an empty string would, for example, create a record
     * field with an empty name for the content.</p>
     */
    private static String getTrimmedValueOrNull(final ConfigurationContext context, final PropertyDescriptor descriptor,
                                                final Map<String, String> variables) {
        if (!context.getProperty(descriptor).isSet()) {
            return null;
        }
        final String value = context.getProperty(descriptor).evaluateAttributeExpressions(variables).getValue();
        if (value == null) {
            return null;
        }
        final String trimmed = value.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java new file mode 100755 index 0000000..e819b92 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.xml;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;

import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.Attribute;
import javax.xml.stream.events.Characters;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * {@link RecordReader} that reads XML through a StAX {@link XMLEventReader}.
 *
 * <p>Each start element at record level is parsed into one {@link Record}. When the reader is
 * created with {@code isArray == true}, the first start element is skipped as a wrapper and the
 * start elements that follow are treated as records. Attribute names can be prefixed with
 * {@code attributePrefix}. Text content of elements parsed as records is stored under
 * {@code contentFieldName} when that is set.</p>
 */
public class XMLRecordReader implements RecordReader {

    private final ComponentLog logger;
    private final RecordSchema schema;
    private final String attributePrefix;
    private final String contentFieldName;

    // start tag of the record the next call to nextRecord() will parse; null once no further start tag exists
    private StartElement currentRecordStartTag;

    private final XMLEventReader xmlEventReader;

    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;

    /**
     * Creates the reader and positions it on the first record's start tag.
     *
     * @param in               XML content
     * @param schema           schema used to type fields and to decide which fields are known
     * @param isArray          if true, the outermost element is treated as a wrapper around the records
     * @param attributePrefix  prefix prepended to attribute names in records; may be null
     * @param contentFieldName field name for an element's text content when it is parsed as a record; may be null
     * @param dateFormat       date pattern; null means no explicit format
     * @param timeFormat       time pattern; null means no explicit format
     * @param timestampFormat  timestamp pattern; null means no explicit format
     * @param logger           component logger
     * @throws MalformedRecordException if the beginning of the XML cannot be parsed
     */
    public XMLRecordReader(InputStream in, RecordSchema schema, boolean isArray, String attributePrefix, String contentFieldName,
                           final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException {
        this.schema = schema;
        this.attributePrefix = attributePrefix;
        this.contentFieldName = contentFieldName;
        this.logger = logger;

        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);

        LAZY_DATE_FORMAT = () -> df;
        LAZY_TIME_FORMAT = () -> tf;
        LAZY_TIMESTAMP_FORMAT = () -> tsf;

        try {
            final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
            // FlowFile content is untrusted input: disable DTD processing and external entity
            // resolution to prevent XML External Entity (XXE) attacks.
            xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
            xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);

            xmlEventReader = xmlInputFactory.createXMLEventReader(in);

            if (isArray) {
                // the first start element is the wrapper that encloses the records
                skipNextStartTag();
            }

            setNextRecordStartTag();
        } catch (XMLStreamException e) {
            throw new MalformedRecordException("Could not parse XML", e);
        }
    }

    /** Consumes events up to and including the next start element. */
    private void skipNextStartTag() throws XMLStreamException {
        while (xmlEventReader.hasNext()) {
            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
            if (xmlEvent.isStartElement()) {
                return;
            }
        }
    }

    /** Advances to the next start element and remembers it as the next record's tag, or sets null at end of input. */
    private void setNextRecordStartTag() throws XMLStreamException {
        while (xmlEventReader.hasNext()) {
            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
            if (xmlEvent.isStartElement()) {
                currentRecordStartTag = xmlEvent.asStartElement();
                return;
            }
        }
        currentRecordStartTag = null;
    }

    /**
     * Returns the next record, or null once the input is exhausted.
     *
     * <p>A record element that yields no values is returned as an empty {@link MapRecord} with the
     * reader's schema.</p>
     */
    @Override
    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
        if (currentRecordStartTag == null) {
            return null;
        }
        try {
            final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields);
            setNextRecordStartTag();
            if (record != null) {
                return record;
            }
            return new MapRecord(this.schema, Collections.emptyMap());
        } catch (XMLStreamException e) {
            throw new MalformedRecordException("Could not parse XML", e);
        }
    }

    /**
     * Parses the element whose start tag was just consumed, according to {@code dataType}.
     *
     * <p>For ARRAY types the element value is appended to the value already stored in
     * {@code recordValues} under {@code fieldName}. The returned object is the value the caller
     * should store.</p>
     */
    private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType, Map<String, Object> recordValues,
                                     boolean dropUnknown) throws XMLStreamException, MalformedRecordException {
        switch (dataType.getFieldType()) {
            case BOOLEAN:
            case BYTE:
            case CHAR:
            case DOUBLE:
            case FLOAT:
            case INT:
            case LONG:
            case SHORT:
            case STRING:
            case DATE:
            case TIME:
            case TIMESTAMP: {
                final StringBuilder content = new StringBuilder();

                while (xmlEventReader.hasNext()) {
                    final XMLEvent xmlEvent = xmlEventReader.nextEvent();
                    if (xmlEvent.isCharacters()) {
                        final Characters characters = xmlEvent.asCharacters();
                        if (!characters.isWhiteSpace()) {
                            content.append(characters.getData());
                        }
                    } else if (xmlEvent.isEndElement()) {
                        final String contentToReturn = content.toString();
                        if (StringUtils.isBlank(contentToReturn)) {
                            return null;
                        }
                        return DataTypeUtils.convertType(contentToReturn, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
                    } else if (xmlEvent.isStartElement()) {
                        // nested elements are not part of a simple value
                        this.skipElement();
                    }
                }
                break;
            }

            case ARRAY: {
                final DataType arrayDataType = ((ArrayDataType) dataType).getElementType();

                final Object newValue = parseFieldForType(startElement, fieldName, arrayDataType, recordValues, dropUnknown);
                final Object oldValues = recordValues.get(fieldName);

                if (newValue == null) {
                    return oldValues;
                }
                if (oldValues instanceof List) {
                    // lists stored under a field are only ever created by this reader as List<Object>
                    @SuppressWarnings("unchecked")
                    final List<Object> existing = (List<Object>) oldValues;
                    existing.add(newValue);
                    return oldValues;
                }
                final List<Object> arrayValues = new ArrayList<>();
                if (oldValues != null) {
                    arrayValues.add(oldValues);
                }
                arrayValues.add(newValue);
                return arrayValues;
            }

            case RECORD: {
                if (!(dataType instanceof RecordDataType)) {
                    return null;
                }
                final RecordSchema childSchema = ((RecordDataType) dataType).getChildSchema();
                return parseRecord(startElement, childSchema, true, dropUnknown);
            }

            case MAP: {
                final DataType mapDataType = ((MapDataType) dataType).getValueType();
                final Map<String, Object> embeddedMap = new HashMap<>();

                while (xmlEventReader.hasNext()) {
                    final XMLEvent xmlEvent = xmlEventReader.nextEvent();

                    if (xmlEvent.isStartElement()) {
                        final StartElement subStartElement = xmlEvent.asStartElement();
                        final String subFieldName = subStartElement.getName().getLocalPart();

                        final Object mapValue = parseFieldForType(subStartElement, subFieldName, mapDataType, embeddedMap, dropUnknown);
                        embeddedMap.put(subFieldName, mapValue);

                    } else if (xmlEvent.isEndElement()) {
                        break;
                    }
                }

                return embeddedMap.isEmpty() ? null : embeddedMap;
            }
            case CHOICE: {
                // field choice will parse the entire tree of a field
                return parseUnknownField(startElement, false, null);
            }
        }
        return null;
    }

    /**
     * Parses the element whose start tag was just consumed without typing its values.
     *
     * <p>Attributes and child elements are collected into a {@link MapRecord} with an empty schema.
     * If {@code dropUnknown} is true, only names present in {@code schema} are kept, and nothing is
     * kept when {@code schema} is null. An element with only text content returns that text as a
     * String. An element with no content returns null.</p>
     */
    private Object parseUnknownField(StartElement startElement, boolean dropUnknown, RecordSchema schema) throws XMLStreamException {
        // parse attributes
        final Map<String, Object> recordValues = new HashMap<>();
        final Iterator<?> iterator = startElement.getAttributes();
        while (iterator.hasNext()) {
            final Attribute attribute = (Attribute) iterator.next();
            final String attributeName = attribute.getName().toString();

            final boolean keep = !dropUnknown || (schema != null && schema.getField(attributeName).isPresent());
            if (keep) {
                recordValues.put(attributePrefix == null ? attributeName : attributePrefix + attributeName, attribute.getValue());
            }
        }

        // parse fields
        final StringBuilder content = new StringBuilder();

        while (xmlEventReader.hasNext()) {
            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
            if (xmlEvent.isCharacters()) {
                final Characters characters = xmlEvent.asCharacters();
                if (!characters.isWhiteSpace()) {
                    content.append(characters.getData());
                }
            } else if (xmlEvent.isStartElement()) {
                final StartElement subStartElement = xmlEvent.asStartElement();
                final String subFieldName = subStartElement.getName().getLocalPart();

                if (dropUnknown) {
                    final Optional<RecordField> field = schema == null ? Optional.empty() : schema.getField(subFieldName);
                    if (field.isPresent()) {
                        // sub-elements can only be known if the field's type carries a record schema
                        final RecordSchema childSchema = getChildSchema(field.get().getDataType());
                        final Object value = parseUnknownField(subStartElement, true, childSchema);
                        if (value != null) {
                            putUnknownTypeInMap(recordValues, subFieldName, value);
                        }
                    } else {
                        skipElement();
                    }
                } else {
                    final Object value = parseUnknownField(subStartElement, dropUnknown, schema);
                    if (value != null) {
                        putUnknownTypeInMap(recordValues, subFieldName, value);
                    }
                }
            } else if (xmlEvent.isEndElement()) {
                break;
            }
        }

        convertListsToArrays(recordValues);

        final boolean hasContent = content.length() > 0;
        final boolean hasFields = !recordValues.isEmpty();

        if (hasContent && !hasFields) {
            return content.toString();
        }
        if (hasContent) {
            if (contentFieldName != null) {
                recordValues.put(contentFieldName, content.toString());
            } else {
                logger.debug("Found content for field that has to be parsed as record but property \"Field Name for Content\" is not set. "
                        + "The content will not be added to the record.");
            }
            return new MapRecord(new SimpleRecordSchema(Collections.emptyList()), recordValues);
        }
        if (hasFields) {
            return new MapRecord(new SimpleRecordSchema(Collections.emptyList()), recordValues);
        }
        return null;
    }

    /**
     * Parses the element whose start tag was just consumed into a {@link MapRecord} with {@code schema}.
     *
     * <p>{@code coerceTypes} controls whether known fields are converted to their schema types.
     * {@code dropUnknown} controls whether names missing from the schema are skipped. Returns null
     * when no values were collected.</p>
     */
    private Record parseRecord(StartElement startElement, RecordSchema schema, boolean coerceTypes, boolean dropUnknown) throws XMLStreamException, MalformedRecordException {
        final Map<String, Object> recordValues = new HashMap<>();

        // parse attributes
        final Iterator<?> iterator = startElement.getAttributes();
        while (iterator.hasNext()) {
            final Attribute attribute = (Attribute) iterator.next();
            final String attributeName = attribute.getName().toString();

            final String targetFieldName = attributePrefix == null ? attributeName : attributePrefix + attributeName;

            if (dropUnknown) {
                final Optional<RecordField> field = schema.getField(attributeName);
                if (field.isPresent()) {

                    // dropUnknown == true && coerceTypes == true
                    if (coerceTypes) {
                        final Object value = parseStringForType(attribute.getValue(), attributeName, field.get().getDataType());
                        if (value != null) {
                            recordValues.put(targetFieldName, value);
                        }

                    // dropUnknown == true && coerceTypes == false
                    } else {
                        recordValues.put(targetFieldName, attribute.getValue());
                    }
                }
            } else {

                // dropUnknown == false && coerceTypes == true
                if (coerceTypes) {
                    final Optional<RecordField> field = schema.getField(attributeName);
                    if (field.isPresent()) {
                        final Object value = parseStringForType(attribute.getValue(), attributeName, field.get().getDataType());
                        if (value != null) {
                            recordValues.put(targetFieldName, value);
                        }
                    } else {
                        recordValues.put(targetFieldName, attribute.getValue());
                    }

                // dropUnknown == false && coerceTypes == false
                } else {
                    recordValues.put(targetFieldName, attribute.getValue());
                }
            }
        }

        // parse fields
        final StringBuilder content = new StringBuilder();
        while (xmlEventReader.hasNext()) {
            final XMLEvent xmlEvent = xmlEventReader.nextEvent();

            if (xmlEvent.isStartElement()) {
                final StartElement subStartElement = xmlEvent.asStartElement();
                final String fieldName = subStartElement.getName().getLocalPart();

                final Optional<RecordField> field = schema.getField(fieldName);

                if (dropUnknown) {
                    if (field.isPresent()) {
                        // dropUnknown == true && coerceTypes == true
                        if (coerceTypes) {
                            final Object value = parseFieldForType(subStartElement, fieldName, field.get().getDataType(), recordValues, true);
                            if (value != null) {
                                recordValues.put(fieldName, value);
                            }

                        // dropUnknown == true && coerceTypes == false
                        // sub-elements can only be known if the field's type carries a record schema
                        } else {
                            final RecordSchema childSchema = getChildSchema(field.get().getDataType());
                            final Object value = parseUnknownField(subStartElement, true, childSchema);
                            if (value != null) {
                                putUnknownTypeInMap(recordValues, fieldName, value);
                            }
                        }

                    } else {
                        skipElement();
                    }
                } else {
                    // dropUnknown == false && coerceTypes == true
                    if (coerceTypes && field.isPresent()) {
                        final Object value = parseFieldForType(subStartElement, fieldName, field.get().getDataType(), recordValues, false);
                        if (value != null) {
                            recordValues.put(fieldName, value);
                        }

                    // unknown field, or dropUnknown == false && coerceTypes == false
                    } else {
                        final Object value = parseUnknownField(subStartElement, false, null);
                        if (value != null) {
                            putUnknownTypeInMap(recordValues, fieldName, value);
                        }
                    }
                }
            } else if (xmlEvent.isEndElement()) {
                break;
            } else if (xmlEvent.isCharacters()) {
                final Characters characters = xmlEvent.asCharacters();
                if (!characters.isWhiteSpace()) {
                    content.append(characters.getData());
                }
            }
        }

        if (content.length() > 0) {
            if (contentFieldName != null) {
                final Optional<RecordField> field = schema.getField(contentFieldName);
                if (field.isPresent()) {
                    final Object value = parseStringForType(content.toString(), contentFieldName, field.get().getDataType());
                    recordValues.put(contentFieldName, value);
                }
            } else {
                logger.debug("Found content for field that is defined as record but property \"Field Name for Content\" is not set. "
                        + "The content will not be added to record.");
            }
        }

        convertListsToArrays(recordValues);

        return recordValues.isEmpty() ? null : new MapRecord(schema, recordValues);
    }

    /**
     * Returns the record schema carried by {@code dataType}: the child schema of a RECORD type or of
     * an ARRAY whose elements are records. Returns null otherwise.
     */
    private static RecordSchema getChildSchema(final DataType dataType) {
        if (dataType instanceof RecordDataType) {
            return ((RecordDataType) dataType).getChildSchema();
        }
        if (dataType instanceof ArrayDataType) {
            final DataType elementType = ((ArrayDataType) dataType).getElementType();
            if (elementType instanceof RecordDataType) {
                return ((RecordDataType) elementType).getChildSchema();
            }
        }
        return null;
    }

    /** Replaces every {@link List} value in {@code values} with an equivalent {@code Object[]}, in place. */
    private static void convertListsToArrays(final Map<String, Object> values) {
        for (final Map.Entry<String, Object> entry : values.entrySet()) {
            if (entry.getValue() instanceof List) {
                entry.setValue(((List<?>) entry.getValue()).toArray());
            }
        }
    }

    /**
     * Stores {@code fieldValue} under {@code fieldName}. If a value already exists there, the two are
     * collected into a List, or the new value is appended to the existing List.
     */
    private void putUnknownTypeInMap(Map<String, Object> values, String fieldName, Object fieldValue) {
        final Object oldValues = values.get(fieldName);

        if (oldValues == null) {
            values.put(fieldName, fieldValue);
        } else if (oldValues instanceof List) {
            // lists stored under a field are only ever created by this reader as List<Object>
            @SuppressWarnings("unchecked")
            final List<Object> existing = (List<Object>) oldValues;
            existing.add(fieldValue);
        } else {
            final List<Object> valuesToPut = new ArrayList<>();
            valuesToPut.add(oldValues);
            valuesToPut.add(fieldValue);

            values.put(fieldName, valuesToPut);
        }
    }

    /** Converts {@code data} to {@code dataType} when that type is a simple (scalar or date/time) type; returns null otherwise. */
    private Object parseStringForType(String data, String fieldName, DataType dataType) {
        switch (dataType.getFieldType()) {
            case BOOLEAN:
            case BYTE:
            case CHAR:
            case DOUBLE:
            case FLOAT:
            case INT:
            case LONG:
            case SHORT:
            case STRING:
            case DATE:
            case TIME:
            case TIMESTAMP: {
                return DataTypeUtils.convertType(data, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
            }
        }
        return null;
    }

    /** Consumes events up to and including the end tag of the element whose start tag was just read, including nested elements. */
    private void skipElement() throws XMLStreamException {
        while (xmlEventReader.hasNext()) {
            final XMLEvent xmlEvent = xmlEventReader.nextEvent();

            if (xmlEvent.isStartElement()) {
                skipElement();
            }
            if (xmlEvent.isEndElement()) {
                return;
            }
        }
    }

    @Override
    public RecordSchema getSchema() {
        return schema;
    }

    /**
     * Closes the underlying {@link XMLEventReader}. Per the StAX contract this does not close the
     * {@link InputStream} passed to the constructor.
     */
    @Override
    public void close() throws IOException {
        try {
            xmlEventReader.close();
        } catch (XMLStreamException e) {
            // keep the cause so the failure can be diagnosed
            logger.error("Unable to close XMLEventReader", e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService old mode 100644 new mode 100755 index 628dbe5..76c89c2 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -25,4 +25,6 @@ org.apache.nifi.csv.CSVRecordSetWriter org.apache.nifi.grok.GrokReader -org.apache.nifi.text.FreeFormTextRecordSetWriter \ No newline at end of file +org.apache.nifi.text.FreeFormTextRecordSetWriter + +org.apache.nifi.xml.XMLReader \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html new file mode 100755 index 0000000..46cd259 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html @@ -0,0 +1,433 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8"/> + <title>XMLReader</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + </head> + + <body> + <p> + The XMLReader Controller Service reads XML content and creates Record objects. The Controller Service + must be configured with a schema that describes the structure of the XML data. Fields in the XML data + that are not defined in the schema will be skipped. 
Depending on whether the property "Expect Records as Array" + is set to "false" or "true", the reader either expects a single record or an array of records for each FlowFile. + </p> + <p> + Example: Single record + </p> + <code> + <pre> + <record> + <field1>content</field1> + <field2>content</field2> + </record> + </pre> + </code> + + <p> + An array of records has to be enclosed by a root tag. + Example: Array of records + </p> + + <code> + <pre> + <root> + <record> + <field1>content</field1> + <field2>content</field2> + </record> + <record> + <field1>content</field1> + <field2>content</field2> + </record> + </root> + </pre> + </code> + + <h2>Example: Simple Fields</h2> + + <p> + The simplest kind of data within XML data are tags / fields only containing content (no attributes, no embedded tags). + They can be described in the schema by simple types (e. g. INT, STRING, ...). + </p> + + <code> + <pre> + <root> + <record> + <simple_field>content</simple_field> + </record> + </root> + </pre> + </code> + + <p> + This record can be described by a schema containing one field (e. g. of type string). By providing this schema, + the reader expects zero or one occurrences of "simple_field" in the record. + </p> + + <code> + <pre> + { + "namespace": "nifi", + "name": "test", + "type": "record", + "fields": [ + { "name": "simple_field", "type": "string" } + ] + } + </pre> + </code> + + <h2>Example: Arrays with Simple Fields</h2> + + <p> + Arrays are considered as repetitive tags / fields in XML data. For the following XML data, "array_field" is considered + to be an array enclosing simple fields, whereas "simple_field" is considered to be a simple field not enclosed in + an array. 
+ </p> + + <code> + <pre> + <record> + <array_field>content</array_field> + <array_field>content</array_field> + <simple_field>content</simple_field> + </record> + </pre> + </code> + + <p> + This record can be described by the following schema: + </p> + + <code> + <pre> + { + "namespace": "nifi", + "name": "test", + "type": "record", + "fields": [ + { "name": "array_field", "type": + { "type": "array", "items": "string" } + }, + { "name": "simple_field", "type": "string" } + ] + } + </pre> + </code> + + <p> + If a field in the schema is embedded in an array, the reader expects zero, one or more occurrences of the field + in a record. The field "array_field" could in principle also be defined as a simple field, but then the second occurrence + of this field would replace the first in the record object. Conversely, the field "simple_field" could also be defined + as an array. In this case, the reader would put it into the record object as an array with one element. + </p> + + <h2>Example: Tags with Attributes</h2> + + <p> + XML fields frequently contain not only content, but also attributes.
The following record contains a field with + an attribute "attr" and content: + </p> + + <code> + <pre> + <record> + <field_with_attribute attr="attr_content">content of field</field_with_attribute> + </record> + </pre> + </code> + + <p> + To parse the content of the field "field_with_attribute" together with the attribute "attr", two requirements have + to be fulfilled: + </p> + + <ul> + <li>In the schema, the field has to be defined as a record.</li> + <li>The property "Field Name for Content" has to be set.</li> + <li>Optionally, the property "Attribute Prefix" can also be set.</li> + </ul> + + <p> + For the example above, the following property settings are assumed: + </p> + + <table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>Field Name for Content</td> + <td><code>field_name_for_content</code></td> + </tr> + <tr> + <td>Attribute Prefix</td> + <td><code>prefix_</code></td> + </tr> + </table> + + <p> + The schema can be defined as follows: + </p> + + <code> + <pre> + { + "name": "test", + "namespace": "nifi", + "type": "record", + "fields": [ + { + "name": "field_with_attribute", + "type": { + "name": "RecordForTag", + "type": "record", + "fields" : [ + {"name": "attr", "type": "string"}, + {"name": "field_name_for_content", "type": "string"} + ] + } } + ] + } + </pre> + </code> + + <p> + Note that the field "field_name_for_content" has to be defined not only in the property section, but also in the + schema, whereas the prefix for attributes is not part of the schema. It will be prepended to the field name when an attribute named + "attr" is found at the respective position in the XML data and added to the record.
The record object of the above + example will be structured as follows: + </p> + + <code> + <pre> + Record ( + Record "field_with_attribute" ( + RecordField "prefix_attr" = "attr_content", + RecordField "field_name_for_content" = "content of field" + ) + ) + </pre> + </code> + + <p> + In principle, the field "field_with_attribute" could also be defined as a simple field. In this case, the attributes + would simply be ignored. Conversely, the simple field in the "Simple Fields" example above could also be defined as a record (assuming that + the property "Field Name for Content" is set). + </p> + + <h2>Example: Tags within tags</h2> + + <p> + XML data is frequently nested. In this case, tags enclose other tags: + </p> + + <code> + <pre> + <record> + <field_with_embedded_fields attr="attr_content"> + <embedded_field>embedded content</embedded_field> + <another_embedded_field>another embedded content</another_embedded_field> + </field_with_embedded_fields> + </record> + </pre> + </code> + + <p> + The enclosing fields always have to be defined as records, irrespective of whether they include attributes to be + parsed or not. In this example, the tag "field_with_embedded_fields" encloses the fields "embedded_field" and + "another_embedded_field", which are both simple fields. The schema can be defined as follows: + </p> + + <code> + <pre> + { + "name": "test", + "namespace": "nifi", + "type": "record", + "fields": [ + { + "name": "field_with_embedded_fields", + "type": { + "name": "RecordForEmbedded", + "type": "record", + "fields" : [ + {"name": "attr", "type": "string"}, + {"name": "embedded_field", "type": "string"}, + {"name": "another_embedded_field", "type": "string"} + ] + } } + ] + } + </pre> + </code> + + <p> + Notice that this case does not require the property "Field Name for Content" to be set as this is only required + for tags containing attributes and content.
+ </p> + + <h2>Example: Array of records</h2> + + <p> + For further explanation of the logic of this reader, an example of an array of records is shown below. + The following record contains the field "array_field", which occurs repeatedly. The field contains two + embedded fields. + </p> + + <code> + <pre> + <record> + <array_field> + <embedded_field>embedded content 1</embedded_field> + <another_embedded_field>another embedded content 1</another_embedded_field> + </array_field> + <array_field> + <embedded_field>embedded content 2</embedded_field> + <another_embedded_field>another embedded content 2</another_embedded_field> + </array_field> + </record> + </pre> + </code> + + <p> + This XML data can be parsed similarly to the data in the example "Tags within tags". However, the record defined in the schema of + that example has to be embedded in an array. + </p> + + <code> + <pre> + { + "namespace": "nifi", + "name": "test", + "type": "record", + "fields": [ + { "name": "array_field", + "type": { + "type": "array", + "items": { + "name": "RecordInArray", + "type": "record", + "fields" : [ + {"name": "embedded_field", "type": "string"}, + {"name": "another_embedded_field", "type": "string"} + ] + } + } + } + ] + } + </pre> + </code> + + <h2>Example: Array in record</h2> + + <p> + In XML data, arrays are frequently enclosed by tags: + </p> + + <code> + <pre> + <record> + <field_enclosing_array> + <element>content 1</element> + <element>content 2</element> + </field_enclosing_array> + <field_without_array> content 3</field_without_array> + </record> + </pre> + </code> + + <p> + For the schema, embedded tags have to be described by records.
Therefore, the field "field_enclosing_array" + is a record that embeds an array with elements of type string: + </p> + + <code> + <pre> + { + "namespace": "nifi", + "name": "test", + "type": "record", + "fields": [ + { "name": "field_enclosing_array", + "type": { + "name": "EmbeddedRecord", + "type": "record", + "fields" : [ + { + "name": "element", + "type": { + "type": "array", + "items": "string" + } + } + ] + } + }, + { "name": "field_without_array", "type": "string" } + ] + } + </pre> + </code> + + + <h2>Example: Maps</h2> + + <p> + A map is a field that embeds fields with arbitrary names: + </p> + + <code> + <pre> + <record> + <map_field> + <field1>content</field1> + <field2>content</field2> + ... + </map_field> + <simple_field>content</simple_field> + </record> + </pre> + </code> + + <p> + This data can be processed using the following schema: + </p> + + <code> + <pre> + { + "namespace": "nifi", + "name": "test", + "type": "record", + "fields": [ + { "name": "map_field", "type": + { "type": "map", "values": "string" } + }, + { "name": "simple_field", "type": "string" } + ] + } + </pre> + </code> + + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java new file mode 100755 index 0000000..e1c767e --- /dev/null +++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReader.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.xml; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +/** Tests for {@link XMLReader}. Each test configures the reader on a TestRunner that wraps TestXMLReaderProcessor, enqueues one XML file and checks the newline-separated record strings that the processor writes to SUCCESS. */ +public class TestXMLReader { + + private XMLReader reader; + + private final String ATTRIBUTE_PREFIX = "attribute_prefix"; /* FlowFile attribute name referenced through Expression Language in testAttributePrefix */ + private final String CONTENT_NAME = "content_field"; /* FlowFile attribute name referenced through Expression Language in testContentField */ + private final String EVALUATE_IS_ARRAY = "xml.stream.is.array"; /* NOTE(review): presumably the attribute XMLReader consults when RECORD_FORMAT is RECORD_EVALUATE; confirm in XMLReader */ + /** Creates a TestRunner for TestXMLReaderProcessor with a new XMLReader registered as "xml_reader" and configured to use the schema text read from filePath (decoded with the platform default charset). Callers still set further reader properties and enable the service. */ + public TestRunner setup(String filePath) throws InitializationException, IOException { + + TestRunner runner =
TestRunners.newTestRunner(TestXMLReaderProcessor.class); + reader = new XMLReader(); + runner.addControllerService("xml_reader", reader); + runner.setProperty(TestXMLReaderProcessor.XML_READER, "xml_reader"); + + final String outputSchemaText = new String(Files.readAllBytes(Paths.get(filePath))); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + + return runner; + } + /** RECORD_EVALUATE with the EVALUATE_IS_ARRAY attribute set to "true": people.xml is expected to yield 4 records. */ + @Test + public void testRecordFormat() throws IOException, InitializationException { + TestRunner runner = setup("src/test/resources/xml/testschema"); + + runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_EVALUATE); + + runner.enableControllerService(reader); + + InputStream is = new FileInputStream("src/test/resources/xml/people.xml"); /* NOTE(review): the tests never close their FileInputStreams themselves; confirm whether runner.enqueue does */ + runner.enqueue(is, new HashMap<String,String>() {{ + put(EVALUATE_IS_ARRAY, "true"); + }}); + runner.run(); + + List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS); + List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n")); + + assertEquals(4, records.size()); + } + /** RECORD_ARRAY: people.xml is expected to yield 4 records; the EVALUATE_IS_ARRAY attribute is set here as well. */ + @Test + public void testRecordFormat2() throws IOException, InitializationException { + TestRunner runner = setup("src/test/resources/xml/testschema"); + + runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY); + + runner.enableControllerService(reader); + + InputStream is = new FileInputStream("src/test/resources/xml/people.xml"); + runner.enqueue(is, new HashMap<String,String>() {{ + put(EVALUATE_IS_ARRAY, "true"); + }}); + runner.run(); + + List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS); + List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n")); + + assertEquals(4, records.size()); + } + /** RECORD_SINGLE: person.xml is expected to yield exactly 1 record. NOTE(review): the EVALUATE_IS_ARRAY attribute is presumably irrelevant in this mode; confirm. */ + @Test + public void
testRecordFormat3() throws IOException, InitializationException { + TestRunner runner = setup("src/test/resources/xml/testschema"); + + runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE); + + runner.enableControllerService(reader); + + InputStream is = new FileInputStream("src/test/resources/xml/person.xml"); + runner.enqueue(is, new HashMap<String,String>() {{ + put(EVALUATE_IS_ARRAY, "true"); + }}); + runner.run(); + + List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS); + List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n")); + + assertEquals(1, records.size()); + } + /** ATTRIBUTE_PREFIX is configured through Expression Language and resolved from the FlowFile attribute to "ATTR_", so the ID field is expected as ATTR_ID. NOTE(review): the expected strings depend on MapRecord.toString() field order, and "Amélie" relies on the processor's getBytes() and the new String(...) below using the same platform default charset. */ + @Test + public void testAttributePrefix() throws IOException, InitializationException { + TestRunner runner = setup("src/test/resources/xml/testschema"); + + runner.setProperty(reader, XMLReader.ATTRIBUTE_PREFIX, "${" + ATTRIBUTE_PREFIX + "}"); + runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY); + + runner.enableControllerService(reader); + + InputStream is = new FileInputStream("src/test/resources/xml/people.xml"); + runner.enqueue(is, new HashMap<String,String>() {{ + put(ATTRIBUTE_PREFIX, "ATTR_"); + }}); + runner.run(); + + List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS); + List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n")); + + assertEquals(4, records.size()); + assertEquals("MapRecord[{COUNTRY=USA, ATTR_ID=P1, NAME=Cleve Butler, AGE=42}]", records.get(0)); + assertEquals("MapRecord[{COUNTRY=UK, ATTR_ID=P2, NAME=Ainslie Fletcher, AGE=33}]", records.get(1)); + assertEquals("MapRecord[{COUNTRY=FR, ATTR_ID=P3, NAME=Amélie Bonfils, AGE=74}]", records.get(2)); + assertEquals("MapRecord[{COUNTRY=USA, ATTR_ID=P4, NAME=Elenora Scrivens, AGE=16}]", records.get(3)); + } + /** CONTENT_FIELD_NAME is configured through Expression Language and resolved to "CONTENT": NAME is expected as a nested record holding the tag text under CONTENT plus ATTR and INNER; the fifth record (P5) has no NAME text and no AGE. */ + @Test + public void
InitializationException { + TestRunner runner = setup("src/test/resources/xml/testschema2"); + + runner.setProperty(reader, XMLReader.CONTENT_FIELD_NAME, "${" + CONTENT_NAME + "}"); + runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY); + + runner.enableControllerService(reader); + + InputStream is = new FileInputStream("src/test/resources/xml/people_tag_in_characters.xml"); + runner.enqueue(is, new HashMap<String,String>() {{ + put(CONTENT_NAME, "CONTENT"); + }}); + runner.run(); + + List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS); + List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n")); + + assertEquals(5, records.size()); + assertEquals("MapRecord[{ID=P1, NAME=MapRecord[{CONTENT=Cleve Butler, ATTR=attr content, INNER=inner content}], AGE=42}]", records.get(0)); + assertEquals("MapRecord[{ID=P2, NAME=MapRecord[{CONTENT=Ainslie Fletcher, ATTR=attr content, INNER=inner content}], AGE=33}]", records.get(1)); + assertEquals("MapRecord[{ID=P3, NAME=MapRecord[{CONTENT=Amélie Bonfils, ATTR=attr content, INNER=inner content}], AGE=74}]", records.get(2)); + assertEquals("MapRecord[{ID=P4, NAME=MapRecord[{CONTENT=Elenora Scrivens, ATTR=attr content, INNER=inner content}], AGE=16}]", records.get(3)); + assertEquals("MapRecord[{ID=P5, NAME=MapRecord[{INNER=inner content}]}]", records.get(4)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d21bd387/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java new file mode 100755 index 0000000..b4ee4ab --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLReaderProcessor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.xml; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StringUtils; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +/** Test-only processor: reads the incoming FlowFile with the XML_READER service, replaces its content with the toString() of every record joined by "\n", and transfers it to SUCCESS. */ +public class TestXMLReaderProcessor extends AbstractProcessor { + /** Required reference to the XMLReader controller service under test. */ + static final PropertyDescriptor XML_READER = new PropertyDescriptor.Builder() + .name("xml_reader") + .description("xml_reader") + .identifiesControllerService(XMLReader.class) + .required(true) + .build(); + /** The only relationship; onTrigger transfers every FlowFile here. */ + public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("success").build(); + /** Reads all records of one FlowFile and writes their string forms back as its content. NOTE(review): session.get() may return null when nothing is queued, which is not checked here; any exception while reading is only printed via printStackTrace(), after which the FlowFile is still written (with the records collected so far) and sent to SUCCESS; getBytes() uses the platform default charset. */ + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + final RecordReaderFactory readerFactory = context.getProperty(XML_READER).asControllerService(RecordReaderFactory.class); + + final List<String> records = new ArrayList<>(); + + try (final InputStream in = session.read(flowFile); + final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { + Record record; + while ((record = reader.nextRecord()) != null) { + records.add(record.toString()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + flowFile = session.write(flowFile, (out) -> out.write(StringUtils.join(records, "\n").getBytes())); + session.transfer(flowFile, SUCCESS); + } + /** Declares XML_READER as the only supported property. */ + @Override + protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() { + return new ArrayList<PropertyDescriptor>() {{ add(XML_READER); }}; + } + /** Declares SUCCESS as the only relationship. */ + @Override + public Set<Relationship> getRelationships() { + return new HashSet<Relationship>() {{ add(SUCCESS); }}; + } +}
