This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 4efa1f981618173dcbc5ba78e109e1f4c9776b1b Author: Charles Givre <[email protected]> AuthorDate: Fri Mar 27 09:12:45 2020 -0400 DRILL-7641: Convert Excel Reader to use Streaming Reader closes #2024 --- contrib/format-excel/pom.xml | 5 + .../drill/exec/store/excel/ExcelBatchReader.java | 300 ++++++++++----------- .../drill/exec/store/excel/TestExcelFormat.java | 16 +- 3 files changed, 158 insertions(+), 163 deletions(-) diff --git a/contrib/format-excel/pom.xml b/contrib/format-excel/pom.xml index ca6b6ab..c8be2fe 100644 --- a/contrib/format-excel/pom.xml +++ b/contrib/format-excel/pom.xml @@ -64,6 +64,11 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.github.pjfanning</groupId> + <artifactId>excel-streaming-reader</artifactId> + <version>2.3.2</version> + </dependency> </dependencies> <build> <plugins> diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java index 32c062d..88d124b 100644 --- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java +++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.excel; +import com.github.pjfanning.xlsx.StreamingReader; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -28,19 +29,15 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.MetadataUtils; import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.TupleWriter; import org.apache.hadoop.mapred.FileSplit; import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.CellType; -import org.apache.poi.ss.usermodel.CellValue; import org.apache.poi.ss.usermodel.DateUtil; -import org.apache.poi.ss.usermodel.FormulaEvaluator; import org.apache.poi.ss.usermodel.Row; -import org.apache.poi.xssf.usermodel.XSSFRow; -import org.apache.poi.xssf.usermodel.XSSFSheet; -import org.apache.poi.xssf.usermodel.XSSFWorkbook; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; import org.joda.time.Instant; import org.slf4j.Logger; @@ -50,6 +47,8 @@ import java.util.Date; import java.util.Iterator; import java.io.IOException; import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; import java.util.TimeZone; public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { @@ -66,23 +65,25 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { private static final String MISSING_FIELD_NAME_HEADER = "field_"; - private final ExcelReaderConfig readerConfig; + private static final int ROW_CACHE_SIZE = 100; + + private static final int BUFFER_SIZE = 4096; - private XSSFSheet sheet; + private final ExcelReaderConfig readerConfig; - private XSSFWorkbook workbook; + private Sheet sheet; - private InputStream fsStream; + private Row currentRow; - private FormulaEvaluator evaluator; + private Workbook workbook; - private ArrayList<String> excelFieldNames; + private InputStream fsStream; - private ArrayList<ScalarWriter> columnWriters; + private List<String> excelFieldNames; - private ArrayList<CellType> cellTypes; + private List<ScalarWriter> columnWriters; - private ArrayList<CellWriter> cellWriterArray; + private List<CellWriter> cellWriterArray; private Iterator<Row> rowIterator; @@ -90,14 +91,10 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { private int totalColumnCount; - private int lineCount; - private boolean firstLine; private FileSplit split; - private ResultSetLoader loader; - private int recordCount; static class ExcelReaderConfig { @@ -134,92 +131,104 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { @Override public boolean open(FileSchemaNegotiator negotiator) { split = negotiator.split(); - loader = negotiator.build(); + ResultSetLoader loader = negotiator.build(); rowWriter = loader.writer(); openFile(negotiator); defineSchema(); return true; } + /** + * This method opens the Excel file, initializes the Streaming Excel Reader, and initializes the sheet variable. + * @param negotiator The Drill file negotiator object that represents the file system + */ private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) { try { fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath()); - workbook = new XSSFWorkbook(fsStream); + + // Open streaming reader + workbook = StreamingReader.builder() + .rowCacheSize(ROW_CACHE_SIZE) + .bufferSize(BUFFER_SIZE) + .open(fsStream); } catch (Exception e) { throw UserException .dataReadError(e) .message("Failed to open open input file: %s", split.getPath().toString()) - .message(e.getMessage()) + .addContext(e.getMessage()) .build(logger); } - - // Evaluate formulae - evaluator = workbook.getCreationHelper().createFormulaEvaluator(); - - workbook.setMissingCellPolicy(Row.MissingCellPolicy.CREATE_NULL_AS_BLANK); sheet = getSheet(); } /** * This function defines the schema from the header row. - * @return TupleMedata of the discovered schema */ - private TupleMetadata defineSchema() { + private void defineSchema() { SchemaBuilder builder = new SchemaBuilder(); - return getColumnHeaders(builder); + getColumnHeaders(builder); } - private TupleMetadata getColumnHeaders(SchemaBuilder builder) { + private void getColumnHeaders(SchemaBuilder builder) { //Get the field names - int columnCount = 0; + int columnCount; - // Case for empty sheet. - if (sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) { - return builder.buildSchema(); + // Case for empty sheet + if (sheet.getLastRowNum() == 0) { + builder.buildSchema(); + return; } + rowIterator = sheet.iterator(); + // Get the number of columns. columnCount = getColumnCount(); - excelFieldNames = new ArrayList<>(columnCount); - cellWriterArray = new ArrayList<>(columnCount); - rowIterator = sheet.iterator(); + excelFieldNames = new ArrayList<>(); + cellWriterArray = new ArrayList<>(); //If there are no headers, create columns names of field_n if (readerConfig.headerRow == -1) { String missingFieldName; - for (int i = 0; i < columnCount; i++) { + int i = 0; + + for (Cell c : currentRow) { missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1); makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR); excelFieldNames.add(i, missingFieldName); + i++; } - columnWriters = new ArrayList<>(excelFieldNames.size()); - cellTypes = new ArrayList<>(excelFieldNames.size()); + columnWriters = new ArrayList<>(columnCount); - return builder.buildSchema(); + builder.buildSchema(); } else if (rowIterator.hasNext()) { - //Find the header row - int firstHeaderRow = getFirstHeaderRow(); - - while (lineCount < firstHeaderRow) { - Row row = rowIterator.next(); - lineCount++; - } //Get the header row and column count - Row row = rowIterator.next(); - totalColumnCount = row.getLastCellNum(); - cellTypes = new ArrayList<>(totalColumnCount); + totalColumnCount = currentRow.getLastCellNum(); + Cell dataCell = null; //Read the header row - Iterator<Cell> cellIterator = row.cellIterator(); + Iterator<Cell> headerRowIterator = currentRow.cellIterator(); int colPosition = 0; - String tempColumnName = ""; + String tempColumnName; - while (cellIterator.hasNext()) { - Cell cell = cellIterator.next(); + // Get the first data row. + currentRow = rowIterator.next(); + Row firstDataRow = currentRow; + Iterator<Cell> dataRowIterator = firstDataRow.cellIterator(); + + + while (headerRowIterator.hasNext()) { + // We need this to get the header names + Cell cell = headerRowIterator.next(); + + // Since header names are most likely all Strings, we need the first row of actual data to get the data types + try { + dataCell = dataRowIterator.next(); + } catch (NoSuchElementException e) { + // Do nothing... empty value in data cell + } - CellValue cellValue = evaluator.evaluate(cell); - switch (cellValue.getCellType()) { + switch (dataCell.getCellType()) { case STRING: tempColumnName = cell.getStringCellValue() .replace(PARSER_WILDCARD, SAFE_WILDCARD) @@ -227,28 +236,26 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { .replaceAll("\\n", HEADER_NEW_LINE_REPLACEMENT); makeColumn(builder, tempColumnName, TypeProtos.MinorType.VARCHAR); excelFieldNames.add(colPosition, tempColumnName); - cellTypes.add(CellType.STRING); break; - case NUMERIC: - tempColumnName = String.valueOf(cell.getNumericCellValue()); + case FORMULA: + case NUMERIC: + tempColumnName = cell.getStringCellValue(); makeColumn(builder, tempColumnName, TypeProtos.MinorType.FLOAT8); excelFieldNames.add(colPosition, tempColumnName); - cellTypes.add(CellType.NUMERIC); break; } colPosition++; } } - columnWriters = new ArrayList<>(excelFieldNames.size()); - return builder.buildSchema(); + columnWriters = new ArrayList<>(); + builder.buildSchema(); } /** * Helper function to get the selected sheet from the configuration - * - * @return XSSFSheet The selected sheet + * @return Sheet The selected sheet */ - private XSSFSheet getSheet() { + private Sheet getSheet() { int sheetIndex = 0; if (!readerConfig.sheetName.isEmpty()) { sheetIndex = workbook.getSheetIndex(readerConfig.sheetName); @@ -267,14 +274,21 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { /** * Returns the column count. There are a few gotchas here in that we have to know the header row and count the physical number of cells - * in that row. Since the user can define the header row, + * in that row. This function also has to move the rowIterator object to the first row of data. * @return The number of actual columns */ private int getColumnCount() { + // Initialize + currentRow = rowIterator.next(); int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0; - XSSFRow sheetRow = sheet.getRow(rowNumber); - return sheetRow != null ? sheetRow.getPhysicalNumberOfCells() : 0; + // If the headerRow is greater than zero, advance the iterator to the first row of data + // This is unfortunately necessary since the streaming reader eliminated the getRow() method. + for (int i = 1; i < rowNumber; i++) { + currentRow = rowIterator.next(); + } + + return currentRow.getPhysicalNumberOfCells(); } @Override @@ -289,83 +303,78 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { } private boolean nextLine(RowSetLoader rowWriter) { - if( sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) { + if (sheet.getLastRowNum() == 0) { // Case for empty sheet return false; - } else if (!rowIterator.hasNext()) { - return false; } else if (recordCount >= readerConfig.lastRow) { return false; } - int lastRow = readerConfig.lastRow; - if (recordCount < lastRow && rowIterator.hasNext()) { - lineCount++; - - Row row = rowIterator.next(); - - // If the user specified that there are no headers, get the column count - if (readerConfig.headerRow == -1 && recordCount == 0) { - this.totalColumnCount = row.getLastCellNum(); - } + // If the user specified that there are no headers, get the column count + if (readerConfig.headerRow == -1 && recordCount == 0) { + totalColumnCount = currentRow.getLastCellNum(); + } - int colPosition = 0; - if (readerConfig.firstColumn != 0) { - colPosition = readerConfig.firstColumn - 1; - } + int colPosition = 0; + if (readerConfig.firstColumn != 0) { + colPosition = readerConfig.firstColumn - 1; + } - int finalColumn = totalColumnCount; - if (readerConfig.lastColumn != 0) { - finalColumn = readerConfig.lastColumn - 1; - } - rowWriter.start(); - for (int colWriterIndex = 0; colPosition < finalColumn; colPosition++) { - Cell cell = row.getCell(colPosition); + int finalColumn = totalColumnCount; + if (readerConfig.lastColumn != 0) { + finalColumn = readerConfig.lastColumn - 1; + } + rowWriter.start(); + for (int colWriterIndex = 0; colPosition < finalColumn; colWriterIndex++) { + Cell cell = currentRow.getCell(colPosition); - CellValue cellValue = evaluator.evaluate(cell); + populateColumnArray(cell, colPosition); + cellWriterArray.get(colWriterIndex).load(cell); - populateColumnArray(cell, cellValue, colPosition); - cellWriterArray.get(colWriterIndex).load(cell); + colPosition++; + } - colWriterIndex++; - } + if (firstLine) { + firstLine = false; + } + rowWriter.save(); + recordCount++; - if (firstLine) { - firstLine = false; - } - rowWriter.save(); - recordCount++; - return true; - } else { + if (!rowIterator.hasNext()) { return false; + } else { + currentRow = rowIterator.next(); + return true; } - } /** * Function to populate the column array * @param cell The input cell object - * @param cellValue The cell value * @param colPosition The index of the column */ - private void populateColumnArray(Cell cell, CellValue cellValue, int colPosition) { + private void populateColumnArray(Cell cell, int colPosition) { if (!firstLine) { return; } - if (cellValue == null) { + // Case for empty data cell in first row. In this case, fall back to string. + if (cell == null) { + addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR); + return; + } + + CellType cellType = cell.getCellType(); + if (cellType == CellType.STRING || readerConfig.allTextMode) { addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR); + } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) { + // Case if the column is a date or time + addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP); + } else if (cellType == CellType.NUMERIC || cellType == CellType.FORMULA) { + // Case if the column is numeric + addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8); } else { - CellType cellType = cellValue.getCellType(); - if (cellType == CellType.STRING || readerConfig.allTextMode) { - addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR); - } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) { - addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP); - } else if (cellType == CellType.NUMERIC) { - addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8); - } else { - logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING."); - } + logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING."); } } @@ -411,28 +420,13 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { } } - /** - * Returns the index of the first row of actual data. This function is to be used to find the header row as the POI skips blank rows. - * @return The headerRow index, corrected for blank rows - */ - private int getFirstHeaderRow() { - int firstRow = sheet.getFirstRowNum(); - int headerRow = readerConfig.headerRow; - - if (headerRow < 0) { - return firstRow; - } else { - return headerRow; - } - } - @Override public void close() { if (workbook != null) { try { workbook.close(); } catch (IOException e) { - logger.warn("Error when closing XSSFWorkbook resource: {}", e.getMessage()); + logger.warn("Error when closing Excel Workbook resource: {}", e.getMessage()); } workbook = null; } @@ -441,13 +435,13 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { try { fsStream.close(); } catch (IOException e) { - logger.warn("Error when closing XSSFWorkbook resource: {}", e.getMessage()); + logger.warn("Error when closing Excel File Stream resource: {}", e.getMessage()); } fsStream = null; } } - public class CellWriter { + public static class CellWriter { ScalarWriter columnWriter; CellWriter(ScalarWriter columnWriter) { @@ -463,11 +457,10 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { } public void load(Cell cell) { - CellValue cellValue = evaluator.evaluate(cell); - if (cellValue == null) { + if (cell == null) { columnWriter.setNull(); } else { - String fieldValue = cellValue.getStringValue(); + String fieldValue = cell.getStringCellValue(); if (fieldValue == null && readerConfig.allTextMode) { fieldValue = String.valueOf(cell.getNumericCellValue()); } @@ -476,52 +469,47 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> { } } - public class NumericStringWriter extends ExcelBatchReader.CellWriter { + public static class NumericStringWriter extends ExcelBatchReader.CellWriter { NumericStringWriter(ScalarWriter columnWriter) { super(columnWriter); } public void load(Cell cell) { - String fieldValue = String.valueOf(cell.getNumericCellValue()); - - if (fieldValue == null) { + if (cell == null) { columnWriter.setNull(); } else { + String fieldValue = String.valueOf(cell.getNumericCellValue()); columnWriter.setString(fieldValue); } } } - public class NumericCellWriter extends ExcelBatchReader.CellWriter { + public static class NumericCellWriter extends ExcelBatchReader.CellWriter { NumericCellWriter(ScalarWriter columnWriter) { super(columnWriter); } public void load(Cell cell) { - CellValue cellValue = evaluator.evaluate(cell); - if (cellValue == null) { + if (cell == null) { columnWriter.setNull(); } else { - double fieldNumValue = cellValue.getNumberValue(); + double fieldNumValue = cell.getNumericCellValue(); columnWriter.setDouble(fieldNumValue); } } } - public class TimestampCellWriter extends ExcelBatchReader.CellWriter { + public static class TimestampCellWriter extends ExcelBatchReader.CellWriter { TimestampCellWriter(ScalarWriter columnWriter) { super(columnWriter); } public void load(Cell cell) { - CellValue cellValue = evaluator.evaluate(cell); - - if (cellValue == null) { + if (cell == null) { columnWriter.setNull(); } else { - logger.debug("Cell value: {}", cellValue.getNumberValue()); - - Date dt = DateUtil.getJavaDate(cellValue.getNumberValue(), TimeZone.getTimeZone("UTC")); + logger.debug("Cell value: {}", cell.getNumericCellValue()); + Date dt = DateUtil.getJavaDate(cell.getNumericCellValue(), TimeZone.getTimeZone("UTC")); Instant timeStamp = new Instant(dt.toInstant().getEpochSecond() * 1000); columnWriter.setTimestamp(timeStamp); } diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java index fb7df5c..5700b40 100644 --- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java +++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java @@ -73,7 +73,8 @@ public class TestExcelFormat extends ClusterTest { testBuilder() .sqlQuery(sql) - .ordered().baselineColumns("id", "first_name", "last_name", "email", "gender", "birthdate", "balance", "order_count", "average_order") + .unOrdered() + .baselineColumns("id", "first_name", "last_name", "email", "gender", "birthdate", "balance", "order_count", "average_order") .baselineValues(1.0, "Cornelia", "Matej", "[email protected]", "Female", "10/31/1974", 735.29, 22.0, 33.42227272727273) .baselineValues(2.0, "Nydia", "Heintsch", "[email protected]", "Female", "12/10/1966", 784.14, 22.0, 35.64272727272727) .baselineValues(3.0, "Waiter", "Sherel", "[email protected]", "Male", "3/12/1961", 172.36, 17.0, 10.138823529411766) @@ -320,7 +321,7 @@ public class TestExcelFormat extends ClusterTest { testBuilder() .sqlQuery(sql) - .ordered() + .unOrdered() .baselineColumns("col1", "col2", "col3") .baselineValues(1.0,2.0,null) .baselineValues(2.0,4.0,null) @@ -339,11 +340,12 @@ public class TestExcelFormat extends ClusterTest { testBuilder() .sqlQuery(sql) - .ordered().baselineColumns("col1", "col2") - .baselineValues("1.0", "Bob") - .baselineValues("2.0", "Steve") - .baselineValues("3.0", "Anne") - .baselineValues("Bob", "3.0") + .unOrdered() + .baselineColumns("col1", "col2") + .baselineValues("1", "Bob") + .baselineValues("2", "Steve") + .baselineValues("3", "Anne") + .baselineValues("Bob", "3") .go(); }
