[ 
https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439880#comment-16439880
 ] 

ASF GitHub Bot commented on NIFI-4185:
--------------------------------------

Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r181851731
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
 ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.xml;
    +
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import javax.xml.stream.XMLEventReader;
    +import javax.xml.stream.XMLInputFactory;
    +import javax.xml.stream.XMLStreamException;
    +import javax.xml.stream.events.Attribute;
    +import javax.xml.stream.events.Characters;
    +import javax.xml.stream.events.StartElement;
    +import javax.xml.stream.events.XMLEvent;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.text.DateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.function.Supplier;
    +
    +public class XMLRecordReader implements RecordReader {
    +
    +    private final ComponentLog logger;
    +    // Schema handed to parseRecord(...) and used for the empty fallback record in nextRecord(...)
    +    private final RecordSchema schema;
    +    // If non-null, only start elements with this exact name are treated as records; others are skipped
    +    private final String recordName;
    +    private final String attributePrefix;
    +    private final String contentFieldName;
    +
    +    // Start tag of the record the next nextRecord(...) call will parse; null once no further record exists.
    +    // NOTE(review): mutable and unsynchronized - presumably a reader instance is used by one thread at a time; confirm.
    +    private StartElement currentRecordStartTag;
    +
    +    // StAX event stream over the input; created in the constructor and advanced by every parsing method
    +    private final XMLEventReader xmlEventReader;
    +
    +    // Suppliers of the pre-built formats (null when no pattern was configured), passed to DataTypeUtils.convertType
    +    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
    +
    +    /**
    +     * Creates a reader over an XML document. The first start element is taken as the root tag,
    +     * and the first record start tag after it is located immediately.
    +     *
    +     * @param in               the XML input
    +     * @param schema           schema used for the records produced by this reader
    +     * @param rootName         if non-null, the required name of the root tag
    +     * @param recordName       if non-null, only start elements with this name are treated as records
    +     * @param attributePrefix  stored for use while parsing records
    +     * @param contentFieldName stored for use while parsing records
    +     * @param dateFormat       pattern for DATE fields, or null for none
    +     * @param timeFormat       pattern for TIME fields, or null for none
    +     * @param timestampFormat  pattern for TIMESTAMP fields, or null for none
    +     * @param logger           component logger
    +     * @throws MalformedRecordException if the root tag does not match {@code rootName} or the XML cannot be parsed
    +     */
    +    public XMLRecordReader(InputStream in, RecordSchema schema, String rootName, String recordName, String attributePrefix, String contentFieldName,
    +                           final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException {
    +        this.schema = schema;
    +        this.recordName = recordName;
    +        this.attributePrefix = attributePrefix;
    +        this.contentFieldName = contentFieldName;
    +        this.logger = logger;
    +
    +        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
    +        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
    +        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
    +
    +        LAZY_DATE_FORMAT = () -> df;
    +        LAZY_TIME_FORMAT = () -> tf;
    +        LAZY_TIMESTAMP_FORMAT = () -> tsf;
    +
    +        try {
    +            final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
    +
    +            // Avoid namespace replacements
    +            xmlInputFactory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, false);
    +            // Prevent XML External Entity (XXE) attacks: the stream may carry untrusted XML,
    +            // so neither DTD processing nor external entity resolution is permitted.
    +            xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
    +            xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
    +
    +            xmlEventReader = xmlInputFactory.createXMLEventReader(in);
    +            final StartElement rootTag = getNextStartTag();
    +
    +            // Root tag validation. A null rootTag means the input holds no element at all: there is
    +            // nothing to validate, and setNextRecordStartTag() will find no records (same as rootName == null).
    +            if (rootName != null && rootTag != null) {
    +                final String actualRootName = rootTag.getName().toString();
    +                if (!rootName.equals(actualRootName)) {
    +                    throw new MalformedRecordException("Name of root tag \"" + actualRootName
    +                            + "\" does not match root tag validation \"" + rootName + "\".");
    +                }
    +            }
    +            setNextRecordStartTag();
    +        } catch (XMLStreamException e) {
    +            throw new MalformedRecordException("Could not parse XML", e);
    +        }
    +    }
    +
    +    /**
    +     * Advances the event stream to the next opening tag, discarding any other events on the way.
    +     *
    +     * @return the next start element, or null if the stream ends first
    +     */
    +    private StartElement getNextStartTag() throws XMLStreamException {
    +        StartElement found = null;
    +        while (found == null && xmlEventReader.hasNext()) {
    +            final XMLEvent event = xmlEventReader.nextEvent();
    +            if (event.isStartElement()) {
    +                found = event.asStartElement();
    +            }
    +        }
    +        return found;
    +    }
    +
    +    /**
    +     * Moves to the next record start tag and stores it in currentRecordStartTag.
    +     * When recordName is set, start elements with any other name are logged and skipped
    +     * via skipElement(); otherwise the first start element found is taken. Leaves
    +     * currentRecordStartTag null when the stream is exhausted.
    +     */
    +    private void setNextRecordStartTag() throws XMLStreamException {
    +        while (xmlEventReader.hasNext()) {
    +            final XMLEvent event = xmlEventReader.nextEvent();
    +            if (!event.isStartElement()) {
    +                continue;
    +            }
    +            final StartElement candidate = event.asStartElement();
    +            if (recordName == null || candidate.getName().toString().equals(recordName)) {
    +                currentRecordStartTag = candidate;
    +                return;
    +            }
    +            logger.debug("Mismatch between expected record tag name {} and actual tag name in XML {}. " +
    +                    "Record will be skipped", new Object[] {recordName, candidate.getName().toString()});
    +            skipElement();
    +        }
    +        currentRecordStartTag = null;
    +    }
    +
    +    /**
    +     * Parses the record at the current record start tag, then advances to the next one.
    +     *
    +     * @param coerceTypes       passed through to parseRecord
    +     * @param dropUnknownFields passed through to parseRecord
    +     * @return the parsed record; an empty record with this reader's schema if parsing yielded
    +     *         {@code null}; or {@code null} when no further record start tag exists
    +     * @throws MalformedRecordException if the XML cannot be parsed
    +     */
    +    @Override
    +    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
    +        if (currentRecordStartTag == null) {
    +            return null;
    +        }
    +        try {
    +            final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields);
    +            setNextRecordStartTag();
    +            if (record != null) {
    +                return record;
    +            }
    +            // Use a fresh, typed, mutable map rather than the raw, immutable Collections.EMPTY_MAP,
    +            // so callers that later set values on this record do not hit UnsupportedOperationException
    +            // (assumes MapRecord writes values through to the supplied map - confirm).
    +            return new MapRecord(this.schema, new HashMap<>());
    +        } catch (XMLStreamException e) {
    +            throw new MalformedRecordException("Could not parse XML", e);
    +        }
    +    }
    +
    +    private Object parseFieldForType(StartElement startElement, String 
fieldName, DataType dataType, Map<String, Object> recordValues) throws 
XMLStreamException, MalformedRecordException {
    +        switch (dataType.getFieldType()) {
    +            case BOOLEAN:
    +            case BYTE:
    +            case CHAR:
    +            case DOUBLE:
    +            case FLOAT:
    +            case INT:
    +            case LONG:
    +            case SHORT:
    +            case STRING:
    +            case DATE:
    +            case TIME:
    +            case TIMESTAMP: {
    +                XMLEvent xmlEvent = xmlEventReader.nextEvent();
    +                if (xmlEvent.isCharacters()) {
    +                    final Characters characters = xmlEvent.asCharacters();
    +                    if (!characters.isWhiteSpace()) {
    +                        xmlEventReader.nextEvent();
    +                        return 
DataTypeUtils.convertType(characters.toString(), dataType, LAZY_DATE_FORMAT, 
LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
    +                    } else {
    +                        xmlEvent = xmlEventReader.nextEvent();
    +                    }
    +                }
    +                if (xmlEvent.isEndElement()) {
    +                    return null;
    +                } else if (xmlEvent.isStartElement()) {
    +                    final String message = "Error parsing XML. Either the 
XML is invalid or there is a mismatch between schema type definitions and XML 
structure.";
    +                    throw new MalformedRecordException(message);
    +                }
    +            }
    +            case ARRAY: {
    +                final DataType arrayDataType = ((ArrayDataType) 
dataType).getElementType();
    +
    +                final Object newValue = parseFieldForType(startElement, 
fieldName, arrayDataType, recordValues);
    +                final Object oldValues = recordValues.get(fieldName);
    +
    +                if (newValue != null) {
    +                    if (oldValues != null) {
    +                        if (oldValues instanceof List) {
    +                            ((List) oldValues).add(newValue);
    +                        } else {
    +                            return new ArrayList<Object>(){{ 
add(oldValues); add(newValue); }};
    +                        }
    +                    } else {
    +                        return new ArrayList<Object>(){{ add(newValue); }};
    +                    }
    +                } else {
    +                    return null;
    --- End diff --
    
    This already works. If null is returned, the outer loop doesn't do 
anything, so the field containing the list remains. However, I changed this to 
make the logic more obvious.


> Add XML record reader & writer services
> ---------------------------------------
>
>                 Key: NIFI-4185
>                 URL: https://issues.apache.org/jira/browse/NIFI-4185
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.3.0
>            Reporter: Andy LoPresto
>            Assignee: Johannes Peter
>            Priority: Major
>              Labels: json, records, xml
>
> With the addition of the {{RecordReader}} and {{RecordSetWriter}} paradigm, 
> XML conversion has not yet been targeted. This will replace the previous 
> ticket for XML to JSON conversion. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to