This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d03241acf9 Re-implement CSV record reader to skip unparseable lines
(#14396)
d03241acf9 is described below
commit d03241acf9754bc42f030e43fc5e513ccddf9e9c
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 8 01:07:17 2024 -0800
Re-implement CSV record reader to skip unparseable lines (#14396)
---
.../plugin/inputformat/csv/CSVRecordReader.java | 377 ++++-----
.../inputformat/csv/CSVRecordReaderConfig.java | 19 +-
.../inputformat/csv/CSVRecordReaderTest.java | 921 ++++++++++-----------
.../pinot-csv/src/test/resources/dataFileBasic.csv | 8 +-
.../pinot-csv/src/test/resources/dataFileEmpty.csv | 0
.../resources/dataFileWithAlternateDelimiter.csv | 2 +-
.../src/test/resources/dataFileWithCustomNull.csv | 5 +
.../test/resources/dataFileWithMultiLineValues.csv | 4 +
.../resources/dataFileWithMultipleCombinations.csv | 10 +-
.../dataFileWithMultipleCombinationsParseable.csv | 15 -
....csv => dataFileWithNoHeaderAndEmptyValues.csv} | 0
.../test/resources/dataFileWithQuotedHeaders.csv | 3 -
...WithValidHeaders.csv => dataFileWithQuotes.csv} | 0
.../test/resources/dataFileWithSingleColumn.csv | 4 +
.../resources/dataFileWithSpaceAroundHeaders.csv | 4 -
.../resources/dataFileWithSurroundingSpaces.csv | 4 +
...es.csv => dataFileWithUnparseableFirstLine.csv} | 0
.../resources/dataFileWithUnparseableLastLine.csv | 3 +
...leBasic.csv => dataFileWithUnparseableLine.csv} | 3 +-
.../resources/dataFileWithUnparseableLines2.csv | 5 -
.../pinot/spi/data/readers/RecordReader.java | 5 +
.../spi/data/readers/RecordReaderFileConfig.java | 9 +-
22 files changed, 654 insertions(+), 747 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 636574a19b..68958480f9 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -20,16 +20,12 @@ package org.apache.pinot.plugin.inputformat.csv;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -53,284 +49,199 @@ import org.slf4j.LoggerFactory;
public class CSVRecordReader implements RecordReader {
private static final Logger LOGGER =
LoggerFactory.getLogger(CSVRecordReader.class);
- private File _dataFile;
- private CSVFormat _format;
- private CSVParser _parser;
- private Iterator<CSVRecord> _iterator;
- private CSVRecordExtractor _recordExtractor;
- private Map<String, Integer> _headerMap = new HashMap<>();
+ private static final Map<String, CSVFormat> CSV_FORMAT_MAP = new HashMap<>();
- private BufferedReader _bufferedReader;
- private CSVRecordReaderConfig _config = null;
+ static {
+ for (CSVFormat.Predefined format : CSVFormat.Predefined.values()) {
+ CSV_FORMAT_MAP.put(canonicalize(format.name()), format.getFormat());
+ }
+ }
- public CSVRecordReader() {
+ private static String canonicalize(String format) {
+ return StringUtils.remove(format, '_').toUpperCase();
}
- private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) {
- if (config.getFileFormat() == null) {
+ private static CSVFormat getCSVFormat(@Nullable String format) {
+ if (format == null) {
return CSVFormat.DEFAULT;
}
- switch (config.getFileFormat().toUpperCase()) {
- case "EXCEL":
- return CSVFormat.EXCEL;
- case "MYSQL":
- return CSVFormat.MYSQL;
- case "RFC4180":
- return CSVFormat.RFC4180;
- case "TDF":
- return CSVFormat.TDF;
- default:
- return CSVFormat.DEFAULT;
- }
- }
-
- private static Map<String, Integer> parseHeaderMapFromLine(CSVFormat format,
String line) {
- try (StringReader stringReader = new StringReader(line)) {
- try (CSVParser parser = format.parse(stringReader)) {
- return parser.getHeaderMap();
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to parse header from line: " + line,
e);
+ CSVFormat csvFormat = CSV_FORMAT_MAP.get(canonicalize(format));
+ if (csvFormat != null) {
+ return csvFormat;
+ } else {
+ LOGGER.warn("Failed to find CSV format for: {}, using DEFAULT format",
format);
+ return CSVFormat.DEFAULT;
}
}
- private static Character getMultiValueDelimiter(CSVRecordReaderConfig
config) {
- if (config == null) {
- return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
- } else if (config.isMultiValueDelimiterEnabled()) {
- return config.getMultiValueDelimiter();
- }
- return null;
- }
+ private File _dataFile;
+ private CSVRecordReaderConfig _config;
+ private CSVFormat _format;
+ private BufferedReader _reader;
+ private CSVParser _parser;
+ private List<String> _columns;
+ private Iterator<CSVRecord> _iterator;
+ private CSVRecordExtractor _recordExtractor;
- private static boolean useLineIterator(CSVRecordReaderConfig config) {
- return config != null && config.isSkipUnParseableLines();
- }
+ // Following fields are used to handle exceptions in hasNext() method
+ private int _nextLineId;
+ private int _numSkippedLines;
+ private RuntimeException _exceptionInHasNext;
+ private CSVFormat _recoveryFormat;
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
_dataFile = dataFile;
- _config = (CSVRecordReaderConfig) recordReaderConfig;
- _format = createCSVFormat();
-
- // If header is provided by the client, use it. Otherwise, parse the
header from the first line of the file.
- // Overwrite the format with the header information.
-
Optional.ofNullable(_config).map(CSVRecordReaderConfig::getHeader).ifPresent(header
-> {
- _headerMap = parseHeaderMapFromLine(_format, header);
- _format = _format.builder().setHeader(_headerMap.keySet().toArray(new
String[0])).build();
- });
-
- validateHeaderWithDelimiter();
- initIterator();
- initRecordExtractor(fieldsToRead);
- }
-
- private void initRecordExtractor(Set<String> fieldsToRead) {
- final CSVRecordExtractorConfig recordExtractorConfig = new
CSVRecordExtractorConfig();
-
recordExtractorConfig.setMultiValueDelimiter(getMultiValueDelimiter(_config));
- recordExtractorConfig.setColumnNames(_headerMap.keySet());
- _recordExtractor = new CSVRecordExtractor();
- _recordExtractor.init(fieldsToRead, recordExtractorConfig);
- }
-
- private CSVFormat createCSVFormat() {
- if (_config == null) {
- return
CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build();
+ _config = recordReaderConfig != null ? (CSVRecordReaderConfig)
recordReaderConfig : new CSVRecordReaderConfig();
+ _format = getCSVFormat();
+ _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+ _parser = _format.parse(_reader);
+ _columns = _parser.getHeaderNames();
+ _iterator = _parser.iterator();
+ _recordExtractor = getRecordExtractor(fieldsToRead);
+ _nextLineId = (int) _parser.getCurrentLineNumber();
+
+ // Read the first record, and validate if the header uses the configured
delimiter
+ // (address https://github.com/apache/pinot/issues/7187)
+ boolean hasNext;
+ try {
+ hasNext = _iterator.hasNext();
+ } catch (RuntimeException e) {
+ throw new IOException("Failed to read first record from file: " +
_dataFile, e);
+ }
+ if (hasNext) {
+ CSVRecord record = _iterator.next();
+ if (record.size() > 1 && _columns.size() <= 1) {
+ throw new IllegalStateException("Header does not contain the
configured delimiter");
+ }
+ _reader.close();
+ _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+ _parser = _format.parse(_reader);
+ _iterator = _parser.iterator();
}
+ }
- final CSVFormat.Builder builder = baseCsvFormat(_config).builder()
+ private CSVFormat getCSVFormat() {
+ CSVFormat.Builder builder = getCSVFormat(_config.getFileFormat()).builder()
+ .setHeader() // Parse header from the file
.setDelimiter(_config.getDelimiter())
- .setHeader()
- .setSkipHeaderRecord(_config.isSkipHeader())
- .setCommentMarker(_config.getCommentMarker())
- .setEscape(_config.getEscapeCharacter())
.setIgnoreEmptyLines(_config.isIgnoreEmptyLines())
.setIgnoreSurroundingSpaces(_config.isIgnoreSurroundingSpaces())
.setQuote(_config.getQuoteCharacter());
-
-
Optional.ofNullable(_config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode);
-
Optional.ofNullable(_config.getRecordSeparator()).ifPresent(builder::setRecordSeparator);
-
Optional.ofNullable(_config.getNullStringValue()).ifPresent(builder::setNullString);
-
- return builder.build();
- }
-
- private void initIterator()
- throws IOException {
- if (useLineIterator(_config)) {
- _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 *
32); // 32KB buffer size
- _iterator = new LineIterator();
- } else {
- _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
- _headerMap = _parser.getHeaderMap();
- _iterator = _parser.iterator();
+ if (_config.getCommentMarker() != null) {
+ builder.setCommentMarker(_config.getCommentMarker());
}
- }
-
- private void validateHeaderWithDelimiter()
- throws IOException {
- if (_config == null || _config.getHeader() == null ||
useLineIterator(_config)) {
- return;
+ if (_config.getEscapeCharacter() != null) {
+ builder.setEscape(_config.getEscapeCharacter());
}
- final CSVParser parser =
_format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
- final Iterator<CSVRecord> iterator = parser.iterator();
- if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) &&
delimiterNotPresentInHeader(
- _config.getDelimiter(), _config.getHeader())) {
- throw new IllegalArgumentException("Configured header does not contain
the configured delimiter");
+ if (_config.getNullStringValue() != null) {
+ builder.setNullString(_config.getNullStringValue());
}
+ if (_config.getQuoteMode() != null) {
+ builder.setQuoteMode(QuoteMode.valueOf(_config.getQuoteMode()));
+ }
+ if (_config.getRecordSeparator() != null) {
+ builder.setRecordSeparator(_config.getRecordSeparator());
+ }
+ CSVFormat format = builder.build();
+ String header = _config.getHeader();
+ if (header == null) {
+ return format;
+ }
+ // Parse header using the current format, and set it into the builder
+ try (CSVParser parser = CSVParser.parse(header, format)) {
+ format = builder.setHeader(parser.getHeaderNames().toArray(new
String[0]))
+ .setSkipHeaderRecord(_config.isSkipHeader()).build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to parse header from line: " +
header, e);
+ }
+ return format;
}
- private boolean recordHasMultipleValues(CSVRecord record) {
- return record.size() > 1;
- }
-
- private boolean delimiterNotPresentInHeader(char delimiter, String
csvHeader) {
- return !StringUtils.contains(csvHeader, delimiter);
+ private CSVRecordExtractor getRecordExtractor(@Nullable Set<String>
fieldsToRead) {
+ CSVRecordExtractorConfig recordExtractorConfig = new
CSVRecordExtractorConfig();
+ if (_config.isMultiValueDelimiterEnabled()) {
+
recordExtractorConfig.setMultiValueDelimiter(_config.getMultiValueDelimiter());
+ }
+ recordExtractorConfig.setColumnNames(new HashSet<>(_columns));
+ CSVRecordExtractor recordExtractor = new CSVRecordExtractor();
+ recordExtractor.init(fieldsToRead, recordExtractorConfig);
+ return recordExtractor;
}
- /**
- * Returns a copy of the header map that iterates in column order.
- * <p>
- * The map keys are column names. The map values are 0-based indices.
- * </p>
- * @return a copy of the header map that iterates in column order.
- */
- public Map<String, Integer> getCSVHeaderMap() {
- // if header row is not configured and input file doesn't contain a valid
header record, the returned map would
- // contain values from the first row in the input file.
- return _headerMap;
+ public List<String> getColumns() {
+ return _columns;
}
@Override
public boolean hasNext() {
- return _iterator.hasNext();
+ try {
+ return _iterator.hasNext();
+ } catch (RuntimeException e) {
+ if (_config.isStopOnError()) {
+ LOGGER.warn("Caught exception while reading CSV file: {}, stopping
processing", _dataFile, e);
+ return false;
+ } else {
+ // Cache exception here and throw it in next() method
+ _exceptionInHasNext = e;
+ return true;
+ }
+ }
}
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
+ if (_exceptionInHasNext != null) {
+ // When hasNext() throws an exception, recreate the reader and skip to
the next line, then throw the exception
+ // TODO: This is very expensive. Consider marking the reader then reset
it. The challenge here is that the reader
+ // offset is not the same as parsed offset, and we need to mark at
the correct offset.
+ _reader.close();
+ _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+ _numSkippedLines = _nextLineId + 1;
+ for (int i = 0; i < _numSkippedLines; i++) {
+ _reader.readLine();
+ }
+ _nextLineId = _numSkippedLines;
+ // Create recovery format if not created yet. Recovery format has header
preset, and does not skip header record.
+ if (_recoveryFormat == null) {
+ _recoveryFormat =
+ _format.builder().setHeader(_columns.toArray(new
String[0])).setSkipHeaderRecord(false).build();
+ }
+ _parser = _recoveryFormat.parse(_reader);
+ _iterator = _parser.iterator();
+
+ RuntimeException exception = _exceptionInHasNext;
+ _exceptionInHasNext = null;
+ LOGGER.warn("Caught exception while reading CSV file: {}, recovering
from line: {}", _dataFile, _numSkippedLines,
+ exception);
+
+ throw exception;
+ }
+
CSVRecord record = _iterator.next();
_recordExtractor.extract(record, reuse);
+ _nextLineId = _numSkippedLines + (int) _parser.getCurrentLineNumber();
return reuse;
}
@Override
public void rewind()
throws IOException {
- if (_parser != null && !_parser.isClosed()) {
- _parser.close();
- }
- closeIterator();
- initIterator();
+ _reader.close();
+ _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+ _parser = _format.parse(_reader);
+ _iterator = _parser.iterator();
+ _nextLineId = (int) _parser.getCurrentLineNumber();
+ _numSkippedLines = 0;
}
@Override
public void close()
throws IOException {
- closeIterator();
-
- if (_parser != null && !_parser.isClosed()) {
- _parser.close();
- }
- }
-
- private void closeIterator()
- throws IOException {
- // if header is not provided by the client it would be rebuilt. When it's
provided by the client it's initialized
- // once in the constructor
- if (useLineIterator(_config) && _config.getHeader() == null) {
- _headerMap.clear();
- }
-
- if (_bufferedReader != null) {
- _bufferedReader.close();
- }
- }
-
- class LineIterator implements Iterator<CSVRecord> {
- private String _nextLine;
- private CSVRecord _current;
-
- public LineIterator() {
- init();
- }
-
- private void init() {
- try {
- if (_config.getHeader() != null) {
- if (_config.isSkipHeader()) {
- // When skip header config is set and header is supplied – skip
the first line from the input file
- _bufferedReader.readLine();
- // turn off the property so that it doesn't interfere with further
parsing
- _format = _format.builder().setSkipHeaderRecord(false).build();
- }
- } else {
- // read the first line
- String headerLine = _bufferedReader.readLine();
- _headerMap = parseHeaderMapFromLine(_format, headerLine);
- // If header isn't provided, the first line would be set as header
and the 'skipHeader' property
- // is set to false.
- _format = _format.builder()
- .setSkipHeaderRecord(false)
- .setHeader(_headerMap.keySet().toArray(new String[0]))
- .build();
- }
- _nextLine = _bufferedReader.readLine();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private CSVRecord getNextRecord() {
- while (_nextLine != null) {
- try (Reader reader = new StringReader(_nextLine)) {
- try (CSVParser csvParser = _format.parse(reader)) {
- List<CSVRecord> csvRecords = csvParser.getRecords();
- if (csvRecords == null || csvRecords.isEmpty()) {
- // Can be thrown on: 1) Empty lines 2) Commented lines
- throw new NoSuchElementException("Failed to find any records");
- }
- // There would be only one record as lines are read one after the
other
- CSVRecord csvRecord = csvRecords.get(0);
-
- // move the pointer to the next line
- _nextLine = _bufferedReader.readLine();
- return csvRecord;
- } catch (Exception e) {
- // Find the next line that can be parsed
- _nextLine = _bufferedReader.readLine();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-
- @Override
- public boolean hasNext() {
- if (_current == null) {
- _current = getNextRecord();
- }
-
- return _current != null;
- }
-
- @Override
- public CSVRecord next() {
- CSVRecord next = _current;
- _current = null;
-
- if (next == null) {
- // hasNext() wasn't called before
- next = getNextRecord();
- if (next == null) {
- throw new NoSuchElementException("No more CSV records available");
- }
- }
-
- return next;
+ if (_reader != null) {
+ _reader.close();
}
}
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
index bdc4ae2ed8..6d99f9712e 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
@@ -37,13 +37,14 @@ public class CSVRecordReaderConfig implements
RecordReaderConfig {
private Character _escapeCharacter; // Default is null
private String _nullStringValue;
private boolean _skipHeader;
- private boolean _skipUnParseableLines = false;
private boolean _ignoreEmptyLines = true;
private boolean _ignoreSurroundingSpaces = true;
private Character _quoteCharacter = '"';
private String _quoteMode;
private String _recordSeparator;
+ // When set to true, the record reader will stop processing the file if it
encounters an error.
+ private boolean _stopOnError;
public String getFileFormat() {
return _fileFormat;
@@ -77,14 +78,6 @@ public class CSVRecordReaderConfig implements
RecordReaderConfig {
_multiValueDelimiter = multiValueDelimiter;
}
- public boolean isSkipUnParseableLines() {
- return _skipUnParseableLines;
- }
-
- public void setSkipUnParseableLines(boolean skipUnParseableLines) {
- _skipUnParseableLines = skipUnParseableLines;
- }
-
public boolean isMultiValueDelimiterEnabled() {
return _multiValueDelimiterEnabled;
}
@@ -165,6 +158,14 @@ public class CSVRecordReaderConfig implements
RecordReaderConfig {
_recordSeparator = recordSeparator;
}
+ public boolean isStopOnError() {
+ return _stopOnError;
+ }
+
+ public void setStopOnError(boolean stopOnError) {
+ _stopOnError = stopOnError;
+ }
+
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this,
ToStringStyle.SHORT_PREFIX_STYLE);
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index 6a1d86a48d..12303657b0 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -21,11 +21,12 @@ package org.apache.pinot.plugin.inputformat.csv;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.lang3.StringUtils;
@@ -33,43 +34,49 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
public class CSVRecordReaderTest extends AbstractRecordReaderTest {
private static final char CSV_MULTI_VALUE_DELIMITER = '\t';
+ private static final CSVRecordReaderConfig[] NULL_AND_EMPTY_CONFIGS = new
CSVRecordReaderConfig[]{
+ null, new CSVRecordReaderConfig()
+ };
@Override
protected RecordReader createRecordReader(File file)
throws Exception {
- CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
- csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
- CSVRecordReader csvRecordReader = new CSVRecordReader();
- csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig);
- return csvRecordReader;
+ CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+ readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
+ CSVRecordReader recordReader = new CSVRecordReader();
+ recordReader.init(file, _sourceFields, readerConfig);
+ return recordReader;
}
@Override
- protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
+ protected void writeRecordsToFile(List<Map<String, Object>> records)
throws Exception {
- Schema pinotSchema = getPinotSchema();
- String[] columns = pinotSchema.getColumnNames().toArray(new String[0]);
- try (FileWriter fileWriter = new FileWriter(_dataFile);
- CSVPrinter csvPrinter = new CSVPrinter(fileWriter,
CSVFormat.DEFAULT.withHeader(columns))) {
-
- for (Map<String, Object> r : recordsToWrite) {
- Object[] record = new Object[columns.length];
- for (int i = 0; i < columns.length; i++) {
- if (pinotSchema.getFieldSpecFor(columns[i]).isSingleValueField()) {
- record[i] = r.get(columns[i]);
+ Schema schema = getPinotSchema();
+ String[] columns = schema.getColumnNames().toArray(new String[0]);
+ int numColumns = columns.length;
+ try (CSVPrinter csvPrinter = new CSVPrinter(new FileWriter(_dataFile),
+ CSVFormat.Builder.create().setHeader(columns).build())) {
+ for (Map<String, Object> record : records) {
+ Object[] values = new Object[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ if (schema.getFieldSpecFor(columns[i]).isSingleValueField()) {
+ values[i] = record.get(columns[i]);
} else {
- record[i] = StringUtils.join(((List) r.get(columns[i])).toArray(),
CSV_MULTI_VALUE_DELIMITER);
+ values[i] = StringUtils.join(((List<?>)
record.get(columns[i])).toArray(), CSV_MULTI_VALUE_DELIMITER);
}
}
- csvPrinter.printRecord(record);
+ csvPrinter.printRecord(values);
}
}
}
@@ -80,590 +87,572 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
}
@Override
- protected void checkValue(RecordReader recordReader, List<Map<String,
Object>> expectedRecordsMap,
+ protected void checkValue(RecordReader recordReader, List<Map<String,
Object>> expectedRecords,
List<Object[]> expectedPrimaryKeys)
throws Exception {
- for (int i = 0; i < expectedRecordsMap.size(); i++) {
- Map<String, Object> expectedRecord = expectedRecordsMap.get(i);
+ int numRecords = expectedRecords.size();
+ for (int i = 0; i < numRecords; i++) {
+ Map<String, Object> expectedRecord = expectedRecords.get(i);
GenericRow actualRecord = recordReader.next();
for (FieldSpec fieldSpec : _pinotSchema.getAllFieldSpecs()) {
- String fieldSpecName = fieldSpec.getName();
+ String column = fieldSpec.getName();
if (fieldSpec.isSingleValueField()) {
- Assert.assertEquals(actualRecord.getValue(fieldSpecName).toString(),
- expectedRecord.get(fieldSpecName).toString());
+ assertEquals(actualRecord.getValue(column).toString(),
expectedRecord.get(column).toString());
} else {
- List expectedRecords = (List) expectedRecord.get(fieldSpecName);
- if (expectedRecords.size() == 1) {
-
Assert.assertEquals(actualRecord.getValue(fieldSpecName).toString(),
expectedRecords.get(0).toString());
+ List<?> expectedValues = (List<?>) expectedRecord.get(column);
+ if (expectedValues.size() == 1) {
+ assertEquals(actualRecord.getValue(column).toString(),
expectedValues.get(0).toString());
} else {
- Object[] actualRecords = (Object[])
actualRecord.getValue(fieldSpecName);
- Assert.assertEquals(actualRecords.length, expectedRecords.size());
- for (int j = 0; j < actualRecords.length; j++) {
- Assert.assertEquals(actualRecords[j].toString(),
expectedRecords.get(j).toString());
+ Object[] actualValues = (Object[]) actualRecord.getValue(column);
+ assertEquals(actualValues.length, expectedValues.size());
+ for (int j = 0; j < actualValues.length; j++) {
+ assertEquals(actualValues[j].toString(),
expectedValues.get(j).toString());
}
}
}
- PrimaryKey primaryKey =
actualRecord.getPrimaryKey(getPrimaryKeyColumns());
- for (int j = 0; j < primaryKey.getValues().length; j++) {
- Assert.assertEquals(primaryKey.getValues()[j].toString(),
expectedPrimaryKeys.get(i)[j].toString());
+ Object[] expectedPrimaryKey = expectedPrimaryKeys.get(i);
+ Object[] actualPrimaryKey =
actualRecord.getPrimaryKey(getPrimaryKeyColumns()).getValues();
+ for (int j = 0; j < actualPrimaryKey.length; j++) {
+ assertEquals(actualPrimaryKey[j].toString(),
expectedPrimaryKey[j].toString());
}
}
}
- Assert.assertFalse(recordReader.hasNext());
+ assertFalse(recordReader.hasNext());
}
@Test
- public void testInvalidDelimiterInHeader() {
- // setup
- CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
- csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-
csvRecordReaderConfig.setHeader("col1;col2;col3;col4;col5;col6;col7;col8;col9;col10");
- csvRecordReaderConfig.setDelimiter(',');
- CSVRecordReader csvRecordReader = new CSVRecordReader();
-
- //execute and assert
- Assert.assertThrows(IllegalArgumentException.class,
- () -> csvRecordReader.init(_dataFile, null, csvRecordReaderConfig));
+ public void testInvalidDelimiterInHeader()
+ throws IOException {
+ CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+ readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
+
readerConfig.setHeader("col1;col2;col3;col4;col5;col6;col7;col8;col9;col10");
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ assertThrows(IllegalStateException.class, () ->
recordReader.init(_dataFile, null, readerConfig));
+ }
}
@Test
public void testValidDelimiterInHeader()
throws IOException {
- //setup
- CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
- csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-
csvRecordReaderConfig.setHeader("col1,col2,col3,col4,col5,col6,col7,col8,col9,col10");
- csvRecordReaderConfig.setDelimiter(',');
- CSVRecordReader csvRecordReader = new CSVRecordReader();
-
- //read all fields
- //execute and assert
- csvRecordReader.init(_dataFile, null, csvRecordReaderConfig);
- Assert.assertEquals(10, csvRecordReader.getCSVHeaderMap().size());
- Assert.assertTrue(csvRecordReader.getCSVHeaderMap().containsKey("col1"));
- Assert.assertTrue(csvRecordReader.hasNext());
- }
-
- /**
- * When CSV records contain a single value, then no exception should be
throw while initialising.
- * This test requires a different setup from the rest of the tests as it
requires a single-column
- * CSV. Therefore, we re-write already generated records into a new file,
but only the first
- * column.
- *
- * @throws IOException
- */
- @Test
- public void testHeaderDelimiterSingleColumn()
- throws IOException {
- //setup
-
- //create a single value CSV
- Schema pinotSchema = getPinotSchema();
- //write only the first column in the schema
- String column = pinotSchema.getColumnNames().toArray(new String[0])[0];
- //use a different file name so that other tests aren't affected
- File file = new File(_tempDir, "data1.csv");
- try (FileWriter fileWriter = new FileWriter(file);
- CSVPrinter csvPrinter = new CSVPrinter(fileWriter,
CSVFormat.DEFAULT.withHeader(column))) {
- for (Map<String, Object> r : _records) {
- Object[] record = new Object[1];
- record[0] = r.get(column);
- csvPrinter.printRecord(record);
- }
+ CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+ readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
+
readerConfig.setHeader("col1,col2,col3,col4,col5,col6,col7,col8,col9,col10");
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(_dataFile, null, readerConfig);
+ assertEquals(recordReader.getColumns(),
+ List.of("col1", "col2", "col3", "col4", "col5", "col6", "col7",
"col8", "col9", "col10"));
+ assertTrue(recordReader.hasNext());
}
-
- CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
- csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
- csvRecordReaderConfig.setHeader("col1");
- CSVRecordReader csvRecordReader = new CSVRecordReader();
-
- //execute and assert
- csvRecordReader.init(file, null, csvRecordReaderConfig);
- Assert.assertTrue(csvRecordReader.hasNext());
}
@Test
- public void testNullValueString()
- throws IOException {
- //setup
- String nullString = "NULL";
- //create a single value CSV
- Schema pinotSchema = getPinotSchema();
- //write only the first column in the schema
- String column = pinotSchema.getColumnNames().toArray(new String[0])[0];
- //use a different file name so that other tests aren't affected
- File file = new File(_tempDir, "data1.csv");
- try (FileWriter fileWriter = new FileWriter(file);
- CSVPrinter csvPrinter = new CSVPrinter(fileWriter,
- CSVFormat.DEFAULT.withHeader("col1", "col2",
"col3").withNullString(nullString))) {
- for (Map<String, Object> r : _records) {
- Object[] record = new Object[3];
- record[0] = r.get(column);
- csvPrinter.printRecord(record);
- }
+ public void testReadingDataFileBasic()
+ throws IOException {
+ File dataFile = getDataFile("dataFileBasic.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("id", "100", "name", "John"),
+ createMap("id", "101", "name", "Jane"),
+ createMap("id", "102", "name", "Alice"),
+ createMap("id", "103", "name", "Bob")
+ ));
}
-
- CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
- csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
- csvRecordReaderConfig.setHeader("col1,col2,col3");
- csvRecordReaderConfig.setNullStringValue(nullString);
- CSVRecordReader csvRecordReader = new CSVRecordReader();
-
- //execute and assert
- csvRecordReader.init(file, null, csvRecordReaderConfig);
- Assert.assertTrue(csvRecordReader.hasNext());
- csvRecordReader.next();
-
- GenericRow row = csvRecordReader.next();
- Assert.assertNotNull(row.getValue("col1"));
- Assert.assertNull(row.getValue("col2"));
- Assert.assertNull(row.getValue("col3"));
}
@Test
- public void testReadingDataFileWithCommentedLines()
- throws IOException, URISyntaxException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithCommentedLines.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithSingleColumn()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithSingleColumn.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("name", "John"),
+ createMap("name", "Jane"),
+ createMap("name", "Jen")
+ ));
+ }
- // test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setCommentMarker('#');
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
+ readerConfig.setHeader("firstName,lastName,id");
+ readerConfig.setSkipHeader(true);
+ validate(dataFile, readerConfig, List.of(
+ createMap("firstName", "John", "lastName", null, "id", null),
+ createMap("firstName", "Jane", "lastName", null, "id", null),
+ createMap("firstName", "Jen", "lastName", null, "id", null)
+ ));
}
@Test
- public void testReadingDataFileWithEmptyLines()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithEmptyLines.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithInvalidHeader()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithInvalidHeader.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ assertThrows(IllegalStateException.class, () ->
recordReader.init(dataFile, null, readerConfig));
+ }
+ }
- // test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(5, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(5, genericRows.size());
+ readerConfig.setHeader("firstName,lastName,id");
+ readerConfig.setSkipHeader(true);
+ validate(dataFile, readerConfig, List.of(
+ createMap("firstName", "John", "lastName", "Doe", "id", "100"),
+ createMap("firstName", "Jane", "lastName", "Doe", "id", "101"),
+ createMap("firstName", "Jen", "lastName", "Doe", "id", "102")
+ ));
}
@Test
- public void testReadingDataFileWithEscapedQuotes()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithEscapedQuotes.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
+ public void testReadingDataFileWithAlternateDelimiter()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithAlternateDelimiter.csv");
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(2, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(2, genericRows.size());
+ readerConfig.setDelimiter('|');
+ validate(dataFile, readerConfig, List.of(
+ createMap("id", "100", "firstName", "John", "lastName", "Doe"),
+ createMap("id", "101", "firstName", "Jane", "lastName", "Doe"),
+ createMap("id", "102", "firstName", "Jen", "lastName", "Doe")
+ ));
}
@Test
- public void testReadingDataFileWithNoHeader()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithNoHeader.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithSurroundingSpaces()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithSurroundingSpaces.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("firstName", "John", "lastName", "Doe", "id", "100"),
+ createMap("firstName", "Jane", "lastName", "Doe", "id", "101"),
+ createMap("firstName", "Jen", "lastName", "Doe", "id", "102")
+ ));
+ }
- // test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setHeader("id,name");
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
+ readerConfig.setIgnoreSurroundingSpaces(false);
+ validate(dataFile, readerConfig, List.of(
+ createMap(" firstName ", "John ", " lastName ", " Doe", " id", "100"),
+ createMap(" firstName ", "Jane", " lastName ", " Doe", " id", " 101"),
+ createMap(" firstName ", "Jen", " lastName ", "Doe ", " id", "102")
+ ));
}
@Test
- public void testReadingDataFileWithQuotedHeaders()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithQuotedHeaders.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
- CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(2, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(2, genericRows.size());
+ public void testReadingDataFileWithQuotes()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithQuotes.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("key", "key00", "num0", "12.3", "num1", "8.42"),
+ createMap("key", "key01", "num0", null, "num1", "7.1"),
+ createMap("key", "key02", "num0", null, "num1", "16.81"),
+ createMap("key", "key03", "num0", null, "num1", "7.12")
+ ));
+ }
}
@Test
- public void testLineIteratorReadingDataFileWithUnparseableLines()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
- File dataFile = new File(uri);
-
+ public void testReadingDataFileWithCustomNull()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithCustomNull.csv");
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(1, genericRows.size());
+ readerConfig.setNullStringValue("NULL");
+ validate(dataFile, readerConfig, List.of(
+ createMap("id", "100", "name", null),
+ createMap("id", null, "name", "Jane"),
+ createMap("id", null, "name", null),
+ createMap("id", null, "name", null)
+ ));
}
@Test
- public void testLineIteratorReadingDataFileWithUnparseableLinesWithRewind()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithUnparseableLines2.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithCommentedLines()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithCommentedLines.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ // Verify first row
+ validate(dataFile, readerConfig, 5, List.of(createMap("id", "# ignore
line#1", "name", null)));
+ }
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- final List<GenericRow> genericRows1 = readCSVRecords(dataFile,
readerConfig, null, true);
- Assert.assertEquals(3, genericRows1.size());
-
- // Start reading again; results should be same
- final List<GenericRow> genericRows2 = readCSVRecords(dataFile,
readerConfig, null, true);
- Assert.assertEquals(3, genericRows2.size());
-
- // Check that the rows are the same
- for (int i = 0; i < genericRows1.size(); i++) {
- Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
- }
+ readerConfig.setCommentMarker('#');
+ validate(dataFile, readerConfig, List.of(
+ createMap("id", "100", "name", "Jane"),
+ createMap("id", "101", "name", "John"),
+ createMap("id", "102", "name", "Sam")
+ ));
}
@Test
- public void testReadingDataFileWithRewind()
- throws URISyntaxException, IOException {
- URI uri = ClassLoader.getSystemResource("dataFileBasic.csv").toURI();
- File dataFile = new File(uri);
-
- CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig,
null, true);
- Assert.assertEquals(4, genericRows1.size());
-
- // Start reading again; results should be same
- List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig,
null, true);
- Assert.assertEquals(4, genericRows2.size());
-
- // Check that the rows are the same
- for (int i = 0; i < genericRows1.size(); i++) {
- Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
+ public void testReadingDataFileWithEmptyLines()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithEmptyLines.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, 5);
}
- }
-
- @Test (expectedExceptions = RuntimeException.class)
- public void
testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
- File dataFile = new File(uri);
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readCSVRecords(dataFile, readerConfig, null, false);
+ readerConfig.setIgnoreEmptyLines(false);
+ validate(dataFile, readerConfig, 8);
}
@Test
- public void testLineIteratorReadingDataFileWithMultipleCombinations()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithEscapedQuotes()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithEscapedQuotes.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("\\\"id\\\"", "\\\"100\\\"", "\\\"name\\\"",
"\\\"Jane\\\""),
+ createMap("\\\"id\\\"", "\\\"101\\\"", "\\\"name\\\"",
"\\\"John\\\"")
+ ));
+ }
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setCommentMarker('#');
- readerConfig.setIgnoreEmptyLines(true);
-
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(7, genericRows.size());
+ readerConfig.setEscapeCharacter('\\');
+ validate(dataFile, readerConfig, List.of(
+ createMap("\"id\"", "\"100\"", "\"name\"", "\"Jane\""),
+ createMap("\"id\"", "\"101\"", "\"name\"", "\"John\"")
+ ));
}
@Test
- public void testDefaultCsvReaderReadingDataFileWithMultipleCombinations()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithNoHeader()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithNoHeader.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("100", "101", "Jane", "John"),
+ createMap("100", "102", "Jane", "Sam")
+ ));
+ }
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setCommentMarker('#');
- readerConfig.setIgnoreEmptyLines(true);
-
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new
GenericRow(), false);
- Assert.assertEquals(7, genericRows.size());
+ readerConfig.setHeader("id,name");
+ validate(dataFile, readerConfig, List.of(
+ createMap("id", "100", "name", "Jane"),
+ createMap("id", "101", "name", "John"),
+ createMap("id", "102", "name", "Sam")
+ ));
}
@Test
- public void testLineIteratorRewindMethod()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithNoHeaderAndEmptyValues()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithNoHeaderAndEmptyValues.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("key00", "key01", "12.3", null, "8.42", "7.1"),
+ createMap("key00", "key02", "12.3", null, "8.42", "16.81"),
+ createMap("key00", "key03", "12.3", null, "8.42", "7.12")
+ ));
+ }
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setCommentMarker('#');
- readerConfig.setIgnoreEmptyLines(true);
- readCSVRecords(dataFile, readerConfig, null, true);
-
- // Start reading again; results should be same
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new
GenericRow(), false);
- Assert.assertEquals(7, genericRows.size());
+ readerConfig.setHeader("key,num0,num1");
+ validate(dataFile, readerConfig, List.of(
+ createMap("key", "key00", "num0", "12.3", "num1", "8.42"),
+ createMap("key", "key01", "num0", null, "num1", "7.1"),
+ createMap("key", "key02", "num0", null, "num1", "16.81"),
+ createMap("key", "key03", "num0", null, "num1", "7.12")
+ ));
}
@Test
- public void testDefaultCsvReaderRewindMethod()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileWithNoRecords()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithNoRecords.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, 0);
+ }
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setCommentMarker('#');
- readerConfig.setIgnoreEmptyLines(true);
- readCSVRecords(dataFile, readerConfig, null, true);
-
- // Start reading again; results should be same
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(7, genericRows.size());
+ readerConfig.setHeader("id,name");
+ readerConfig.setSkipHeader(true);
+ validate(dataFile, readerConfig, 0);
}
@Test
- public void testReadingDataFileWithInvalidHeader()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
- File dataFile = new File(uri);
+ public void testReadingDataFileEmpty()
+ throws IOException {
+ File dataFile = getDataFile("dataFileEmpty.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, 0);
+ }
- // test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setHeader("firstName,lastName,id");
- readerConfig.setSkipHeader(true);
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
+ readerConfig.setHeader("id,name");
+ validate(dataFile, readerConfig, 0);
}
@Test
- public void testReadingDataFileWithAlternateDelimiter()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithAlternateDelimiter.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
- CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setDelimiter('|');
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
+ public void testReadingDataFileWithMultiLineValues()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithMultiLineValues.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ validate(dataFile, readerConfig, List.of(
+ createMap("id", "100", "name", "John\n101,Jane"),
+ createMap("id", "102", "name", "Alice")
+ ));
+ }
}
@Test
- public void testReadingDataFileWithSpaceAroundHeaderFields()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithSpaceAroundHeaders.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
- CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setIgnoreSurroundingSpaces(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
- validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
-
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
- validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
+ public void testReadingDataFileWithUnparseableFirstLine()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithUnparseableFirstLine.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ assertThrows(IOException.class, () -> recordReader.init(dataFile,
null, readerConfig));
+ }
+ }
}
@Test
- public void testReadingDataFileWithSpaceAroundHeaderAreRetained()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithSpaceAroundHeaders.csv").toURI();
- File dataFile = new File(uri);
+ public void testLineIteratorReadingDataFileWithUnparseableLine()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithUnparseableLine.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testUnparseableLine(recordReader);
+ recordReader.rewind();
+ testUnparseableLine(recordReader);
+ }
+ }
- // test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setIgnoreSurroundingSpaces(false);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
- validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
+ readerConfig.setStopOnError(true);
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testUnparseableLineStopOnError(recordReader);
+ recordReader.rewind();
+ testUnparseableLineStopOnError(recordReader);
+ }
+ }
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
- validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
+ private void testUnparseableLine(CSVRecordReader recordReader)
+ throws IOException {
+ // First line is parseable
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
+ // Second line is unparseable, should throw an exception when next() is
called, and be skipped
+ assertTrue(recordReader.hasNext());
+ assertThrows(UncheckedIOException.class, recordReader::next);
+ // Third line is parseable
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"102", "name", "Alice"));
+ // 3 lines in total
+ assertFalse(recordReader.hasNext());
+ }
+
+ private void testUnparseableLineStopOnError(CSVRecordReader recordReader)
+ throws IOException {
+ // First line is parseable
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
+ // Second line is unparseable, stop here
+ assertFalse(recordReader.hasNext());
}
@Test
- public void testRewindMethodAndSkipHeader()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
- File dataFile = new File(uri);
+ public void testLineIteratorReadingDataFileWithUnparseableLastLine()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithUnparseableLastLine.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testUnparseableLastLine(recordReader);
+ recordReader.rewind();
+ testUnparseableLastLine(recordReader);
+ }
+ }
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setHeader("id,name");
- readerConfig.setSkipHeader(true);
- readCSVRecords(dataFile, readerConfig, new GenericRow(), true);
-
- // Start reading again; results should be same
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(3, genericRows.size());
+ readerConfig.setStopOnError(true);
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testUnparseableLastLineStopOnError(recordReader);
+ recordReader.rewind();
+ testUnparseableLastLineStopOnError(recordReader);
+ }
+ }
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- readCSVRecords(dataFile, readerConfig, null, true);
+ private void testUnparseableLastLine(CSVRecordReader recordReader)
+ throws IOException {
+ // First line is parseable
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
+ // Second line is unparseable, should throw an exception when next() is
called, and be skipped
+ assertTrue(recordReader.hasNext());
+ assertThrows(UncheckedIOException.class, recordReader::next);
+ // 2 lines in total
+ assertFalse(recordReader.hasNext());
+ }
- // Start reading again; results should be same
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(3, genericRows.size());
+ private void testUnparseableLastLineStopOnError(CSVRecordReader recordReader)
+ throws IOException {
+ // First line is parseable
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
+ // Second line is unparseable, stop here
+ assertFalse(recordReader.hasNext());
}
@Test
public void testReadingDataFileWithPartialLastRow()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithPartialLastRow.csv").toURI();
- File dataFile = new File(uri);
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithPartialLastRow.csv");
+ for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testPartialLastRow(recordReader);
+ recordReader.rewind();
+ testPartialLastRow(recordReader);
+ }
+ }
- // test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(2, genericRows.size());
-
- // Note: The default CSVRecordReader cannot handle unparseable rows
+ readerConfig.setStopOnError(true);
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testPartialLastRowStopOnError(recordReader);
+ recordReader.rewind();
+ testPartialLastRowStopOnError(recordReader);
+ }
}
- @Test
- public void testReadingDataFileWithNoRecords()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithNoRecords.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
- CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(0, genericRows.size());
-
- // Note: The default CSVRecordReader cannot handle unparseable rows
+ private void testPartialLastRow(CSVRecordReader recordReader)
+ throws IOException {
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(),
+ createMap("id", "100", "firstName", "jane", "lastName", "doe",
"appVersion", "1.0.0", "active", "yes"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(),
+ createMap("id", "101", "firstName", "john", "lastName", "doe",
"appVersion", "1.0.1", "active", "yes"));
+ assertTrue(recordReader.hasNext());
+ assertThrows(UncheckedIOException.class, recordReader::next);
+ assertFalse(recordReader.hasNext());
+ }
+
+ private void testPartialLastRowStopOnError(CSVRecordReader recordReader)
+ throws IOException {
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(),
+ createMap("id", "100", "firstName", "jane", "lastName", "doe",
"appVersion", "1.0.0", "active", "yes"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(),
+ createMap("id", "101", "firstName", "john", "lastName", "doe",
"appVersion", "1.0.1", "active", "yes"));
+ assertFalse(recordReader.hasNext());
}
@Test
- public void testReadingDataFileWithNoHeaderAndDataRecordsWithEmptyValues()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithNoHeader2.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
+ public void testLineIteratorReadingDataFileWithMultipleCombinations()
+ throws IOException {
+ File dataFile = getDataFile("dataFileWithMultipleCombinations.csv");
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- readerConfig.setHeader("key,num0,num1");
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(4, genericRows.size());
+ readerConfig.setCommentMarker('#');
+ readerConfig.setEscapeCharacter('\\');
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testCombinations(recordReader);
+ recordReader.rewind();
+ testCombinations(recordReader);
+ }
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(4, genericRows.size());
+ readerConfig.setStopOnError(true);
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(dataFile, null, readerConfig);
+ testCombinationsStopOnError(recordReader);
+ recordReader.rewind();
+ testCombinationsStopOnError(recordReader);
+ }
}
- @Test
- public void testReadingDataFileWithValidHeaders()
- throws URISyntaxException, IOException {
- URI uri =
ClassLoader.getSystemResource("dataFileWithValidHeaders.csv").toURI();
- File dataFile = new File(uri);
-
- // test using line iterator
- CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readerConfig.setSkipUnParseableLines(true);
- // No explicit header is set and attempt to skip the header should be
ignored. 1st line would be treated as the
- // header line.
- readerConfig.setSkipHeader(false);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
- Assert.assertEquals(4, genericRows.size());
+ private void testCombinations(CSVRecordReader recordReader)
+ throws IOException {
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"101", "name", "Jane"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"102", "name", "Jerry"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"103", "name", "Suzanne"));
+ // NOTE: Here we need to skip twice because the first line is a comment
line
+ assertTrue(recordReader.hasNext());
+ assertThrows(UncheckedIOException.class, recordReader::next);
+ assertTrue(recordReader.hasNext());
+ assertThrows(UncheckedIOException.class, recordReader::next);
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"105", "name", "Zack\nZack"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"\"106\"", "name", "\"Ze\""));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"107", "name", "Zu"));
+ assertFalse(recordReader.hasNext());
+ }
+
+ private void testCombinationsStopOnError(CSVRecordReader recordReader)
+ throws IOException {
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"101", "name", "Jane"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"102", "name", "Jerry"));
+ assertTrue(recordReader.hasNext());
+ assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"103", "name", "Suzanne"));
+ assertFalse(recordReader.hasNext());
+ }
- // test using default CSVRecordReader
- readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, null, false);
- Assert.assertEquals(4, genericRows.size());
+ private File getDataFile(String fileName) {
+ return new File(ClassLoader.getSystemResource(fileName).getFile());
}
- private List<GenericRow> readCSVRecords(File dataFile,
- CSVRecordReaderConfig readerConfig, GenericRow genericRow, boolean
rewind)
+ private void validate(File dataFile, @Nullable CSVRecordReaderConfig
readerConfig, int expectedNumRows,
+ @Nullable List<Map<String, Object>> expectedRows)
throws IOException {
- List<GenericRow> genericRows = new ArrayList<>();
+ List<GenericRow> genericRows = new ArrayList<>(expectedNumRows);
try (CSVRecordReader recordReader = new CSVRecordReader()) {
recordReader.init(dataFile, null, readerConfig);
- GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
- if (genericRow != null) {
- recordReader.next(reuse);
- genericRows.add(reuse);
- } else {
- GenericRow nextRow = recordReader.next();
- genericRows.add(nextRow);
- }
+ genericRows.add(recordReader.next());
}
+ assertEquals(genericRows.size(), expectedNumRows);
- if (rewind) {
- // rewind the reader after reading all the lines
- recordReader.rewind();
+ // Rewind the reader and read again
+ recordReader.rewind();
+ for (GenericRow row : genericRows) {
+ GenericRow genericRow = recordReader.next();
+ assertEquals(genericRow, row);
}
+ assertFalse(recordReader.hasNext());
}
- return genericRows;
- }
- private void validateSpaceAroundHeadersAreTrimmed(File dataFile,
CSVRecordReaderConfig readerConfig)
- throws IOException {
- try (CSVRecordReader recordReader = new CSVRecordReader()) {
- recordReader.init(dataFile, null, readerConfig);
- Map<String, Integer> headerMap = recordReader.getCSVHeaderMap();
- Assert.assertEquals(3, headerMap.size());
- List<String> headers = List.of("firstName", "lastName", "id");
- for (String header : headers) {
- // surrounding spaces in headers are trimmed
- Assert.assertTrue(headerMap.containsKey(header));
+ if (expectedRows != null) {
+ int rowId = 0;
+ for (Map<String, Object> expectedRow : expectedRows) {
+ GenericRow genericRow = genericRows.get(rowId++);
+ assertEquals(genericRow.getFieldToValueMap(), expectedRow);
}
}
}
- private void validateSpaceAroundHeadersAreRetained(File dataFile,
CSVRecordReaderConfig readerConfig)
+ private void validate(File dataFile, @Nullable CSVRecordReaderConfig
readerConfig, int expectedNumRows)
throws IOException {
- try (CSVRecordReader recordReader = new CSVRecordReader()) {
- recordReader.init(dataFile, null, readerConfig);
- Map<String, Integer> headerMap = recordReader.getCSVHeaderMap();
- Assert.assertEquals(3, headerMap.size());
- List<String> headers = List.of(" firstName ", " lastName ", " id");
- for (String header : headers) {
- // surrounding spaces in headers are trimmed
- Assert.assertTrue(headerMap.containsKey(header));
- }
+ validate(dataFile, readerConfig, expectedNumRows, null);
+ }
+
+ private void validate(File dataFile, @Nullable CSVRecordReaderConfig
readerConfig,
+ List<Map<String, Object>> expectedRows)
+ throws IOException {
+ validate(dataFile, readerConfig, expectedRows.size(), expectedRows);
+ }
+
+ private static Map<String, Object> createMap(String... keyValues) {
+ Map<String, Object> map = new HashMap<>();
+ for (int i = 0; i < keyValues.length; i += 2) {
+ map.put(keyValues[i], keyValues[i + 1]);
}
+ return map;
}
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
index c2b0fe3262..c2b7445426 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
@@ -1,5 +1,5 @@
id,name
-"100","John"
-"101","Jane"
-"102","Alice"
-"103","Bob"
+100,John
+101,Jane
+102,Alice
+103,Bob
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileEmpty.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileEmpty.csv
new file mode 100644
index 0000000000..e69de29bb2
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
index 09d3a2ce2c..4b4c2fffe9 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
@@ -1,4 +1,4 @@
-id|fisrtName|lastName
+id|firstName|lastName
100|John|Doe
101|Jane|Doe
102|Jen|Doe
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv
new file mode 100644
index 0000000000..af119fc1a1
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv
@@ -0,0 +1,5 @@
+id,name
+100,NULL
+,Jane
+NULL,NULL
+,
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv
new file mode 100644
index 0000000000..927983fa98
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv
@@ -0,0 +1,4 @@
+id,name
+100,"John
+101,Jane"
+102,Alice
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
index 5f173f4b6c..8590793e1e 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
@@ -10,8 +10,14 @@ id,name
102,Jerry
103,Suzanne
-# below line is unparseable by the commons-csv library
+# below line is unparseable by the commons-csv library
"104","Yu"s"
-"105","Zack"
+
+# below line is multi-line value
+"105","Zack
+Zack"
+
+# below line is escaped quotes
\"106\",\"Ze\"
+
107,Zu
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
deleted file mode 100644
index d804f04101..0000000000
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
+++ /dev/null
@@ -1,15 +0,0 @@
-id,name
-
-
-100,John
-# ignore line 1
-
-# ignore line 2
-
-101,Jane
-102,Jerry
-
-103,Suzanne
-"105","Zack"
-\"106\",\"Ze\"
-107,Zu
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader2.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeaderAndEmptyValues.csv
similarity index 100%
rename from
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader2.csv
rename to
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeaderAndEmptyValues.csv
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
deleted file mode 100644
index ec7033d1d4..0000000000
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-"id","name"
-"100","Jane"
-"101","John"
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithValidHeaders.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotes.csv
similarity index 100%
rename from
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithValidHeaders.csv
rename to
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotes.csv
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv
new file mode 100644
index 0000000000..856771f621
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv
@@ -0,0 +1,4 @@
+name
+John
+Jane
+Jen
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv
deleted file mode 100644
index c0f140a69c..0000000000
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv
+++ /dev/null
@@ -1,4 +0,0 @@
- firstName , lastName , id
-John,Doe,100
-Jane,Doe,101
-Jen,Doe,102
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv
new file mode 100644
index 0000000000..40678051c3
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv
@@ -0,0 +1,4 @@
+ firstName , lastName , id
+John , Doe,100
+Jane, Doe, 101
+Jen,Doe ,102
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableFirstLine.csv
similarity index 100%
rename from
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
rename to
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableFirstLine.csv
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv
new file mode 100644
index 0000000000..ef9b5942fb
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv
@@ -0,0 +1,3 @@
+id,name
+"100","John"
+"101","Jane"s"
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv
similarity index 58%
copy from
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
copy to
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv
index c2b0fe3262..e0a711badf 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv
@@ -1,5 +1,4 @@
id,name
"100","John"
-"101","Jane"
+"101","Jane"s"
"102","Alice"
-"103","Bob"
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
deleted file mode 100644
index 80e9a736c3..0000000000
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-id,name
-"100","John"s"
-"101","Jane"
-"102","Alice"
-"103","Bob"
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 020b2b3135..db3a455da3 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -46,11 +46,14 @@ public interface RecordReader extends Closeable,
Serializable {
/**
* Return <code>true</code> if more records remain to be read.
+ * <p>This method should not throw exceptions. The caller is not responsible
for handling exceptions from this method.
*/
boolean hasNext();
/**
* Get the next record.
+ * <p>This method should be called only if {@link #hasNext()} returns
<code>true</code>. The caller is responsible for
+ * handling exceptions from this method and skipping the row if the user wants
to continue reading the remaining rows.
*/
default GenericRow next()
throws IOException {
@@ -60,6 +63,8 @@ public interface RecordReader extends Closeable, Serializable
{
/**
* Get the next record. Re-use the given row to reduce garbage.
* <p>The passed in row should be cleared before calling this method.
+ * <p>This method should be called only if {@link #hasNext()} returns
<code>true</code>. The caller is responsible for
+ * handling exceptions from this method and skipping the row if the user wants
to continue reading the remaining rows.
*
* TODO: Consider clearing the row within the record reader to simplify the
caller
*/
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index e7566cb0ff..cd8c600399 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -99,10 +99,13 @@ public class RecordReaderFileConfig {
// Return true if RecordReader is done processing.
public boolean isRecordReaderDone() {
- if (_isRecordReaderInitialized) {
- return !_recordReader.hasNext();
+ if (!_isRecordReaderInitialized) {
+ return false;
+ }
+ if (_isRecordReaderClosed) {
+ return true;
}
- return false;
+ return !_recordReader.hasNext();
}
// For testing purposes only.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]