exceptionfactory commented on code in PR #7194: URL: https://github.com/apache/nifi/pull/7194#discussion_r1194517265
########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") Review Comment: Names should use "Title Case". 
Recommend changing the display name and property name to "Starting Row" ```suggestion .Builder().name("Starting Row") .displayName("Starting Row") ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." 
+ + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private AtomicReferenceArray<String> desiredSheets; + private volatile int firstRow; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.firstRow = getFirstRow(context); + String[] rawDesiredSheets = getRawDesiredSheets(context); + this.desiredSheets = new AtomicReferenceArray<>(rawDesiredSheets.length); + IntStream.range(0, rawDesiredSheets.length) + .forEach(index -> this.desiredSheets.set(index, rawDesiredSheets[index])); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } + + private int getFirstRow(final PropertyContext context) { + int rawFirstRow = context.getProperty(FIRST_ROW_NUM).asInteger(); + return getZeroBasedIndex(rawFirstRow); + } + + static int getZeroBasedIndex(int rawFirstRow) { + return rawFirstRow > 0 ? 
rawFirstRow - 1 : 0; + } + private String[] getRawDesiredSheets(final PropertyContext context) { + final String desiredSheetsDelimited = context.getProperty(DESIRED_SHEETS).getValue(); + return getRawDesiredSheets(desiredSheetsDelimited, getLogger()); + } + + static String[] getRawDesiredSheets(String desiredSheetsDelimited, ComponentLog logger) { + if (desiredSheetsDelimited != null) { + String[] delimitedSheets = StringUtils.split(desiredSheetsDelimited, ","); + if (delimitedSheets != null) { + return delimitedSheets; + } else { + if (logger != null) { + logger.debug("Excel document was parsed but no sheets with the specified desired names were found."); + } Review Comment: Recommend removing this check and log. If logging is necessary, it should be implemented in the caller, not using an optional method argument for the logger. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." 
Review Comment: The comment says `(One based)` but the default value is `0`, which is confusing. If it is one-based, it seems like the default value should be 1. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." 
+ + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private AtomicReferenceArray<String> desiredSheets; + private volatile int firstRow; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.firstRow = getFirstRow(context); + String[] rawDesiredSheets = getRawDesiredSheets(context); + this.desiredSheets = new AtomicReferenceArray<>(rawDesiredSheets.length); + IntStream.range(0, rawDesiredSheets.length) + .forEach(index -> this.desiredSheets.set(index, rawDesiredSheets[index])); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } + + private int getFirstRow(final PropertyContext context) { + int rawFirstRow = context.getProperty(FIRST_ROW_NUM).asInteger(); + return getZeroBasedIndex(rawFirstRow); + } + + static int getZeroBasedIndex(int rawFirstRow) { + return rawFirstRow > 0 ? 
rawFirstRow - 1 : 0; + } + private String[] getRawDesiredSheets(final PropertyContext context) { + final String desiredSheetsDelimited = context.getProperty(DESIRED_SHEETS).getValue(); + return getRawDesiredSheets(desiredSheetsDelimited, getLogger()); + } + + static String[] getRawDesiredSheets(String desiredSheetsDelimited, ComponentLog logger) { + if (desiredSheetsDelimited != null) { + String[] delimitedSheets = StringUtils.split(desiredSheetsDelimited, ","); + if (delimitedSheets != null) { + return delimitedSheets; + } else { + if (logger != null) { + logger.debug("Excel document was parsed but no sheets with the specified desired names were found."); + } + } + } + + return new String[0]; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(DESIRED_SHEETS); + properties.add(FIRST_ROW_NUM); + properties.add(DateTimeUtils.DATE_FORMAT); + properties.add(DateTimeUtils.TIME_FORMAT); + properties.add(DateTimeUtils.TIMESTAMP_FORMAT); + + return properties; + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) { + if (allowableValue.equalsIgnoreCase(HEADER_DERIVED.getValue())) { Review Comment: It is better to use `equals()` for property value comparisons, since the case should always match. ```suggestion if (HEADER_DERIVED.getValue().equals(allowableValue)) { ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordReader.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DateUtil; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +import static org.apache.commons.lang3.StringUtils.isEmpty; + +public class ExcelRecordReader implements RecordReader { + private final RowIterator rowIterator; + private ComponentLog logger; + private final RecordSchema schema; + private final List<String> desiredSheets; + private final Supplier<DateFormat> LAZY_DATE_FORMAT; + private final Supplier<DateFormat> LAZY_TIME_FORMAT; + private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT; + private final String dateFormat; + private 
final String timeFormat; + private final String timestampFormat; + + public ExcelRecordReader(ExcelRecordReaderArgs args) throws MalformedRecordException { + this.logger = args.getLogger(); + this.schema = args.getSchema(); + this.desiredSheets = new ArrayList<>(); + if (args.getDesiredSheets() != null && args.getDesiredSheets().length() > 0) { + IntStream.range(0, args.getDesiredSheets().length()) + .forEach(index -> this.desiredSheets.add(args.getDesiredSheets().get(index))); + } + + if (isEmpty(args.getDateFormat())) { + this.dateFormat = null; + LAZY_DATE_FORMAT = null; + } else { + this.dateFormat = args.getDateFormat(); + LAZY_DATE_FORMAT = () -> DataTypeUtils.getDateFormat(dateFormat); + } + + if (isEmpty(args.getTimeFormat())) { + this.timeFormat = null; + LAZY_TIME_FORMAT = null; + } else { + this.timeFormat = args.getTimeFormat(); + LAZY_TIME_FORMAT = () -> DataTypeUtils.getDateFormat(timeFormat); + } + + if (isEmpty(args.getTimestampFormat())) { + this.timestampFormat = null; + LAZY_TIMESTAMP_FORMAT = null; + } else { + this.timestampFormat = args.getTimestampFormat(); + LAZY_TIMESTAMP_FORMAT = () -> DataTypeUtils.getDateFormat(timestampFormat); + } + + try { + this.rowIterator = new RowIterator(args.getInputStream(), desiredSheets, args.getFirstRow(), logger); + } catch (RuntimeException e) { + String msg = "Error occurred while processing record file"; + logger.error(msg, e); + throw new MalformedRecordException(msg, e); + } + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { + try { + if (rowIterator.hasNext()) { + Row currentRow = rowIterator.next(); + Map<String, Object> currentRowValues = getCurrentRowValues(currentRow, coerceTypes, dropUnknownFields); + return new MapRecord(schema, currentRowValues); + } + } catch (Exception e) { + throw new MalformedRecordException("Error while getting next record", e); + } + return null; + } + + private Map<String, Object> 
getCurrentRowValues(Row currentRow, boolean coerceTypes, boolean dropUnknownFields) { + final List<RecordField> recordFields = schema.getFields(); + final Map<String, Object> currentRowValues = new LinkedHashMap<>(); + + if (ExcelUtils.hasCells(currentRow)) { + IntStream.range(0, currentRow.getLastCellNum()) + .forEach(index -> { + Cell cell = currentRow.getCell(index); + Object cellValue; + if (index >= recordFields.size()) { + if (!dropUnknownFields) { + cellValue = getCellValue(cell); + currentRowValues.put("unknown_field_index_" + index, cellValue); + } + } else { + final RecordField recordField = recordFields.get(index); + String fieldName = recordField.getFieldName(); + DataType dataType = recordField.getDataType(); + cellValue = getCellValue(cell); + final Object value = coerceTypes ? convert(cellValue, dataType, fieldName) + : convertSimpleIfPossible(cellValue, dataType, fieldName); + currentRowValues.put(fieldName, value); + } + }); + } + + return currentRowValues; + } + + public static Object getCellValue(Cell cell) { Review Comment: Is there a reason this method is public? If it is public for testing, it probably should be moved to a utility class. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordReader.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DateUtil; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +import static org.apache.commons.lang3.StringUtils.isEmpty; + +public class ExcelRecordReader implements RecordReader { + private final RowIterator rowIterator; + private ComponentLog logger; + private final RecordSchema schema; + private final List<String> desiredSheets; + private final Supplier<DateFormat> LAZY_DATE_FORMAT; + private final Supplier<DateFormat> LAZY_TIME_FORMAT; + private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; + + public ExcelRecordReader(ExcelRecordReaderArgs args) throws MalformedRecordException { + 
this.logger = args.getLogger(); + this.schema = args.getSchema(); + this.desiredSheets = new ArrayList<>(); + if (args.getDesiredSheets() != null && args.getDesiredSheets().length() > 0) { + IntStream.range(0, args.getDesiredSheets().length()) + .forEach(index -> this.desiredSheets.add(args.getDesiredSheets().get(index))); + } + + if (isEmpty(args.getDateFormat())) { + this.dateFormat = null; + LAZY_DATE_FORMAT = null; + } else { + this.dateFormat = args.getDateFormat(); + LAZY_DATE_FORMAT = () -> DataTypeUtils.getDateFormat(dateFormat); + } + + if (isEmpty(args.getTimeFormat())) { + this.timeFormat = null; + LAZY_TIME_FORMAT = null; + } else { + this.timeFormat = args.getTimeFormat(); + LAZY_TIME_FORMAT = () -> DataTypeUtils.getDateFormat(timeFormat); + } + + if (isEmpty(args.getTimestampFormat())) { + this.timestampFormat = null; + LAZY_TIMESTAMP_FORMAT = null; + } else { + this.timestampFormat = args.getTimestampFormat(); + LAZY_TIMESTAMP_FORMAT = () -> DataTypeUtils.getDateFormat(timestampFormat); + } + + try { + this.rowIterator = new RowIterator(args.getInputStream(), desiredSheets, args.getFirstRow(), logger); + } catch (RuntimeException e) { + String msg = "Error occurred while processing record file"; + logger.error(msg, e); Review Comment: It is not necessary to log an error and throw the exception in this case. Recommend removing the logger.error call and allowing the framework to log the exception.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.nifi.excel; + +import com.github.pjfanning.xlsx.StreamingReader; +import org.apache.nifi.logging.ComponentLog; +import org.apache.poi.ss.usermodel.Row; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RowIterator implements Iterator<Row>, Closeable { + private final Workbook workbook; + private final Iterator<Sheet> sheets; + private Sheet currentSheet; + private Iterator<Row> currentRows; + private final Map<String, Boolean> desiredSheets; + private final int firstRow; + private ComponentLog logger; + private boolean log; + private Row currentRow; + + public RowIterator(InputStream in, List<String> desiredSheets, int firstRow) { + this(in, desiredSheets, firstRow, null); + } + + public RowIterator(InputStream in, List<String> desiredSheets, int firstRow, ComponentLog logger) { + this.workbook = StreamingReader.builder() + .rowCacheSize(100) + .bufferSize(4096) + .open(in); + this.sheets = this.workbook.iterator(); + this.desiredSheets = desiredSheets != null ? 
desiredSheets.stream() + .collect(Collectors.toMap(key -> key, value -> Boolean.FALSE)) : new HashMap<>(); + this.firstRow = firstRow; + this.logger = logger; + this.log = logger != null; + } + + @Override + public boolean hasNext() { + setCurrent(); + boolean next = currentRow != null; + if(!next) { + String sheetsNotFound = getSheetsNotFound(desiredSheets); + if (!sheetsNotFound.isEmpty() && log) { + logger.warn("Excel sheet(s) not found: {}", sheetsNotFound); + } Review Comment: This check and warning seems unnecessary in general, perhaps there are other places this could be logged if needed. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordSource.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.inference.RecordSource; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ExcelRecordSource implements RecordSource<Row> { + private final RowIterator rowIterator; + public ExcelRecordSource(final InputStream in, final PropertyContext context, final Map<String, String> variables) { + try { + String desiredSheetsDelimited = context.getProperty(ExcelReader.DESIRED_SHEETS).evaluateAttributeExpressions(variables).getValue(); + String [] rawDesiredSheets = ExcelReader.getRawDesiredSheets(desiredSheetsDelimited, null); + Integer rawFirstRow = context.getProperty(ExcelReader.FIRST_ROW_NUM).evaluateAttributeExpressions(variables).asInteger(); + int firstRow = rawFirstRow != null ? rawFirstRow : NumberUtils.toInt(ExcelReader.FIRST_ROW_NUM.getDefaultValue()); + firstRow = ExcelReader.getZeroBasedIndex(firstRow); + this.rowIterator = new RowIterator(in, getDesiredSheets(rawDesiredSheets), firstRow); + } catch (RuntimeException e) { + throw new ProcessException(e); Review Comment: ProcessException should include a message, but as it does not seem to be adding any value, it probably makes sense to remove the try-catch and allow RuntimeExceptions to bubble up. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelUtils.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.poi.ss.usermodel.Row; + +public class ExcelUtils { Review Comment: Unless there are other utility methods, it is probably cleaner to localize `hasCells` instead of creating this one-method class. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/RowIterator.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */package org.apache.nifi.excel; + +import com.github.pjfanning.xlsx.StreamingReader; +import org.apache.nifi.logging.ComponentLog; +import org.apache.poi.ss.usermodel.Row; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RowIterator implements Iterator<Row>, Closeable { + private final Workbook workbook; + private final Iterator<Sheet> sheets; + private Sheet currentSheet; + private Iterator<Row> currentRows; + private final Map<String, Boolean> desiredSheets; + private final int firstRow; + private ComponentLog logger; + private boolean log; + private Row currentRow; + + public RowIterator(InputStream in, List<String> desiredSheets, int firstRow) { + this(in, desiredSheets, firstRow, null); + } + + public RowIterator(InputStream in, List<String> desiredSheets, int firstRow, ComponentLog logger) { + this.workbook = StreamingReader.builder() + .rowCacheSize(100) + .bufferSize(4096) + .open(in); + this.sheets = this.workbook.iterator(); + this.desiredSheets = desiredSheets != null ? 
desiredSheets.stream() + .collect(Collectors.toMap(key -> key, value -> Boolean.FALSE)) : new HashMap<>(); + this.firstRow = firstRow; + this.logger = logger; + this.log = logger != null; + } + + @Override + public boolean hasNext() { + setCurrent(); + boolean next = currentRow != null; + if(!next) { + String sheetsNotFound = getSheetsNotFound(desiredSheets); + if (!sheetsNotFound.isEmpty() && log) { + logger.warn("Excel sheet(s) not found: {}", sheetsNotFound); + } + } + return next; + } + + private void setCurrent() { + currentRow = getNextRow(); + if (currentRow != null) { + return; + } + + currentSheet = null; + currentRows = null; + while (sheets.hasNext()) { + currentSheet = sheets.next(); + if (isIterateOverAllSheets() || hasSheet(currentSheet.getSheetName())) { + currentRows = currentSheet.iterator(); + currentRow = getNextRow(); + if (currentRow != null) { + return; + } + } + } + } + + private Row getNextRow() { + while (currentRows != null && !hasExhaustedRows()) { + Row tempCurrentRow = currentRows.next(); + if (!isSkip(tempCurrentRow)) { + return tempCurrentRow; + } + } + return null; + } + + private boolean hasExhaustedRows() { + boolean exhausted = !currentRows.hasNext(); + if (log && exhausted) { + logger.info("Exhausted all rows from sheet {}", currentSheet.getSheetName()); + } + return exhausted; + } + + private boolean isSkip(Row row) { + return row.getRowNum() < firstRow; + } + + private boolean isIterateOverAllSheets() { + boolean iterateAllSheets = desiredSheets.isEmpty(); + if (iterateAllSheets && log) { + logger.info("Advanced to sheet {}", currentSheet.getSheetName()); + } + return iterateAllSheets; + } + + private boolean hasSheet(String name) { + boolean sheetByName = !desiredSheets.isEmpty() + && desiredSheets.keySet().stream() + .anyMatch(desiredSheet -> desiredSheet.equalsIgnoreCase(name)); + if (sheetByName) { + desiredSheets.put(name, Boolean.TRUE); + } + return sheetByName; + } + + private String getSheetsNotFound(Map<String, 
Boolean> desiredSheets) { + return desiredSheets.entrySet().stream() + .filter(entry -> !entry.getValue()) + .map(Map.Entry::getKey) + .collect(Collectors.joining(",")); + } + + @Override + public Row next() { + return currentRow; + } + + @Override + public void close() throws IOException { + this.workbook.close(); + } + + void setLogger(ComponentLog logger) { Review Comment: As elsewhere, having a setter for the ComponentLog does not seem like the best approach, if the ComponentLog is needed, it should be passed in the constructor. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." 
+ + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private AtomicReferenceArray<String> desiredSheets; + private volatile int firstRow; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.firstRow = getFirstRow(context); + String[] rawDesiredSheets = getRawDesiredSheets(context); + this.desiredSheets = new AtomicReferenceArray<>(rawDesiredSheets.length); + IntStream.range(0, rawDesiredSheets.length) + .forEach(index -> this.desiredSheets.set(index, rawDesiredSheets[index])); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } + + private int getFirstRow(final PropertyContext context) { + int rawFirstRow = context.getProperty(FIRST_ROW_NUM).asInteger(); + return getZeroBasedIndex(rawFirstRow); + } + + static int getZeroBasedIndex(int rawFirstRow) { + return rawFirstRow > 0 ? rawFirstRow - 1 : 0; + } + private String[] getRawDesiredSheets(final PropertyContext context) { Review Comment: Methods should have a newline separating them from previous methods. ```suggestion private String[] getRawDesiredSheets(final PropertyContext context) { ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import 
org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. 
If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." + + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) Review Comment: If this property supports FlowFile Attributes, then `firstRow` should not be a member variable. In light of the Configuration Context not being available to `createRecordReader()`, recommend changing this value to `NONE`. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordReaderArgs.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class ExcelRecordReaderArgs { Review Comment: Recommend renaming this to `ExcelRecordReaderConfiguration`, although it does include the `InputStream`. In that case, it may be better to pass the InputStream separately. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. 
A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor Review Comment: ```suggestion public static final PropertyDescriptor STARTING_ROW = new PropertyDescriptor ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelRecordReader.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DateUtil; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +import static org.apache.commons.lang3.StringUtils.isEmpty; + +public class ExcelRecordReader implements RecordReader { + private final RowIterator rowIterator; + private ComponentLog logger; + private final RecordSchema schema; + private final List<String> desiredSheets; + private final Supplier<DateFormat> LAZY_DATE_FORMAT; + private final Supplier<DateFormat> LAZY_TIME_FORMAT; + private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; + + public ExcelRecordReader(ExcelRecordReaderArgs args) throws MalformedRecordException { + this.logger = args.getLogger(); + this.schema = args.getSchema(); + this.desiredSheets = new ArrayList<>(); + if (args.getDesiredSheets() != null && args.getDesiredSheets().length() > 0) { + IntStream.range(0, args.getDesiredSheets().length()) + .forEach(index -> this.desiredSheets.add(args.getDesiredSheets().get(index))); + } + + if (isEmpty(args.getDateFormat())) { + this.dateFormat = null; + LAZY_DATE_FORMAT = null; + } else 
{ + this.dateFormat = args.getDateFormat(); + LAZY_DATE_FORMAT = () -> DataTypeUtils.getDateFormat(dateFormat); + } + + if (isEmpty(args.getTimeFormat())) { + this.timeFormat = null; + LAZY_TIME_FORMAT = null; + } else { + this.timeFormat = args.getTimeFormat(); + LAZY_TIME_FORMAT = () -> DataTypeUtils.getDateFormat(timeFormat); + } + + if (isEmpty(args.getTimestampFormat())) { + this.timestampFormat = null; + LAZY_TIMESTAMP_FORMAT = null; + } else { + this.timestampFormat = args.getTimestampFormat(); + LAZY_TIMESTAMP_FORMAT = () -> DataTypeUtils.getDateFormat(timestampFormat); + } + + try { + this.rowIterator = new RowIterator(args.getInputStream(), desiredSheets, args.getFirstRow(), logger); + } catch (RuntimeException e) { + String msg = "Error occurred while processing record file"; + logger.error(msg, e); + throw new MalformedRecordException(msg, e); + } + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { + try { + if (rowIterator.hasNext()) { + Row currentRow = rowIterator.next(); + Map<String, Object> currentRowValues = getCurrentRowValues(currentRow, coerceTypes, dropUnknownFields); + return new MapRecord(schema, currentRowValues); + } + } catch (Exception e) { + throw new MalformedRecordException("Error while getting next record", e); + } + return null; + } + + private Map<String, Object> getCurrentRowValues(Row currentRow, boolean coerceTypes, boolean dropUnknownFields) { + final List<RecordField> recordFields = schema.getFields(); + final Map<String, Object> currentRowValues = new LinkedHashMap<>(); + + if (ExcelUtils.hasCells(currentRow)) { + IntStream.range(0, currentRow.getLastCellNum()) + .forEach(index -> { + Cell cell = currentRow.getCell(index); + Object cellValue; + if (index >= recordFields.size()) { + if (!dropUnknownFields) { + cellValue = getCellValue(cell); + currentRowValues.put("unknown_field_index_" + index, cellValue); + } + } else { + final 
RecordField recordField = recordFields.get(index); + String fieldName = recordField.getFieldName(); + DataType dataType = recordField.getDataType(); + cellValue = getCellValue(cell); + final Object value = coerceTypes ? convert(cellValue, dataType, fieldName) + : convertSimpleIfPossible(cellValue, dataType, fieldName); + currentRowValues.put(fieldName, value); + } + }); + } + + return currentRowValues; + } + + public static Object getCellValue(Cell cell) { + if (cell != null) { + switch (cell.getCellType()) { + case _NONE: + case BLANK: + case ERROR: + case FORMULA: + case STRING: + return cell.getStringCellValue(); + case NUMERIC: + return DateUtil.isCellDateFormatted(cell) ? cell.getDateCellValue() : cell.getNumericCellValue(); + case BOOLEAN: + return cell.getBooleanCellValue(); + } + } + return null; + } + + private Object convert(final Object value, final DataType dataType, final String fieldName) { + if (value == null || dataType == null) { + return value; + } + + return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } + + private Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) { + if (value == null || dataType == null) { + return value; + } + + switch (dataType.getFieldType()) { + case STRING: + return value; + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + case BYTE: + case CHAR: + case SHORT: + if (DataTypeUtils.isCompatibleDataType(value, dataType)) { + return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } + break; + case DATE: + if (DataTypeUtils.isDateTypeCompatible(value, dateFormat)) { + return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } + break; + case TIME: + if (DataTypeUtils.isTimeTypeCompatible(value, timeFormat)) { + return 
DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } + break; + case TIMESTAMP: + if (DataTypeUtils.isTimestampTypeCompatible(value, timestampFormat)) { + return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } + break; + } + + return value; + } + + @Override + public RecordSchema getSchema() throws MalformedRecordException { + return schema; + } + + @Override + public void close() throws IOException { + this.rowIterator.close(); + } + + void setLogger(ComponentLog logger) { Review Comment: Is there a reason for this setter method as opposed to passing the logger in the constructor? ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." 
+ + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private AtomicReferenceArray<String> desiredSheets; + private volatile int firstRow; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.firstRow = getFirstRow(context); + String[] rawDesiredSheets = getRawDesiredSheets(context); + this.desiredSheets = new AtomicReferenceArray<>(rawDesiredSheets.length); + IntStream.range(0, rawDesiredSheets.length) + .forEach(index -> this.desiredSheets.set(index, rawDesiredSheets[index])); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } + + private int getFirstRow(final PropertyContext context) { + int rawFirstRow = context.getProperty(FIRST_ROW_NUM).asInteger(); + return getZeroBasedIndex(rawFirstRow); + } + + static int getZeroBasedIndex(int rawFirstRow) { + return rawFirstRow > 0 ? rawFirstRow - 1 : 0; + } + private String[] getRawDesiredSheets(final PropertyContext context) { Review Comment: Private methods should be defined after public and protected methods in general class ordering, so it would be helpful to move this down in the class. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/excel/TestExcelRecordReader.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockComponentLog; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestExcelRecordReader { + + 
private static final String DATA_FORMATTING_FILE = "dataformatting.xlsx"; + private static final String MULTI_SHEET_FILE = "twoSheets.xlsx"; + + @Test + public void testNonExcelFile() throws IOException { + ExcelRecordReaderArgs args = new ExcelRecordReaderArgs.Builder() + .withInputStream(getInputStream("notExcel.txt")) + .withLogger(Mockito.mock(ComponentLog.class)) + .build(); + + MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> new ExcelRecordReader(args)); + assertTrue(ExceptionUtils.getStackTrace(mre).contains("this is not a valid OOXML")); + } + + @Test + public void testOlderExcelFormatFile() throws IOException { + ExcelRecordReaderArgs args = new ExcelRecordReaderArgs.Builder() + .withInputStream(getInputStream("olderFormat.xls")) + .withLogger(Mockito.mock(ComponentLog.class)) + .build(); + + MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> new ExcelRecordReader(args)); + assertTrue(ExceptionUtils.getStackTrace(mre).contains("data appears to be in the OLE2 Format")); + } + @Test + public void testMultipleRecordsSingleSheet() throws IOException, MalformedRecordException { + ExcelRecordReaderArgs args = new ExcelRecordReaderArgs.Builder() + .withInputStream(getInputStream(DATA_FORMATTING_FILE)) + .withSchema(getDataFormattingSchema()) + .withLogger(Mockito.mock(ComponentLog.class)) + .build(); + + ExcelRecordReader recordReader = getRecordReader(args); + List<Record> records = getRecords(recordReader, false, false, true); + + assertEquals(9, records.size()); + } + + private RecordSchema getDataFormattingSchema() { + final List<RecordField> fields = List.of( + new RecordField("Numbers", RecordFieldType.DOUBLE.getDataType()), + new RecordField("Timestamps", RecordFieldType.DATE.getDataType()), + new RecordField("Money", RecordFieldType.DOUBLE.getDataType()), + new RecordField("Flags", RecordFieldType.BOOLEAN.getDataType())); + + return new SimpleRecordSchema(fields); + } + + private InputStream 
getInputStream(String excelFile) throws IOException { + String excelResourcesDir = "src/test/resources/excel"; + Path excelDoc = Paths.get(excelResourcesDir, excelFile); + return Files.newInputStream(excelDoc); + } + + private ExcelRecordReader getRecordReader(ExcelRecordReaderArgs args) throws MalformedRecordException { + ExcelRecordReader recordReader = new ExcelRecordReader(args); + recordReader.setLogger(new MockComponentLog(null, recordReader)); + + return recordReader; + } + + private List<Record> getRecords(ExcelRecordReader recordReader, boolean coerceTypes, boolean dropUnknownFields, boolean print) throws IOException, MalformedRecordException { + Record record; + List<Record> records = new ArrayList<>(); + while ((record = recordReader.nextRecord(coerceTypes, dropUnknownFields)) != null) { + if (print) { + System.out.println(record.toMap()); + } Review Comment: This should be removed; `System.out.println()` should not be included in tests. ########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/excel/TestExcelRecordReader.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockComponentLog; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestExcelRecordReader { + + private static final String DATA_FORMATTING_FILE = "dataformatting.xlsx"; + private static final String MULTI_SHEET_FILE = "twoSheets.xlsx"; + + @Test + public void testNonExcelFile() throws IOException { + ExcelRecordReaderArgs args = new ExcelRecordReaderArgs.Builder() + .withInputStream(getInputStream("notExcel.txt")) + .withLogger(Mockito.mock(ComponentLog.class)) Review Comment: Instead of calling `Mockito.mock()`, `ComponentLog` can be declared as a member variable with the `@Mock` annotation, and the class can use `@ExtendWith(MockitoExtension.class)` to instantiate the mock. 
########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/excel/ExcelReader.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.excel; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.RecordSourceFactory; +import org.apache.nifi.schema.inference.SchemaInferenceEngine; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.DateTimeUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.poi.ss.usermodel.Row; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.IntStream; + +@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"}) +@CapabilityDescription("Parses a Microsoft Excel document returning each 
row in each sheet as a separate record. " + + "This reader allows for inferring a schema either based on the first line of an Excel sheet if a 'header line' is " + + "present or from all the desired sheets, or providing an explicit schema " + + "for interpreting the values. See Controller Service's Usage for further documentation. " + + "This reader is currently only capable of processing .xlsx " + + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.)") +public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final AllowableValue HEADER_DERIVED = new AllowableValue("excel-header-derived", "Use fields From Header", + "The first chosen row of the Excel sheet is a header row that contains the columns representative of all the rows " + + "in the desired sheets. The schema will be derived by using those columns in the header."); + public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor + .Builder().name("extract-sheets") + .displayName("Sheets to Extract") + .description("Comma separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" + + " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case in-sensitive. Any sheets not" + + " specified in this value will be ignored. A bulletin will be generated if a specified sheet(s) are not found.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FIRST_ROW_NUM = new PropertyDescriptor + .Builder().name("excel-extract-first-row") + .displayName("Row number to start from") + .description("The row number of the first row to start processing (One based)." 
+ + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private AtomicReferenceArray<String> desiredSheets; + private volatile int firstRow; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.firstRow = getFirstRow(context); + String[] rawDesiredSheets = getRawDesiredSheets(context); + this.desiredSheets = new AtomicReferenceArray<>(rawDesiredSheets.length); + IntStream.range(0, rawDesiredSheets.length) + .forEach(index -> this.desiredSheets.set(index, rawDesiredSheets[index])); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } + + private int getFirstRow(final PropertyContext context) { + int rawFirstRow = context.getProperty(FIRST_ROW_NUM).asInteger(); + return getZeroBasedIndex(rawFirstRow); + } + + static int getZeroBasedIndex(int rawFirstRow) { + return rawFirstRow > 0 ? 
rawFirstRow - 1 : 0; + } + private String[] getRawDesiredSheets(final PropertyContext context) { + final String desiredSheetsDelimited = context.getProperty(DESIRED_SHEETS).getValue(); + return getRawDesiredSheets(desiredSheetsDelimited, getLogger()); + } + + static String[] getRawDesiredSheets(String desiredSheetsDelimited, ComponentLog logger) { + if (desiredSheetsDelimited != null) { + String[] delimitedSheets = StringUtils.split(desiredSheetsDelimited, ","); + if (delimitedSheets != null) { + return delimitedSheets; + } else { + if (logger != null) { + logger.debug("Excel document was parsed but no sheets with the specified desired names were found."); + } + } + } + + return new String[0]; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(DESIRED_SHEETS); + properties.add(FIRST_ROW_NUM); + properties.add(DateTimeUtils.DATE_FORMAT); + properties.add(DateTimeUtils.TIME_FORMAT); + properties.add(DateTimeUtils.TIMESTAMP_FORMAT); + + return properties; + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) { + if (allowableValue.equalsIgnoreCase(HEADER_DERIVED.getValue())) { + return new ExcelHeaderSchemaStrategy(context); + } else if (allowableValue.equalsIgnoreCase(SchemaInferenceUtil.INFER_SCHEMA.getValue())) { + final RecordSourceFactory<Row> sourceFactory = (variables, in) -> new ExcelRecordSource(in, context, variables); + final SchemaInferenceEngine<Row> inference = new ExcelSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat)); + return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger()); + } + + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + + @Override + protected List<AllowableValue> 
getSchemaAccessStrategyValues() { + final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues()); + allowableValues.add(HEADER_DERIVED); + allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA); + return allowableValues; + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + // Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header. + in.mark(1024 * 1024); + final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null); + in.reset(); + + ExcelRecordReaderArgs args = new ExcelRecordReaderArgs.Builder() + .withDateFormat(dateFormat) + .withDesiredSheets(desiredSheets) + .withFirstRow(firstRow) + .withInputStream(in) + .withLogger(logger) + .withSchema(schema) + .withTimeFormat(timeFormat) + .withTimestampFormat(timestampFormat) + .build(); + + return new ExcelRecordReader(args); Review Comment: Instead of passing the `InputStream` with the configuration, recommend keeping it as a separate argument to the constructor; the same applies to `ComponentLog`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
