[ 
https://issues.apache.org/jira/browse/NIFI-2613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929130#comment-15929130
 ] 

ASF GitHub Bot commented on NIFI-2613:
--------------------------------------

Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/929#discussion_r106553171
  
    --- Diff: 
nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/java/org/apache/nifi/processors/poi/ConvertExcelToCSVProcessor.java
 ---
    @@ -0,0 +1,409 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.poi;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.commons.io.FilenameUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.poi.openxml4j.opc.OPCPackage;
    +import org.apache.poi.xssf.eventusermodel.XSSFReader;
    +import org.apache.poi.xssf.model.SharedStringsTable;
    +import org.apache.poi.xssf.usermodel.XSSFRichTextString;
    +import org.xml.sax.Attributes;
    +import org.xml.sax.InputSource;
    +import org.xml.sax.SAXException;
    +import org.xml.sax.XMLReader;
    +import org.xml.sax.helpers.DefaultHandler;
    +import org.xml.sax.helpers.XMLReaderFactory;
    +
    +
    +@Tags({"excel", "csv", "poi"})
    +@CapabilityDescription("Consumes a Microsoft Excel document and converts 
each worksheet to csv. Each sheet from the incoming Excel " +
    +        "document will generate a new Flowfile that will be output from 
this processor. Each output Flowfile's contents will be formatted as a csv file 
" +
    +        "where the each row from the excel sheet is output as a newline in 
the csv file.")
    +@WritesAttributes({@WritesAttribute(attribute="sheetname", 
description="The name of the Excel sheet that this particular row of data came 
from in the Excel document"),
    +        @WritesAttribute(attribute="numrows", description="The number of 
rows in this Excel Sheet"),
    +        @WritesAttribute(attribute="sourcefilename", description="The name 
of the Excel document file that this data originated from"),
    +        @WritesAttribute(attribute="convertexceltocsvprocessor.error", 
description="Error message that was encountered on a per Excel sheet basis. 
This attribute is" +
    +                " only populated if an error was occured while processing 
the particular sheet. Having the error present at the sheet level will allow 
for the end" +
    +                " user to better understand what syntax errors in their 
excel doc on a larger scale caused the error.")})
    +public class ConvertExcelToCSVProcessor
    +        extends AbstractProcessor {
    +
    +    private static final String CSV_MIME_TYPE = "text/csv";
    +    public static final String SHEET_NAME = "sheetname";
    +    public static final String ROW_NUM = "numrows";
    +    public static final String SOURCE_FILE_NAME = "sourcefilename";
    +    private static final String SAX_CELL_REF = "c";
    +    private static final String SAX_CELL_TYPE = "t";
    +    private static final String SAX_CELL_STRING = "s";
    +    private static final String SAX_CELL_CONTENT_REF = "v";
    +    private static final String SAX_ROW_REF = "row";
    +    private static final String SAX_SHEET_NAME_REF = "sheetPr";
    +    private static final String DESIRED_SHEETS_DELIMITER = ",";
    +    private static final String UNKNOWN_SHEET_NAME = "UNKNOWN";
    +
    +    public static final PropertyDescriptor DESIRED_SHEETS = new 
PropertyDescriptor
    +            .Builder().name("extract-sheets")
    +            .displayName("Sheets to Extract")
    +            .description("Comma separated list of Excel document sheet 
names that should be extracted from the excel document. If this property" +
    +                    " is left blank then all of the sheets will be 
extracted from the Excel document. The list of names is case in-sensitive. Any 
sheets not " +
    +                    "specified in this value will be ignored.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("Original Excel document received by this 
processor")
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Excel data converted to csv")
    +            .build();
    +
    +    public static final Relationship FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Failed to parse the Excel document")
    +            .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +
    +    private Set<Relationship> relationships;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(DESIRED_SHEETS);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(ORIGINAL);
    +        relationships.add(SUCCESS);
    +        relationships.add(FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        final FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        try {
    +
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream inputStream) throws 
IOException {
    +
    +                    try {
    +                        String desiredSheetsDelimited = 
context.getProperty(DESIRED_SHEETS)
    +                                .evaluateAttributeExpressions().getValue();
    +
    +                        OPCPackage pkg = OPCPackage.open(inputStream);
    +                        XSSFReader r = new XSSFReader(pkg);
    +                        SharedStringsTable sst = r.getSharedStringsTable();
    +                        XSSFReader.SheetIterator iter = 
(XSSFReader.SheetIterator) r.getSheetsData();
    +
    +                        if (desiredSheetsDelimited != null) {
    +
    +                            String[] desiredSheets = StringUtils
    +                                    .split(desiredSheetsDelimited, 
DESIRED_SHEETS_DELIMITER);
    +
    +                            if (desiredSheets != null) {
    +
    +                                while (iter.hasNext()) {
    +                                    InputStream sheet = iter.next();
    +                                    String sheetName = iter.getSheetName();
    +
    +                                    for (int i = 0; i < 
desiredSheets.length; i++) {
    +                                        //If the sheetName is a desired 
one parse it
    +                                        if 
(sheetName.equalsIgnoreCase(desiredSheets[i])) {
    +                                            handleExcelSheet(session, 
flowFile, sst, sheet, sheetName);
    +                                            break;
    +                                        }
    +                                    }
    +                                }
    +                            } else {
    +                                getLogger().debug("Excel document was 
parsed but no sheets with the specified desired names were found.");
    +                            }
    +
    +                        } else {
    +                            //Get all of the sheets in the document.
    +                            while (iter.hasNext()) {
    +                                handleExcelSheet(session, flowFile, sst, 
iter.next(), iter.getSheetName());
    +                            }
    +                        }
    +                    } catch (Exception ex) {
    +                        getLogger().error(ex.getMessage());
    +                    }
    +                }
    +            });
    +
    +        } catch (Exception ex) {
    +            getLogger().error(ex.getMessage());
    +            session.transfer(flowFile, FAILURE);
    +        } finally {
    +            session.transfer(flowFile, ORIGINAL);
    +        }
    +    }
    +
    +
    +    /**
    +     * Handles an individual Excel sheet from the entire Excel document. 
Each sheet will result in an individual flowfile.
    +     *
    +     * @param session
    +     *  The NiFi ProcessSession instance for the current invocation.
    +     */
    +    private void handleExcelSheet(ProcessSession session, FlowFile 
originalParentFF,
    +            SharedStringsTable sst, final InputStream sheetInputStream, 
String sName) {
    +
    +        FlowFile ff = session.create();
    +        try {
    +
    +            XMLReader parser =
    +                    XMLReaderFactory.createXMLReader(
    +                            "org.apache.xerces.parsers.SAXParser"
    +                    );
    +            ExcelSheetRowHandler handler = new ExcelSheetRowHandler(sst);
    +            parser.setContentHandler(handler);
    +
    +            ff = session.write(ff, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    InputSource sheetSource = new 
InputSource(sheetInputStream);
    +                    try {
    +                        ExcelSheetRowHandler eh = (ExcelSheetRowHandler) 
parser.getContentHandler();
    +                        eh.setFlowFileOutputStream(out);
    +                        parser.setContentHandler(eh);
    +                        parser.parse(sheetSource);
    +                        sheetInputStream.close();
    +                    } catch (Exception ex) {
    +                        getLogger().error(ex.getMessage());
    +                    }
    +                }
    +            });
    +
    +            if (handler.getSheetName().equals(UNKNOWN_SHEET_NAME)) {
    +                //Used the named parsed from the handler. This logic is 
only here because IF the handler does find a value that should take precedence.
    +                ff = session.putAttribute(ff, SHEET_NAME, sName);
    +            } else {
    +                ff = session.putAttribute(ff, SHEET_NAME, 
handler.getSheetName());
    +                sName = handler.getSheetName();
    +            }
    +
    +            ff = session.putAttribute(ff, ROW_NUM, new 
Long(handler.getRowCount()).toString());
    +
    +            if 
(StringUtils.isNotEmpty(originalParentFF.getAttribute(CoreAttributes.FILENAME.key())))
 {
    +                ff = session.putAttribute(ff, SOURCE_FILE_NAME, 
originalParentFF.getAttribute(CoreAttributes.FILENAME.key()));
    +            } else {
    +                ff = session.putAttribute(ff, SOURCE_FILE_NAME, 
UNKNOWN_SHEET_NAME);
    +            }
    +
    +            //Update the CoreAttributes.FILENAME to have the .csv 
extension now. Also update MIME.TYPE
    +            ff = session.putAttribute(ff, CoreAttributes.FILENAME.key(), 
updateFilenameToCSVExtension(ff.getAttribute(CoreAttributes.UUID.key()),
    +                    ff.getAttribute(CoreAttributes.FILENAME.key()), 
sName));
    +            ff = session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(), 
CSV_MIME_TYPE);
    +
    +            session.transfer(ff, SUCCESS);
    +
    +        } catch (SAXException saxE) {
    +            getLogger().error(saxE.getMessage());
    +            ff = session.putAttribute(ff,
    +                    ConvertExcelToCSVProcessor.class.getName() + ".error", 
saxE.getMessage());
    +            session.transfer(ff, FAILURE);
    +        } finally {
    +            try {
    +                sheetInputStream.close();
    +            } catch (IOException ioe) {
    +                //nothing further can be done...
    +            }
    +        }
    +    }
    +
    +
    +    /**
    +     * Extracts every row from an Excel Sheet and generates a 
corresponding JSONObject whose key is the Excel CellAddress and value
    +     * is the content of that CellAddress converted to a String
    +     */
    +    private static class ExcelSheetRowHandler
    +            extends DefaultHandler {
    +
    +        private SharedStringsTable sst;
    +        private String currentContent;
    +        private boolean nextIsString;
    +        private OutputStream outputStream;
    +        private boolean firstColInRow;
    +        long rowCount;
    +        String sheetName;
    +
    +        private ExcelSheetRowHandler(SharedStringsTable sst) {
    +            this.sst = sst;
    +            this.firstColInRow = true;
    +            this.rowCount = 0l;
    +            this.sheetName = UNKNOWN_SHEET_NAME;
    +        }
    +
    +        public void setFlowFileOutputStream(OutputStream outputStream) {
    +            this.outputStream = outputStream;
    +        }
    +
    +        public void startElement(String uri, String localName, String name,
    +                Attributes attributes) throws SAXException {
    +
    +            if (name.equals(SAX_CELL_REF)) {
    +                String cellType = attributes.getValue(SAX_CELL_TYPE);
    +                if(cellType != null && cellType.equals(SAX_CELL_STRING)) {
    +                    nextIsString = true;
    +                } else {
    +                    nextIsString = false;
    +                }
    +            } else if (name.equals(SAX_ROW_REF)) {
    +               firstColInRow = true;
    +            } else if (name.equals(SAX_SHEET_NAME_REF)) {
    +                sheetName = attributes.getValue(0);
    +            }
    +
    +            currentContent = "";
    +        }
    +
    +        public void endElement(String uri, String localName, String name)
    +                throws SAXException {
    +
    +            if (nextIsString) {
    +                int idx = Integer.parseInt(currentContent);
    +                currentContent = new 
XSSFRichTextString(sst.getEntryAt(idx)).toString();
    +                nextIsString = false;
    +            }
    +
    +            if (name.equals(SAX_CELL_CONTENT_REF)) {
    +                if (firstColInRow) {
    +                    firstColInRow = false;
    +                    try {
    +                        outputStream.write(currentContent.getBytes());
    +                    } catch (IOException e) {
    +                        e.printStackTrace();
    --- End diff --
    
    Please change this `e.printStackTrace()` call to logging.


> Support extracting content from Microsoft Excel (.xlsx) documents
> -----------------------------------------------------------------
>
>                 Key: NIFI-2613
>                 URL: https://issues.apache.org/jira/browse/NIFI-2613
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Jeremy Dyer
>            Assignee: Jeremy Dyer
>
> Microsoft Excel is a wildly popular application that businesses rely heavily 
> on to store, visualize, and calculate data. Any single company most likely 
> has thousands of Excel documents containing data that could be very valuable 
> if ingested via NiFi and combined with other datasources. Apache POI is a 
> popular 100% Java library for parsing several Microsoft document formats 
> including Excel. Apache POI is extremely flexible and can do several things. 
> This issue would focus solely on using Apache POI to parse an incoming .xlsx 
> document and convert it to CSV. The processor should be capable of limiting 
> which Excel sheets are extracted. CSV seems like the natural choice for 
> outputting each row since this feature is already available in Excel and 
> feels very natural to most Excel sheet designs.
> This capability should most likely introduce a new "poi" module as I envision 
> many more capabilities around parsing Microsoft documents could come from 
> this base effort.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to