This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b2fd7d3cc Added support to skip unparseable records in the csv record
reader (#11487)
3b2fd7d3cc is described below
commit 3b2fd7d3cc595ad32af0967f388ed0e8053379ec
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Tue Sep 5 10:30:25 2023 -0700
Added support to skip unparseable records in the csv record reader (#11487)
---
.../plugin/inputformat/csv/CSVRecordReader.java | 170 ++++++++++++-
.../inputformat/csv/CSVRecordReaderConfig.java | 9 +
.../inputformat/csv/CSVRecordReaderTest.java | 262 +++++++++++++++++++++
.../resources/dataFileWithAlternateDelimiter.csv | 4 +
.../test/resources/dataFileWithCommentedLines.csv | 6 +
.../src/test/resources/dataFileWithEmptyLines.csv | 9 +
.../test/resources/dataFileWithEscapedQuotes.csv | 3 +
.../test/resources/dataFileWithInvalidHeader.csv | 4 +
.../resources/dataFileWithMultipleCombinations.csv | 17 ++
.../dataFileWithMultipleCombinationsParseable.csv | 15 ++
.../src/test/resources/dataFileWithNoHeader.csv | 3 +
.../test/resources/dataFileWithQuotedHeaders.csv | 3 +
.../resources/dataFileWithUnparseableLines.csv | 3 +
13 files changed, 495 insertions(+), 13 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 400125067f..a635f03cf9 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -18,12 +18,20 @@
*/
package org.apache.pinot.plugin.inputformat.csv;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -33,17 +41,31 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Record reader for CSV file.
*/
+@NotThreadSafe
public class CSVRecordReader implements RecordReader {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CSVRecordReader.class);
+
private File _dataFile;
private CSVFormat _format;
private CSVParser _parser;
private Iterator<CSVRecord> _iterator;
private CSVRecordExtractor _recordExtractor;
+ private Map<String, Integer> _headerMap = new HashMap<>();
+
+ // line iterator specific variables
+ private boolean _useLineIterator = false;
+ private boolean _skipHeaderRecord = false;
+ private boolean _isHeaderProvided = false;
+ private long _skippedLinesCount;
+ private BufferedReader _bufferedReader;
+ private String _nextLine;
public CSVRecordReader() {
}
@@ -83,20 +105,33 @@ public class CSVRecordReader implements RecordReader {
}
char delimiter = config.getDelimiter();
format = format.withDelimiter(delimiter);
+
+ if (config.isSkipUnParseableLines()) {
+ _useLineIterator = true;
+ }
+
String csvHeader = config.getHeader();
if (csvHeader == null) {
format = format.withHeader();
} else {
- //validate header for the delimiter before splitting
- validateHeaderForDelimiter(delimiter, csvHeader, format);
- format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+ // do not validate header if using the line iterator
+ if (_useLineIterator) {
+ String[] header = StringUtils.split(csvHeader, delimiter);
+ setHeaderMap(header);
+ format = format.withHeader(header);
+ _isHeaderProvided = true;
+ } else {
+ // validate header for the delimiter before splitting
+ validateHeaderForDelimiter(delimiter, csvHeader, format);
+ format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+ }
}
-
+ format = format.withSkipHeaderRecord(config.isSkipHeader());
+ _skipHeaderRecord = config.isSkipHeader();
format = format.withCommentMarker(config.getCommentMarker());
format = format.withEscape(config.getEscapeCharacter());
format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
format =
format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
- format = format.withSkipHeaderRecord(config.isSkipHeader());
format = format.withQuote(config.getQuoteCharacter());
if (config.getQuoteMode() != null) {
@@ -123,7 +158,7 @@ public class CSVRecordReader implements RecordReader {
CSVRecordExtractorConfig recordExtractorConfig = new
CSVRecordExtractorConfig();
recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
- recordExtractorConfig.setColumnNames(_parser.getHeaderMap().keySet());
+ recordExtractorConfig.setColumnNames(_headerMap.keySet());
_recordExtractor.init(fieldsToRead, recordExtractorConfig);
}
@@ -147,7 +182,12 @@ public class CSVRecordReader implements RecordReader {
+  /**
+   * (Re)opens the data file for reading. Called by rewind() as well. In line-iterator mode the reader and header are
+   * handled by initLineIteratorResources(); otherwise a commons-csv parser is created and its header map cached.
+   */
private void init()
throws IOException {
+  // Line-iterator mode manages its own BufferedReader; no CSVParser/iterator is created for it
+ if (_useLineIterator) {
+ initLineIteratorResources();
+ return;
+ }
_parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
+ _headerMap = _parser.getHeaderMap();
_iterator = _parser.iterator();
}
@@ -161,36 +201,140 @@ public class CSVRecordReader implements RecordReader {
public Map<String, Integer> getCSVHeaderMap() {
// if header row is not configured and input file doesn't contain a valid header record, the returned map would
// contain values from the first row in the input file.
+ // In line-iterator mode _headerMap is filled by setHeaderMap() (from the configured header or from the file);
+ // otherwise init() stores the map returned by CSVParser.getHeaderMap().
- return _parser.getHeaderMap();
+ return _headerMap;
}
+  /**
+   * Returns whether another row can be requested. In line-iterator mode this checks {@code _nextLine != null};
+   * see readNextLine() for how lines that fail to parse are handled.
+   */
@Override
public boolean hasNext() {
+ if (_useLineIterator) {
+ // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator
+ // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes
+ // this limitation.
+ return _nextLine != null;
+ }
return _iterator.hasNext();
}
+  /**
+   * Reads the next row into a freshly allocated {@link GenericRow}; delegates to {@link #next(GenericRow)}.
+   */
@Override
- public GenericRow next() {
+ public GenericRow next()
+ throws IOException {
return next(new GenericRow());
}
+  /**
+   * Reads the next row into {@code reuse} and returns it: via readNextLine() in line-iterator mode, otherwise from
+   * the commons-csv record iterator.
+   */
@Override
- public GenericRow next(GenericRow reuse) {
- CSVRecord record = _iterator.next();
- _recordExtractor.extract(record, reuse);
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ if (_useLineIterator) {
+ readNextLine(reuse);
+ } else {
+ CSVRecord record = _iterator.next();
+ _recordExtractor.extract(record, reuse);
+ }
return reuse;
}
+  /**
+   * Releases the current resources and re-initializes the reader from the start of the file via init().
+   */
@Override
public void rewind()
throws IOException {
- _parser.close();
+ // Also logs and resets the skipped-line counter
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ // _parser is only created in the default (non line-iterator) mode
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+
init();
}
+  /**
+   * Releases the line-iterator reader (in line-iterator mode) and the commons-csv parser if one is open.
+   */
@Override
public void close()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ // _parser is only created in the default (non line-iterator) mode
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+ }
+
+  // Parsed form of _nextLine, kept in sync with it by advanceToNextParseableLine(). Because _nextLine only ever
+  // points at a line that parses into a record, hasNext() (which checks _nextLine) never promises a row that next()
+  // cannot produce because of a trailing blank, comment or malformed line.
+  @Nullable
+  private CSVRecord _nextRecord;
+
+  /**
+   * Reads the next row in line-iterator mode.
+   *
+   * <p>Extracts the pre-parsed record for {@code _nextLine} into {@code reuse}, then advances to the next line that
+   * parses. If extraction fails, that line is counted as skipped and the following parseable line is tried.
+   *
+   * @param reuse row to populate
+   * @throws IOException if reading the underlying file fails
+   * @throws NoSuchElementException if no remaining line can be turned into a row
+   */
+  private void readNextLine(GenericRow reuse)
+      throws IOException {
+    while (_nextLine != null) {
+      String line = _nextLine;
+      CSVRecord record = _nextRecord;
+      // Advance before extracting so that a failure below can never leave the iterator stuck on the same line
+      advanceToNextParseableLine();
+      try {
+        _recordExtractor.extract(record, reuse);
+        return;
+      } catch (Exception e) {
+        // Best effort by design: a line whose record cannot be extracted is skipped like an unparseable one
+        recordSkippedLine(line, e);
+      }
+    }
+    throw new NoSuchElementException("No more parseable lines. Line iterator reached end of file.");
+  }
+
+  /**
+   * Moves {@code _nextLine}/{@code _nextRecord} to the next line that parses into a CSV record, or sets both to
+   * {@code null} once the input is exhausted.
+   */
+  private void advanceToNextParseableLine()
+      throws IOException {
+    String line;
+    while ((line = _bufferedReader.readLine()) != null) {
+      CSVRecord record = parseLine(line);
+      if (record != null) {
+        _nextLine = line;
+        _nextRecord = record;
+        return;
+      }
+    }
+    _nextLine = null;
+    _nextRecord = null;
+  }
+
+  /**
+   * Parses one physical line with the configured format. Lines are parsed independently, so a quoted value that
+   * spans several lines cannot be read in this mode.
+   *
+   * @return the record on the line, or {@code null} when the line is blank, a comment, or fails to parse (the line
+   *         is then counted as skipped)
+   */
+  @Nullable
+  private CSVRecord parseLine(String line) {
+    try (CSVParser csvParser = _format.parse(new StringReader(line))) {
+      List<CSVRecord> csvRecords = csvParser.getRecords();
+      if (csvRecords.isEmpty()) {
+        // Blank lines and comment lines yield no record
+        recordSkippedLine(line, null);
+        return null;
+      }
+      return csvRecords.get(0);
+    } catch (Exception e) {
+      // Any failure to parse this single line means it is skipped rather than failing the whole read
+      recordSkippedLine(line, e);
+      return null;
+    }
+  }
+
+  /**
+   * Counts a skipped line and logs it at debug level, with the cause if there is one.
+   */
+  private void recordSkippedLine(String line, @Nullable Exception cause) {
+    _skippedLinesCount++;
+    LOGGER.debug("Skipped input line: {} from file: {}", line, _dataFile, cause);
+  }
+
+  /**
+   * Adds each column name to {@code _headerMap}, mapped to its position in {@code header}.
+   */
+  private void setHeaderMap(String[] header) {
+    for (int columnPos = 0; columnPos < header.length; columnPos++) {
+      _headerMap.put(header[columnPos], columnPos);
+    }
+  }
+
+  /**
+   * Opens the data file for line-by-line reading, resolves the header and positions the cursor on the first
+   * parseable data line.
+   */
+  private void initLineIteratorResources()
+      throws IOException {
+    // Same reader factory as the default path in init(), so both modes read the file the same way
+    // (FileReader would decode with the platform default charset)
+    _bufferedReader = RecordReaderUtils.getBufferedReader(_dataFile);
+
+    if (_isHeaderProvided) {
+      if (_skipHeaderRecord) {
+        // The configured header replaces the file's own header line, which is discarded
+        _bufferedReader.readLine();
+      }
+    } else {
+      String[] header = readHeader();
+      setHeaderMap(header);
+      _format = _format.withHeader(header);
+    }
+    // Each later parse sees exactly one data line; with an explicit header, a header-skipping format would consume
+    // that line as the header and every row would be lost
+    _format = _format.withSkipHeaderRecord(false);
+
+    advanceToNextParseableLine();
+  }
+
+  /**
+   * Derives the header from the first line of the file that holds a record (blank and comment lines before it are
+   * ignored). The line is parsed with the configured format, so quoting and surrounding-space settings apply to the
+   * column names. Falls back to a plain delimiter split if that line cannot be parsed.
+   *
+   * @return the column names, or an empty array if the file contains no record
+   */
+  private String[] readHeader()
+      throws IOException {
+    CSVFormat headerFormat = _format.withHeader((String[]) null).withSkipHeaderRecord(false);
+    String line;
+    while ((line = _bufferedReader.readLine()) != null) {
+      List<CSVRecord> csvRecords;
+      try (CSVParser headerParser = headerFormat.parse(new StringReader(line))) {
+        csvRecords = headerParser.getRecords();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to parse header line: {} from file: {}, falling back to splitting on the delimiter",
+            line, _dataFile, e);
+        return StringUtils.split(line, _format.getDelimiter());
+      }
+      if (!csvRecords.isEmpty()) {
+        CSVRecord headerRecord = csvRecords.get(0);
+        String[] header = new String[headerRecord.size()];
+        for (int i = 0; i < header.length; i++) {
+          header[i] = headerRecord.get(i);
+        }
+        return header;
+      }
+    }
+    return new String[0];
+  }
+
+  /**
+   * Releases the line-iterator state: logs and resets the skipped-line counter, clears a header that was read from
+   * the file, and closes the reader.
+   */
+  private void resetLineIteratorResources()
+      throws IOException {
+    _nextLine = null;
+    _nextRecord = null;
+
+    LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, _skippedLinesCount);
+    _skippedLinesCount = 0;
+
+    // A header read from the file is rebuilt by the next initLineIteratorResources(); a configured header is set up
+    // once in init(File, Set, RecordReaderConfig) and must be kept
+    if (!_isHeaderProvided) {
+      _headerMap.clear();
+    }
+
+    if (_bufferedReader != null) {
+      _bufferedReader.close();
+      _bufferedReader = null;
+    }
+  }
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
index d0d2df8f77..bdc4ae2ed8 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
@@ -37,6 +37,7 @@ public class CSVRecordReaderConfig implements
RecordReaderConfig {
private Character _escapeCharacter; // Default is null
private String _nullStringValue;
private boolean _skipHeader;
+ private boolean _skipUnParseableLines = false;
private boolean _ignoreEmptyLines = true;
private boolean _ignoreSurroundingSpaces = true;
private Character _quoteCharacter = '"';
@@ -76,6 +77,14 @@ public class CSVRecordReaderConfig implements
RecordReaderConfig {
_multiValueDelimiter = multiValueDelimiter;
}
+  /**
+   * Returns {@code true} if CSVRecordReader should skip lines it cannot parse instead of failing. In that mode the
+   * reader parses the file one line at a time, so a quoted value spanning several lines is not supported.
+   */
+  public boolean isSkipUnParseableLines() {
+    return _skipUnParseableLines;
+  }
+
+  /**
+   * Enables or disables skipping of unparseable lines; see {@link #isSkipUnParseableLines()}.
+   */
+  public void setSkipUnParseableLines(boolean skipUnParseableLines) {
+    _skipUnParseableLines = skipUnParseableLines;
+  }
+
+  /**
+   * Returns the configured {@code _multiValueDelimiterEnabled} flag.
+   */
public boolean isMultiValueDelimiterEnabled() {
return _multiValueDelimiterEnabled;
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index 14542bc496..9cc8b67bd7 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -21,6 +21,9 @@ package org.apache.pinot.plugin.inputformat.csv;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.csv.CSVFormat;
@@ -216,4 +219,263 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
Assert.assertNull(row.getValue("col2"));
Assert.assertNull(row.getValue("col3"));
}
+
+  /**
+   * Lines starting with the configured comment marker are ignored by both reading modes.
+   */
+  @Test
+  public void testReadingDataFileWithCommentedLines()
+      throws IOException, URISyntaxException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithCommentedLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setCommentMarker('#');
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 3);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 3);
+  }
+
+  /**
+   * Blank lines are ignored by both reading modes (ignoreEmptyLines defaults to true).
+   */
+  @Test
+  public void testReadingDataFileWithEmptyLines()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithEmptyLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 5);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 5);
+  }
+
+  /**
+   * The escaped-quotes fixture yields the same number of rows in both reading modes.
+   */
+  @Test
+  public void testReadingDataFileWithEscapedQuotes()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithEscapedQuotes.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 2);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 2);
+  }
+
+  /**
+   * With a configured header and a file that has no header line, every line is a data row in both modes.
+   */
+  @Test
+  public void testReadingDataFileWithNoHeader()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithNoHeader.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setHeader("id,name");
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 3);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 3);
+  }
+
+  /**
+   * A file whose header and values are quoted yields the same number of rows in both modes.
+   */
+  @Test
+  public void testReadingDataFileWithQuotedHeaders()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithQuotedHeaders.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 2);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 2);
+  }
+
+  /**
+   * With skipUnParseableLines enabled, the malformed line is skipped and only the valid row is returned.
+   */
+  @Test
+  public void testLineIteratorReadingDataFileWithUnparseableLines()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 1);
+    // The surviving row must be the well-formed "101" line, not the malformed "100" line
+    Assert.assertEquals(genericRows.get(0).getValue("id"), "101");
+  }
+
+  /**
+   * Without skipUnParseableLines, the malformed line makes the default commons-csv based reader fail.
+   */
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines()
+      throws URISyntaxException, IOException {
+    File unparseableFile = new File(ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI());
+    readCSVRecords(unparseableFile, new CSVRecordReaderConfig(), false);
+  }
+
+  /**
+   * Blank, comment and malformed lines mixed with valid rows: the line iterator returns only the valid rows.
+   */
+  @Test
+  public void testLineIteratorReadingDataFileWithMultipleCombinations()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 7);
+  }
+
+  /**
+   * The default reader handles the parseable variant of the mixed fixture (blank and comment lines, no malformed line).
+   */
+  @Test
+  public void testDefaultCsvReaderReadingDataFileWithMultipleCombinations()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 7);
+  }
+
+  /**
+   * After rewind(), the same line-iterator reader instance returns the same rows again.
+   */
+  @Test
+  public void testLineIteratorRewindMethod()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+
+    // Read everything, rewind the SAME reader and read again: that is what rewind() must support
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      int rowsBeforeRewind = 0;
+      while (recordReader.hasNext()) {
+        recordReader.next();
+        rowsBeforeRewind++;
+      }
+
+      recordReader.rewind();
+      int rowsAfterRewind = 0;
+      while (recordReader.hasNext()) {
+        recordReader.next();
+        rowsAfterRewind++;
+      }
+
+      // TestNG's argument order is (actual, expected)
+      Assert.assertEquals(rowsBeforeRewind, 7);
+      Assert.assertEquals(rowsAfterRewind, 7);
+    }
+  }
+
+  /**
+   * After rewind(), the same default (commons-csv iterator) reader instance returns the same rows again.
+   */
+  @Test
+  public void testDefaultCsvReaderRewindMethod()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+
+    // Read everything, rewind the SAME reader and read again: that is what rewind() must support
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      int rowsBeforeRewind = 0;
+      while (recordReader.hasNext()) {
+        recordReader.next();
+        rowsBeforeRewind++;
+      }
+
+      recordReader.rewind();
+      int rowsAfterRewind = 0;
+      while (recordReader.hasNext()) {
+        recordReader.next();
+        rowsAfterRewind++;
+      }
+
+      // TestNG's argument order is (actual, expected)
+      Assert.assertEquals(rowsBeforeRewind, 7);
+      Assert.assertEquals(rowsAfterRewind, 7);
+    }
+  }
+
+  /**
+   * A configured header plus skipHeader replaces the file's (invalid) header line in both reading modes.
+   */
+  @Test
+  public void testReadingDataFileWithInvalidHeader()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setHeader("firstName,lastName,id");
+    readerConfig.setSkipHeader(true);
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 3);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 3);
+  }
+
+  /**
+   * A non-comma delimiter is honoured by both reading modes.
+   */
+  @Test
+  public void testReadingDataFileWithAlternateDelimiter()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithAlternateDelimiter.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setDelimiter('|');
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, false);
+    // TestNG's argument order is (actual, expected)
+    Assert.assertEquals(genericRows.size(), 3);
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(genericRows.size(), 3);
+  }
+
+  /**
+   * With a configured header and skipHeader, the file's header line is skipped both before and after rewind(), in
+   * both the line-iterator and the default reading mode.
+   */
+  @Test
+  public void testRewindMethodAndSkipHeader()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setHeader("id,name");
+    readerConfig.setSkipHeader(true);
+
+    // First the line iterator, then the default CSVRecordReader
+    for (boolean skipUnParseableLines : new boolean[]{true, false}) {
+      readerConfig.setSkipUnParseableLines(skipUnParseableLines);
+      try (CSVRecordReader recordReader = new CSVRecordReader()) {
+        recordReader.init(dataFile, null, readerConfig);
+        int rowsBeforeRewind = 0;
+        while (recordReader.hasNext()) {
+          recordReader.next();
+          rowsBeforeRewind++;
+        }
+
+        // Rewind the SAME reader and read again; the header line must be skipped again
+        recordReader.rewind();
+        int rowsAfterRewind = 0;
+        while (recordReader.hasNext()) {
+          recordReader.next();
+          rowsAfterRewind++;
+        }
+
+        // TestNG's argument order is (actual, expected)
+        Assert.assertEquals(rowsBeforeRewind, 3, "skipUnParseableLines=" + skipUnParseableLines);
+        Assert.assertEquals(rowsAfterRewind, 3, "skipUnParseableLines=" + skipUnParseableLines);
+      }
+    }
+  }
+
+  /**
+   * Reads every row of {@code dataFile} with a new CSVRecordReader configured by {@code readerConfig}.
+   *
+   * @param rewind when {@code true}, calls rewind() after the last row so that rewind followed by close is exercised
+   * @return the rows read before any rewind
+   */
+  private List<GenericRow> readCSVRecords(File dataFile, CSVRecordReaderConfig readerConfig, boolean rewind)
+      throws IOException {
+    List<GenericRow> rows = new ArrayList<>();
+    try (CSVRecordReader reader = new CSVRecordReader()) {
+      reader.init(dataFile, null, readerConfig);
+      while (reader.hasNext()) {
+        rows.add(reader.next());
+      }
+      if (rewind) {
+        reader.rewind();
+      }
+    }
+    return rows;
+  }
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
new file mode 100644
index 0000000000..09d3a2ce2c
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
@@ -0,0 +1,4 @@
+id|firstName|lastName
+100|John|Doe
+101|Jane|Doe
+102|Jen|Doe
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCommentedLines.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCommentedLines.csv
new file mode 100644
index 0000000000..d435459b5d
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCommentedLines.csv
@@ -0,0 +1,6 @@
+id,name
+# ignore line#1
+100,Jane
+# ignore line#2
+101,John
+102,Sam
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEmptyLines.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEmptyLines.csv
new file mode 100644
index 0000000000..cc617a1099
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEmptyLines.csv
@@ -0,0 +1,9 @@
+id,name
+100,Jane
+
+101,John
+
+
+102,Sam
+103,Tom
+104,Zu
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEscapedQuotes.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEscapedQuotes.csv
new file mode 100644
index 0000000000..c760885cc9
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEscapedQuotes.csv
@@ -0,0 +1,3 @@
+\"id\",\"name\"
+\"100\",\"Jane\"
+\"101\",\"John\"
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithInvalidHeader.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithInvalidHeader.csv
new file mode 100644
index 0000000000..c2e2357572
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithInvalidHeader.csv
@@ -0,0 +1,4 @@
+id
+John,Doe,100
+Jane,Doe,101
+Jen,Doe,102
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
new file mode 100644
index 0000000000..5f173f4b6c
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
@@ -0,0 +1,17 @@
+id,name
+
+
+100,John
+# ignore line 1
+
+# ignore line 2
+
+101,Jane
+102,Jerry
+
+103,Suzanne
+# below line is unparseable by the commons-csv library
+"104","Yu"s"
+"105","Zack"
+\"106\",\"Ze\"
+107,Zu
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
new file mode 100644
index 0000000000..d804f04101
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
@@ -0,0 +1,15 @@
+id,name
+
+
+100,John
+# ignore line 1
+
+# ignore line 2
+
+101,Jane
+102,Jerry
+
+103,Suzanne
+"105","Zack"
+\"106\",\"Ze\"
+107,Zu
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader.csv
new file mode 100644
index 0000000000..b402feba4a
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader.csv
@@ -0,0 +1,3 @@
+100,Jane
+101,John
+102,Sam
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
new file mode 100644
index 0000000000..ec7033d1d4
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
@@ -0,0 +1,3 @@
+"id","name"
+"100","Jane"
+"101","John"
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
new file mode 100644
index 0000000000..ffbcb091e9
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
@@ -0,0 +1,3 @@
+id,name
+"100","John"s"
+"101","Jane"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]