GitHub user markap14 commented on a diff in pull request: https://github.com/apache/nifi/pull/2587#discussion_r179831931 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.xml; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.Attribute; +import javax.xml.stream.events.Characters; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; +import java.io.IOException; +import java.io.InputStream; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +public class XMLRecordReader implements RecordReader { + + private final ComponentLog logger; + private final RecordSchema schema; + private final String recordName; + private final String attributePrefix; + private final String contentFieldName; + + // thread safety required? 
+ private StartElement currentRecordStartTag; + + private final XMLEventReader xmlEventReader; + + private final Supplier<DateFormat> LAZY_DATE_FORMAT; + private final Supplier<DateFormat> LAZY_TIME_FORMAT; + private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT; + + public XMLRecordReader(InputStream in, RecordSchema schema, String rootName, String recordName, String attributePrefix, String contentFieldName, + final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException { + this.schema = schema; + this.recordName = recordName; + this.attributePrefix = attributePrefix; + this.contentFieldName = contentFieldName; + this.logger = logger; + + final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); + + LAZY_DATE_FORMAT = () -> df; + LAZY_TIME_FORMAT = () -> tf; + LAZY_TIMESTAMP_FORMAT = () -> tsf; + + try { + final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance(); + + // Avoid namespace replacements + xmlInputFactory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, false); + + xmlEventReader = xmlInputFactory.createXMLEventReader(in); + final StartElement rootTag = getNextStartTag(); + + // root tag validation + if (rootName != null && !rootName.equals(rootTag.getName().toString())) { + final StringBuffer message = new StringBuffer(); + message.append("Name of root tag \"") + .append(rootTag.getName().toString()) + .append("\" does not match root tag validation \"") + .append(rootName) + .append("\"."); + throw new MalformedRecordException(message.toString()); + } + setNextRecordStartTag(); + } catch (XMLStreamException e) { + throw new MalformedRecordException("Could not parse XML", e); + } + } + + private StartElement getNextStartTag() throws 
XMLStreamException { + while (xmlEventReader.hasNext()) { + final XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isStartElement()) { + return xmlEvent.asStartElement(); + } + } + return null; + } + + private void setNextRecordStartTag() throws XMLStreamException { + while (xmlEventReader.hasNext()) { + final XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isStartElement()) { + final StartElement startElement = xmlEvent.asStartElement(); + if (recordName != null) { + if (startElement.getName().toString().equals(recordName)) { + currentRecordStartTag = startElement; + return; + } else { + logger.debug("Mismatch between expected record tag name {} and actual tag name in XML {}. " + + "Record will be skipped", new Object[] {recordName, startElement.getName().toString()}); + skipElement(); + } + } else { + currentRecordStartTag = startElement; + return; + } + } + } + currentRecordStartTag = null; + } + + @Override + public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { + if (currentRecordStartTag == null) { + return null; + } + try { + final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields); + setNextRecordStartTag(); + if (record != null) { + return record; + } else { + return new MapRecord(this.schema, Collections.EMPTY_MAP); + } + } catch (XMLStreamException e) { + throw new MalformedRecordException("Could not parse XML", e); + } + } + + private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType, Map<String, Object> recordValues) throws XMLStreamException, MalformedRecordException { + switch (dataType.getFieldType()) { + case BOOLEAN: + case BYTE: + case CHAR: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case SHORT: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: { + XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isCharacters()) { + final 
Characters characters = xmlEvent.asCharacters(); + if (!characters.isWhiteSpace()) { + xmlEventReader.nextEvent(); + return DataTypeUtils.convertType(characters.toString(), dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } else { + xmlEvent = xmlEventReader.nextEvent(); + } + } + if (xmlEvent.isEndElement()) { + return null; + } else if (xmlEvent.isStartElement()) { + final String message = "Error parsing XML. Either the XML is invalid or there is a mismatch between schema type definitions and XML structure."; + throw new MalformedRecordException(message); + } + } + case ARRAY: { + final DataType arrayDataType = ((ArrayDataType) dataType).getElementType(); + + final Object newValue = parseFieldForType(startElement, fieldName, arrayDataType, recordValues); + final Object oldValues = recordValues.get(fieldName); + + if (newValue != null) { + if (oldValues != null) { + if (oldValues instanceof List) { + ((List) oldValues).add(newValue); + } else { + return new ArrayList<Object>(){{ add(oldValues); add(newValue); }}; + } + } else { + return new ArrayList<Object>(){{ add(newValue); }}; + } + } else { + return null; + } + } + case RECORD: { + final RecordSchema childSchema; + if (dataType instanceof RecordDataType) { + childSchema = ((RecordDataType) dataType).getChildSchema(); + } else { + return null; + } + return parseRecord(startElement, childSchema, true, true); + } + case MAP: { + logger.warn("Type map is not supported by this record reader. Field will be skipped."); --- End diff -- Is there a reason that we don't want to support parsing the MAP type? I think it's a viable option, for example with XML such as: ``` <myElement> <myMap> <key1>Hello</key1> <key2>Adios</key2> <another-key>74</another-key> </myMap> </myElement> ```
---