Github user jvwing commented on a diff in the pull request:
https://github.com/apache/nifi/pull/929#discussion_r106553305
--- Diff:
nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/main/java/org/apache/nifi/processors/poi/ConvertExcelToCSVProcessor.java
---
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.poi;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.poi.openxml4j.opc.OPCPackage;
+import org.apache.poi.xssf.eventusermodel.XSSFReader;
+import org.apache.poi.xssf.model.SharedStringsTable;
+import org.apache.poi.xssf.usermodel.XSSFRichTextString;
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+
+@Tags({"excel", "csv", "poi"})
+@CapabilityDescription("Consumes a Microsoft Excel document and converts
each worksheet to csv. Each sheet from the incoming Excel " +
+ "document will generate a new Flowfile that will be output from
this processor. Each output Flowfile's contents will be formatted as a csv file
" +
+ "where the each row from the excel sheet is output as a newline in
the csv file.")
+@WritesAttributes({@WritesAttribute(attribute="sheetname",
description="The name of the Excel sheet that this particular row of data came
from in the Excel document"),
+ @WritesAttribute(attribute="numrows", description="The number of
rows in this Excel Sheet"),
+ @WritesAttribute(attribute="sourcefilename", description="The name
of the Excel document file that this data originated from"),
+ @WritesAttribute(attribute="convertexceltocsvprocessor.error",
description="Error message that was encountered on a per Excel sheet basis.
This attribute is" +
+ " only populated if an error was occured while processing
the particular sheet. Having the error present at the sheet level will allow
for the end" +
+ " user to better understand what syntax errors in their
excel doc on a larger scale caused the error.")})
+public class ConvertExcelToCSVProcessor
+ extends AbstractProcessor {
+
+ private static final String CSV_MIME_TYPE = "text/csv";
+ public static final String SHEET_NAME = "sheetname";
+ public static final String ROW_NUM = "numrows";
+ public static final String SOURCE_FILE_NAME = "sourcefilename";
+ private static final String SAX_CELL_REF = "c";
+ private static final String SAX_CELL_TYPE = "t";
+ private static final String SAX_CELL_STRING = "s";
+ private static final String SAX_CELL_CONTENT_REF = "v";
+ private static final String SAX_ROW_REF = "row";
+ private static final String SAX_SHEET_NAME_REF = "sheetPr";
+ private static final String DESIRED_SHEETS_DELIMITER = ",";
+ private static final String UNKNOWN_SHEET_NAME = "UNKNOWN";
+
+ public static final PropertyDescriptor DESIRED_SHEETS = new
PropertyDescriptor
+ .Builder().name("extract-sheets")
+ .displayName("Sheets to Extract")
+ .description("Comma separated list of Excel document sheet
names that should be extracted from the excel document. If this property" +
+ " is left blank then all of the sheets will be
extracted from the Excel document. The list of names is case in-sensitive. Any
sheets not " +
+ "specified in this value will be ignored.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("Original Excel document received by this
processor")
+ .build();
+
+ public static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Excel data converted to csv")
+ .build();
+
+ public static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Failed to parse the Excel document")
+ .build();
+
+ private List<PropertyDescriptor> descriptors;
+
+ private Set<Relationship> relationships;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(DESIRED_SHEETS);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(ORIGINAL);
+ relationships.add(SUCCESS);
+ relationships.add(FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ try {
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream inputStream) throws
IOException {
+
+ try {
+ String desiredSheetsDelimited =
context.getProperty(DESIRED_SHEETS)
+ .evaluateAttributeExpressions().getValue();
+
+ OPCPackage pkg = OPCPackage.open(inputStream);
+ XSSFReader r = new XSSFReader(pkg);
+ SharedStringsTable sst = r.getSharedStringsTable();
+ XSSFReader.SheetIterator iter =
(XSSFReader.SheetIterator) r.getSheetsData();
+
+ if (desiredSheetsDelimited != null) {
+
+ String[] desiredSheets = StringUtils
+ .split(desiredSheetsDelimited,
DESIRED_SHEETS_DELIMITER);
+
+ if (desiredSheets != null) {
+
+ while (iter.hasNext()) {
+ InputStream sheet = iter.next();
+ String sheetName = iter.getSheetName();
+
+ for (int i = 0; i <
desiredSheets.length; i++) {
+ //If the sheetName is a desired
one parse it
+ if
(sheetName.equalsIgnoreCase(desiredSheets[i])) {
+ handleExcelSheet(session,
flowFile, sst, sheet, sheetName);
+ break;
+ }
+ }
+ }
+ } else {
+ getLogger().debug("Excel document was
parsed but no sheets with the specified desired names were found.");
+ }
+
+ } else {
+ //Get all of the sheets in the document.
+ while (iter.hasNext()) {
+ handleExcelSheet(session, flowFile, sst,
iter.next(), iter.getSheetName());
+ }
+ }
+ } catch (Exception ex) {
+ getLogger().error(ex.getMessage());
+ }
+ }
+ });
+
+ } catch (Exception ex) {
+ getLogger().error(ex.getMessage());
+ session.transfer(flowFile, FAILURE);
+ } finally {
+ session.transfer(flowFile, ORIGINAL);
+ }
+ }
+
+
+ /**
+ * Handles an individual Excel sheet from the entire Excel document.
Each sheet will result in an individual flowfile.
+ *
+ * @param session
+ * The NiFi ProcessSession instance for the current invocation.
+ */
+ private void handleExcelSheet(ProcessSession session, FlowFile
originalParentFF,
+ SharedStringsTable sst, final InputStream sheetInputStream,
String sName) {
+
+ FlowFile ff = session.create();
+ try {
+
+ XMLReader parser =
+ XMLReaderFactory.createXMLReader(
+ "org.apache.xerces.parsers.SAXParser"
+ );
+ ExcelSheetRowHandler handler = new ExcelSheetRowHandler(sst);
+ parser.setContentHandler(handler);
+
+ ff = session.write(ff, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ InputSource sheetSource = new
InputSource(sheetInputStream);
+ try {
+ ExcelSheetRowHandler eh = (ExcelSheetRowHandler)
parser.getContentHandler();
+ eh.setFlowFileOutputStream(out);
+ parser.setContentHandler(eh);
+ parser.parse(sheetSource);
+ sheetInputStream.close();
+ } catch (Exception ex) {
+ getLogger().error(ex.getMessage());
+ }
+ }
+ });
+
+ if (handler.getSheetName().equals(UNKNOWN_SHEET_NAME)) {
+ //Used the named parsed from the handler. This logic is
only here because IF the handler does find a value that should take precedence.
+ ff = session.putAttribute(ff, SHEET_NAME, sName);
+ } else {
+ ff = session.putAttribute(ff, SHEET_NAME,
handler.getSheetName());
+ sName = handler.getSheetName();
+ }
+
+ ff = session.putAttribute(ff, ROW_NUM, new
Long(handler.getRowCount()).toString());
+
+ if
(StringUtils.isNotEmpty(originalParentFF.getAttribute(CoreAttributes.FILENAME.key())))
{
+ ff = session.putAttribute(ff, SOURCE_FILE_NAME,
originalParentFF.getAttribute(CoreAttributes.FILENAME.key()));
+ } else {
+ ff = session.putAttribute(ff, SOURCE_FILE_NAME,
UNKNOWN_SHEET_NAME);
+ }
+
+ //Update the CoreAttributes.FILENAME to have the .csv
extension now. Also update MIME.TYPE
+ ff = session.putAttribute(ff, CoreAttributes.FILENAME.key(),
updateFilenameToCSVExtension(ff.getAttribute(CoreAttributes.UUID.key()),
+ ff.getAttribute(CoreAttributes.FILENAME.key()),
sName));
+ ff = session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(),
CSV_MIME_TYPE);
+
+ session.transfer(ff, SUCCESS);
+
+ } catch (SAXException saxE) {
+ getLogger().error(saxE.getMessage());
+ ff = session.putAttribute(ff,
+ ConvertExcelToCSVProcessor.class.getName() + ".error",
saxE.getMessage());
+ session.transfer(ff, FAILURE);
+ } finally {
+ try {
+ sheetInputStream.close();
+ } catch (IOException ioe) {
+ //nothing further can be done...
+ }
+ }
+ }
+
+
+ /**
+ * Extracts every row from an Excel Sheet and generates a
corresponding JSONObject whose key is the Excel CellAddress and value
+ * is the content of that CellAddress converted to a String
+ */
+ private static class ExcelSheetRowHandler
+ extends DefaultHandler {
+
+ private SharedStringsTable sst;
+ private String currentContent;
+ private boolean nextIsString;
+ private OutputStream outputStream;
+ private boolean firstColInRow;
+ long rowCount;
+ String sheetName;
+
+ private ExcelSheetRowHandler(SharedStringsTable sst) {
+ this.sst = sst;
+ this.firstColInRow = true;
+ this.rowCount = 0l;
+ this.sheetName = UNKNOWN_SHEET_NAME;
+ }
+
+ public void setFlowFileOutputStream(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ public void startElement(String uri, String localName, String name,
+ Attributes attributes) throws SAXException {
+
+ if (name.equals(SAX_CELL_REF)) {
+ String cellType = attributes.getValue(SAX_CELL_TYPE);
+ if(cellType != null && cellType.equals(SAX_CELL_STRING)) {
+ nextIsString = true;
+ } else {
+ nextIsString = false;
+ }
+ } else if (name.equals(SAX_ROW_REF)) {
+ firstColInRow = true;
+ } else if (name.equals(SAX_SHEET_NAME_REF)) {
+ sheetName = attributes.getValue(0);
+ }
+
+ currentContent = "";
+ }
+
+ public void endElement(String uri, String localName, String name)
+ throws SAXException {
+
+ if (nextIsString) {
+ int idx = Integer.parseInt(currentContent);
+ currentContent = new
XSSFRichTextString(sst.getEntryAt(idx)).toString();
+ nextIsString = false;
+ }
+
+ if (name.equals(SAX_CELL_CONTENT_REF)) {
+ if (firstColInRow) {
+ firstColInRow = false;
+ try {
+ outputStream.write(currentContent.getBytes());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ } else {
+ try {
+ outputStream.write(("," +
currentContent).getBytes());
+ } catch (IOException e) {
+ e.printStackTrace();
--- End diff --
Please change to logging
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---