Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r181653767
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
 ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.xml;
    +
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import javax.xml.stream.XMLEventReader;
    +import javax.xml.stream.XMLInputFactory;
    +import javax.xml.stream.XMLStreamException;
    +import javax.xml.stream.events.Attribute;
    +import javax.xml.stream.events.Characters;
    +import javax.xml.stream.events.StartElement;
    +import javax.xml.stream.events.XMLEvent;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.text.DateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.function.Supplier;
    +
    +public class XMLRecordReader implements RecordReader {
    +
    +    private final ComponentLog logger;
    +    private final RecordSchema schema;
    +    private final String recordName;
    +    private final String attributePrefix;
    +    private final String contentFieldName;
    +
    +    // thread safety required?
    +    private StartElement currentRecordStartTag;
    +
    +    private final XMLEventReader xmlEventReader;
    +
    +    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
    +
    +    /**
    +     * Creates a reader over the given XML stream and positions it on the first record.
    +     *
    +     * <p>The first start tag of the document is treated as the root tag. When {@code rootName}
    +     * is non-null the root tag's name must equal it. Afterwards the reader advances to the
    +     * first record start tag via {@code setNextRecordStartTag()}.</p>
    +     *
    +     * @param in               stream containing the XML document
    +     * @param schema           schema passed to record parsing
    +     * @param rootName         expected name of the root tag, or {@code null} to skip validation
    +     * @param recordName       expected name of record tags, or {@code null} to accept any tag
    +     * @param attributePrefix  prefix prepended to attribute names, may be {@code null}
    +     * @param contentFieldName field name used for element content, may be {@code null}
    +     * @param dateFormat       date format pattern, or {@code null}
    +     * @param timeFormat       time format pattern, or {@code null}
    +     * @param timestampFormat  timestamp format pattern, or {@code null}
    +     * @param logger           logger used for diagnostics
    +     * @throws MalformedRecordException if the XML cannot be parsed, has no root tag, or the
    +     *                                  root tag name does not match {@code rootName}
    +     */
    +    public XMLRecordReader(InputStream in, RecordSchema schema, String rootName, String recordName,
    +                           String attributePrefix, String contentFieldName,
    +                           final String dateFormat, final String timeFormat, final String timestampFormat,
    +                           final ComponentLog logger) throws MalformedRecordException {
    +        this.schema = schema;
    +        this.recordName = recordName;
    +        this.attributePrefix = attributePrefix;
    +        this.contentFieldName = contentFieldName;
    +        this.logger = logger;
    +
    +        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
    +        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
    +        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
    +
    +        LAZY_DATE_FORMAT = () -> df;
    +        LAZY_TIME_FORMAT = () -> tf;
    +        LAZY_TIMESTAMP_FORMAT = () -> tsf;
    +
    +        try {
    +            final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
    +
    +            // Avoid namespace replacements
    +            xmlInputFactory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, false);
    +
    +            // The input is external data: disable DTDs and external entities to prevent XXE.
    +            xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
    +            xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
    +
    +            xmlEventReader = xmlInputFactory.createXMLEventReader(in);
    +            final StartElement rootTag = getNextStartTag();
    +
    +            // getNextStartTag() returns null when the stream holds no start tag at all;
    +            // report that as malformed input instead of failing with a NullPointerException.
    +            if (rootTag == null) {
    +                throw new MalformedRecordException("Could not parse XML: no root tag found");
    +            }
    +
    +            // root tag validation
    +            if (rootName != null && !rootName.equals(rootTag.getName().toString())) {
    +                throw new MalformedRecordException("Name of root tag \"" + rootTag.getName().toString()
    +                        + "\" does not match root tag validation \"" + rootName + "\".");
    +            }
    +            setNextRecordStartTag();
    +        } catch (XMLStreamException e) {
    +            throw new MalformedRecordException("Could not parse XML", e);
    +        }
    +    }
    +
    +    /**
    +     * Consumes events from the underlying reader until a start element is found.
    +     *
    +     * @return the next start element, or {@code null} if the reader is exhausted first
    +     * @throws XMLStreamException if reading the next event fails
    +     */
    +    private StartElement getNextStartTag() throws XMLStreamException {
    +        StartElement found = null;
    +        while (found == null && xmlEventReader.hasNext()) {
    +            final XMLEvent candidate = xmlEventReader.nextEvent();
    +            if (!candidate.isStartElement()) {
    +                continue;
    +            }
    +            found = candidate.asStartElement();
    +        }
    +        return found;
    +    }
    +
    +    private void setNextRecordStartTag() throws XMLStreamException {
    +        while (xmlEventReader.hasNext()) {
    +            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
    +            if (xmlEvent.isStartElement()) {
    +                final StartElement startElement = 
xmlEvent.asStartElement();
    +                if (recordName != null) {
    +                    if 
(startElement.getName().toString().equals(recordName)) {
    +                        currentRecordStartTag = startElement;
    +                        return;
    +                    } else {
    +                        logger.debug("Mismatch between expected record tag 
name {} and actual tag name in XML {}. " +
    +                                "Record will be skipped", new Object[] 
{recordName, startElement.getName().toString()});
    +                        skipElement();
    +                    }
    +                } else {
    +                    currentRecordStartTag = startElement;
    +                    return;
    +                }
    +            }
    +        }
    +        currentRecordStartTag = null;
    +    }
    +
    +    @Override
    +    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
    +        if (currentRecordStartTag == null) {
    +            return null;
    +        }
    +        try {
    +            final Record record = parseRecord(currentRecordStartTag, 
this.schema, coerceTypes, dropUnknownFields);
    +            setNextRecordStartTag();
    +            if (record != null) {
    +                return record;
    +            } else {
    +                return new MapRecord(this.schema, Collections.EMPTY_MAP);
    +            }
    +        } catch (XMLStreamException e) {
    +            throw new MalformedRecordException("Could not parse XML", e);
    +        }
    +    }
    +
    +    /**
    +     * Parses the content of the element opened by {@code startElement} according to
    +     * {@code dataType}.
    +     *
    +     * <ul>
    +     *   <li>Simple types: reads the element text and converts it with {@code DataTypeUtils};
    +     *       an empty or whitespace-only element yields {@code null}.</li>
    +     *   <li>ARRAY: parses one element of the array's element type. A new list is returned when
    +     *       {@code recordValues} holds no list for {@code fieldName} yet; otherwise the value
    +     *       is appended to the existing list in place and {@code null} is returned.</li>
    +     *   <li>RECORD: delegates to {@code parseRecord} with the child schema.</li>
    +     *   <li>MAP: not supported; the element is skipped and {@code null} is returned.</li>
    +     *   <li>CHOICE: delegates to {@code parseUnknownField}.</li>
    +     * </ul>
    +     *
    +     * @param startElement start tag of the field being parsed
    +     * @param fieldName    name of the field
    +     * @param dataType     expected type of the field
    +     * @param recordValues values collected so far for the enclosing record
    +     * @return the parsed value, or {@code null} if there is nothing to store
    +     * @throws XMLStreamException       if reading from the underlying reader fails
    +     * @throws MalformedRecordException if the XML structure does not fit the expected type
    +     */
    +    private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType,
    +                                     Map<String, Object> recordValues)
    +            throws XMLStreamException, MalformedRecordException {
    +        switch (dataType.getFieldType()) {
    +            case BOOLEAN:
    +            case BYTE:
    +            case CHAR:
    +            case DOUBLE:
    +            case FLOAT:
    +            case INT:
    +            case LONG:
    +            case SHORT:
    +            case STRING:
    +            case DATE:
    +            case TIME:
    +            case TIMESTAMP: {
    +                XMLEvent xmlEvent = xmlEventReader.nextEvent();
    +                if (xmlEvent.isCharacters()) {
    +                    final Characters characters = xmlEvent.asCharacters();
    +                    if (!characters.isWhiteSpace()) {
    +                        // Consume the event following the text before returning.
    +                        xmlEventReader.nextEvent();
    +                        // getData() is the specified accessor; toString() is implementation-dependent.
    +                        return DataTypeUtils.convertType(characters.getData(), dataType,
    +                                LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
    +                    }
    +                    xmlEvent = xmlEventReader.nextEvent();
    +                }
    +                if (xmlEvent.isEndElement()) {
    +                    return null;
    +                }
    +                if (xmlEvent.isStartElement()) {
    +                    final String message = "Error parsing XML. Either the XML is invalid or there is a mismatch between schema type definitions and XML structure.";
    +                    throw new MalformedRecordException(message);
    +                }
    +                // Any other event used to fall through into the ARRAY case and fail with a
    +                // ClassCastException; report it as malformed input instead.
    +                throw new MalformedRecordException("Unexpected XML event while parsing field \"" + fieldName + "\".");
    +            }
    +            case ARRAY: {
    +                final DataType arrayDataType = ((ArrayDataType) dataType).getElementType();
    +
    +                final Object newValue = parseFieldForType(startElement, fieldName, arrayDataType, recordValues);
    +                if (newValue == null) {
    +                    return null;
    +                }
    +
    +                final Object oldValues = recordValues.get(fieldName);
    +                if (oldValues instanceof List) {
    +                    @SuppressWarnings("unchecked") // lists stored in recordValues are created below as List<Object>
    +                    final List<Object> existing = (List<Object>) oldValues;
    +                    existing.add(newValue);
    +                    // The stored list was extended in place. Return null explicitly, which is
    +                    // what the former fall-through into the RECORD case produced.
    +                    return null;
    +                }
    +
    +                final List<Object> values = new ArrayList<>();
    +                if (oldValues != null) {
    +                    values.add(oldValues);
    +                }
    +                values.add(newValue);
    +                return values;
    +            }
    +            case RECORD: {
    +                if (!(dataType instanceof RecordDataType)) {
    +                    return null;
    +                }
    +                final RecordSchema childSchema = ((RecordDataType) dataType).getChildSchema();
    +                return parseRecord(startElement, childSchema, true, true);
    +            }
    +            case MAP: {
    +                logger.warn("Type map is not supported by this record reader. Field will be skipped.");
    +                skipElement();
    +                return null;
    +            }
    +            case CHOICE: {
    +                return parseUnknownField(startElement);
    +            }
    +            default:
    +                return null;
    +        }
    +    }
    +
    +    private Object parseUnknownField(StartElement startElement) throws 
XMLStreamException {
    +        // parse attributes
    +        final Map<String, Object> recordValues = new HashMap<>();
    +        final Iterator iterator = startElement.getAttributes();
    +        while (iterator.hasNext()) {
    +            final Attribute attribute = (Attribute) iterator.next();
    +            final String attributeName = attribute.getName().toString();
    +            recordValues.put(attributePrefix == null ? attributeName : 
attributePrefix + attributeName, attribute.getValue());
    +        }
    +        boolean hasAttributes = recordValues.size() > 0;
    +
    +        // parse fields
    +        while (xmlEventReader.hasNext()) {
    +            final XMLEvent xmlEvent = xmlEventReader.nextEvent();
    +            if (xmlEvent.isCharacters()) {
    +                final Characters characters = xmlEvent.asCharacters();
    +                if (!characters.isWhiteSpace()) {
    +                    xmlEventReader.nextEvent();
    +                    if (hasAttributes) {
    +                        if (contentFieldName != null) {
    +                            recordValues.put(contentFieldName, 
characters.toString());
    +                        } else {
    +                            logger.debug("Found content for field that has 
to be parsed as record but property \"Field Name for Content\" is not set. " +
    --- End diff --
    
    But what if the xml has the following structure?
    
    ```
    <person>
      <location location="NONE">1.469, -3.875</location>
    </person>
    ```
    
    then the content will replace the attribute as long as location is not defined as 
an array...


---

Reply via email to