This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b2fd7d3cc Added support to skip unparseable records in the csv record 
reader (#11487)
3b2fd7d3cc is described below

commit 3b2fd7d3cc595ad32af0967f388ed0e8053379ec
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Tue Sep 5 10:30:25 2023 -0700

    Added support to skip unparseable records in the csv record reader (#11487)
---
 .../plugin/inputformat/csv/CSVRecordReader.java    | 170 ++++++++++++-
 .../inputformat/csv/CSVRecordReaderConfig.java     |   9 +
 .../inputformat/csv/CSVRecordReaderTest.java       | 262 +++++++++++++++++++++
 .../resources/dataFileWithAlternateDelimiter.csv   |   4 +
 .../test/resources/dataFileWithCommentedLines.csv  |   6 +
 .../src/test/resources/dataFileWithEmptyLines.csv  |   9 +
 .../test/resources/dataFileWithEscapedQuotes.csv   |   3 +
 .../test/resources/dataFileWithInvalidHeader.csv   |   4 +
 .../resources/dataFileWithMultipleCombinations.csv |  17 ++
 .../dataFileWithMultipleCombinationsParseable.csv  |  15 ++
 .../src/test/resources/dataFileWithNoHeader.csv    |   3 +
 .../test/resources/dataFileWithQuotedHeaders.csv   |   3 +
 .../resources/dataFileWithUnparseableLines.csv     |   3 +
 13 files changed, 495 insertions(+), 13 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 400125067f..a635f03cf9 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pinot.plugin.inputformat.csv;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
@@ -33,17 +41,31 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Record reader for CSV file.
  */
+@NotThreadSafe
 public class CSVRecordReader implements RecordReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CSVRecordReader.class);
+
   private File _dataFile;
   private CSVFormat _format;
   private CSVParser _parser;
   private Iterator<CSVRecord> _iterator;
   private CSVRecordExtractor _recordExtractor;
+  private Map<String, Integer> _headerMap = new HashMap<>();
+
+  // line iterator specific variables
+  private boolean _useLineIterator = false;
+  private boolean _skipHeaderRecord = false;
+  private boolean _isHeaderProvided = false;
+  private long _skippedLinesCount;
+  private BufferedReader _bufferedReader;
+  private String _nextLine;
 
   public CSVRecordReader() {
   }
@@ -83,20 +105,33 @@ public class CSVRecordReader implements RecordReader {
       }
       char delimiter = config.getDelimiter();
       format = format.withDelimiter(delimiter);
+
+      if (config.isSkipUnParseableLines()) {
+        _useLineIterator = true;
+      }
+
       String csvHeader = config.getHeader();
       if (csvHeader == null) {
         format = format.withHeader();
       } else {
-        //validate header for the delimiter before splitting
-        validateHeaderForDelimiter(delimiter, csvHeader, format);
-        format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+        // do not validate header if using the line iterator
+        if (_useLineIterator) {
+          String[] header = StringUtils.split(csvHeader, delimiter);
+          setHeaderMap(header);
+          format = format.withHeader(header);
+          _isHeaderProvided = true;
+        } else {
+          // validate header for the delimiter before splitting
+          validateHeaderForDelimiter(delimiter, csvHeader, format);
+          format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+        }
       }
-
+      format = format.withSkipHeaderRecord(config.isSkipHeader());
+      _skipHeaderRecord = config.isSkipHeader();
       format = format.withCommentMarker(config.getCommentMarker());
       format = format.withEscape(config.getEscapeCharacter());
       format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
       format = 
format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
-      format = format.withSkipHeaderRecord(config.isSkipHeader());
       format = format.withQuote(config.getQuoteCharacter());
 
       if (config.getQuoteMode() != null) {
@@ -123,7 +158,7 @@ public class CSVRecordReader implements RecordReader {
 
     CSVRecordExtractorConfig recordExtractorConfig = new 
CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    recordExtractorConfig.setColumnNames(_parser.getHeaderMap().keySet());
+    recordExtractorConfig.setColumnNames(_headerMap.keySet());
     _recordExtractor.init(fieldsToRead, recordExtractorConfig);
   }
 
@@ -147,7 +182,12 @@ public class CSVRecordReader implements RecordReader {
 
   private void init()
       throws IOException {
+    if (_useLineIterator) {
+      initLineIteratorResources();
+      return;
+    }
     _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
+    _headerMap = _parser.getHeaderMap();
     _iterator = _parser.iterator();
   }
 
@@ -161,36 +201,140 @@ public class CSVRecordReader implements RecordReader {
   public Map<String, Integer> getCSVHeaderMap() {
     // if header row is not configured and input file doesn't contain a valid 
header record, the returned map would
     // contain values from the first row in the input file.
-    return _parser.getHeaderMap();
+    return _headerMap;
   }
 
   @Override
   public boolean hasNext() {
+    if (_useLineIterator) {
+      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
+      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
+      // this limitation.
+      return _nextLine != null;
+    }
     return _iterator.hasNext();
   }
 
   @Override
-  public GenericRow next() {
+  public GenericRow next()
+      throws IOException {
     return next(new GenericRow());
   }
 
   @Override
-  public GenericRow next(GenericRow reuse) {
-    CSVRecord record = _iterator.next();
-    _recordExtractor.extract(record, reuse);
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (_useLineIterator) {
+      readNextLine(reuse);
+    } else {
+      CSVRecord record = _iterator.next();
+      _recordExtractor.extract(record, reuse);
+    }
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+
     init();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+  }
+
+  private void readNextLine(GenericRow reuse)
+      throws IOException {
+    while (_nextLine != null) {
+      try (Reader reader = new StringReader(_nextLine)) {
+        try (CSVParser csvParser = _format.parse(reader)) {
+          List<CSVRecord> csvRecords = csvParser.getRecords();
+          if (csvRecords != null && csvRecords.size() > 0) {
+            // There would be only one record as lines are read one after the 
other
+            CSVRecord record = csvRecords.get(0);
+            _recordExtractor.extract(record, reuse);
+            break;
+          } else {
+            // Can be thrown on: 1) Empty lines 2) Commented lines
+            throw new NoSuchElementException("Failed to find any records");
+          }
+        } catch (Exception e) {
+          _skippedLinesCount++;
+          LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, 
_dataFile, e);
+          // Find the next line that can be parsed
+          _nextLine = _bufferedReader.readLine();
+        }
+      }
+    }
+    if (_nextLine != null) {
+      // Advance the pointer to the next line for future reading
+      _nextLine = _bufferedReader.readLine();
+    } else {
+      throw new RuntimeException("No more parseable lines. Line iterator 
reached end of file.");
+    }
+  }
+
+  private void setHeaderMap(String[] header) {
+    int columnPos = 0;
+    for (String columnName : header) {
+      _headerMap.put(columnName, columnPos++);
+    }
+  }
+
+  private void initLineIteratorResources()
+      throws IOException {
+    _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 
32); // 32KB buffer size
+
+    // When header is supplied by the client
+    if (_isHeaderProvided) {
+      if (_skipHeaderRecord) {
+        // When skip header config is set and header is supplied – skip the 
first line from the input file
+        _bufferedReader.readLine();
+        // turn off the property so that it doesn't interfere with further 
parsing
+        _format = _format.withSkipHeaderRecord(false);
+      }
+      _nextLine = _bufferedReader.readLine();
+    } else {
+      // Read the first line and set the header
+      String headerLine = _bufferedReader.readLine();
+      String[] header = StringUtils.split(headerLine, _format.getDelimiter());
+      setHeaderMap(header);
+      _format = _format.withHeader(header);
+      _nextLine = _bufferedReader.readLine();
+    }
+  }
+
+  private void resetLineIteratorResources()
+      throws IOException {
+    _nextLine = null;
+
+    LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, 
_skippedLinesCount);
+    _skippedLinesCount = 0;
+
+    // if header is not provided by the client it would be rebuilt. When it's 
provided by the client it's initialized
+    // once in the constructor
+    if (!_isHeaderProvided) {
+      _headerMap.clear();
+    }
+
+    if (_bufferedReader != null) {
+      _bufferedReader.close();
+    }
   }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
index d0d2df8f77..bdc4ae2ed8 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
@@ -37,6 +37,7 @@ public class CSVRecordReaderConfig implements 
RecordReaderConfig {
   private Character _escapeCharacter; // Default is null
   private String _nullStringValue;
   private boolean _skipHeader;
+  private boolean _skipUnParseableLines = false;
   private boolean _ignoreEmptyLines = true;
   private boolean _ignoreSurroundingSpaces = true;
   private Character _quoteCharacter = '"';
@@ -76,6 +77,14 @@ public class CSVRecordReaderConfig implements 
RecordReaderConfig {
     _multiValueDelimiter = multiValueDelimiter;
   }
 
+  public boolean isSkipUnParseableLines() {
+    return _skipUnParseableLines;
+  }
+
+  public void setSkipUnParseableLines(boolean skipUnParseableLines) {
+    _skipUnParseableLines = skipUnParseableLines;
+  }
+
   public boolean isMultiValueDelimiterEnabled() {
     return _multiValueDelimiterEnabled;
   }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index 14542bc496..9cc8b67bd7 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -21,6 +21,9 @@ package org.apache.pinot.plugin.inputformat.csv;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.csv.CSVFormat;
@@ -216,4 +219,263 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     Assert.assertNull(row.getValue("col2"));
     Assert.assertNull(row.getValue("col3"));
   }
+
+  @Test
+  public void testReadingDataFileWithCommentedLines()
+      throws IOException, URISyntaxException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithCommentedLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setCommentMarker('#');
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(3, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(3, genericRows.size());
+  }
+
+  @Test
+  public void testReadingDataFileWithEmptyLines()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithEmptyLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(5, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(5, genericRows.size());
+  }
+
+  @Test
+  public void testReadingDataFileWithEscapedQuotes()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithEscapedQuotes.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(2, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(2, genericRows.size());
+  }
+
+  @Test
+  public void testReadingDataFileWithNoHeader()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithNoHeader.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setHeader("id,name");
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(3, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(3, genericRows.size());
+  }
+
+  @Test
+  public void testReadingDataFileWithQuotedHeaders()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithQuotedHeaders.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(2, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(2, genericRows.size());
+  }
+
+  @Test
+  public void testLineIteratorReadingDataFileWithUnparseableLines()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(1, genericRows.size());
+  }
+
+  @Test (expectedExceptions = RuntimeException.class)
+  public void 
testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readCSVRecords(dataFile, readerConfig, false);
+  }
+
+  @Test
+  public void testLineIteratorReadingDataFileWithMultipleCombinations()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(7, genericRows.size());
+  }
+
+  @Test
+  public void testDefaultCsvReaderReadingDataFileWithMultipleCombinations()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(7, genericRows.size());
+  }
+
+  @Test
+  public void testLineIteratorRewindMethod()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+    readCSVRecords(dataFile, readerConfig, true);
+
+    // Start reading again; results should be same
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(7, genericRows.size());
+  }
+
+  @Test
+  public void testDefaultCsvReaderRewindMethod()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setCommentMarker('#');
+    readerConfig.setIgnoreEmptyLines(true);
+    readCSVRecords(dataFile, readerConfig, true);
+
+    // Start reading again; results should be same
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(7, genericRows.size());
+  }
+
+  @Test
+  public void testReadingDataFileWithInvalidHeader()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setHeader("firstName,lastName,id");
+    readerConfig.setSkipHeader(true);
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(3, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(3, genericRows.size());
+  }
+
+  @Test
+  public void testReadingDataFileWithAlternateDelimiter()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithAlternateDelimiter.csv").toURI();
+    File dataFile = new File(uri);
+
+    // test using line iterator
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setDelimiter('|');
+    readerConfig.setSkipUnParseableLines(true);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(3, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(3, genericRows.size());
+  }
+
+  @Test
+  public void testRewindMethodAndSkipHeader()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    readerConfig.setHeader("id,name");
+    readerConfig.setSkipHeader(true);
+    readCSVRecords(dataFile, readerConfig, true);
+
+    // Start reading again; results should be same
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    Assert.assertEquals(3, genericRows.size());
+
+    // test using default CSVRecordReader
+    readerConfig.setSkipUnParseableLines(false);
+    readCSVRecords(dataFile, readerConfig, true);
+
+    // Start reading again; results should be same
+    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    Assert.assertEquals(3, genericRows.size());
+  }
+
+  private List<GenericRow> readCSVRecords(File dataFile, CSVRecordReaderConfig 
readerConfig, boolean rewind)
+      throws IOException {
+    List<GenericRow> genericRows = new ArrayList<>();
+
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      while (recordReader.hasNext()) {
+        GenericRow genericRow = recordReader.next();
+        genericRows.add(genericRow);
+      }
+
+      if (rewind) {
+        // rewind the reader after reading all the lines
+        recordReader.rewind();
+      }
+    }
+    return genericRows;
+  }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
new file mode 100644
index 0000000000..09d3a2ce2c
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
@@ -0,0 +1,4 @@
+id|fisrtName|lastName
+100|John|Doe
+101|Jane|Doe
+102|Jen|Doe
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCommentedLines.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCommentedLines.csv
new file mode 100644
index 0000000000..d435459b5d
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCommentedLines.csv
@@ -0,0 +1,6 @@
+id,name
+# ignore line#1
+100,Jane
+# ignore line#2
+101,John
+102,Sam
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEmptyLines.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEmptyLines.csv
new file mode 100644
index 0000000000..cc617a1099
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEmptyLines.csv
@@ -0,0 +1,9 @@
+id,name
+100,Jane
+
+101,John
+
+
+102,Sam
+103,Tom
+104,Zu
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEscapedQuotes.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEscapedQuotes.csv
new file mode 100644
index 0000000000..c760885cc9
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithEscapedQuotes.csv
@@ -0,0 +1,3 @@
+\"id\",\"name\"
+\"100\",\"Jane\"
+\"101\",\"John\"
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithInvalidHeader.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithInvalidHeader.csv
new file mode 100644
index 0000000000..c2e2357572
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithInvalidHeader.csv
@@ -0,0 +1,4 @@
+id
+John,Doe,100
+Jane,Doe,101
+Jen,Doe,102
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
new file mode 100644
index 0000000000..5f173f4b6c
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
@@ -0,0 +1,17 @@
+id,name
+
+
+100,John
+# ignore line 1
+
+# ignore line 2
+
+101,Jane
+102,Jerry
+
+103,Suzanne
+# below line is  unparseable by the commons-csv library
+"104","Yu"s"
+"105","Zack"
+\"106\",\"Ze\"
+107,Zu
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
new file mode 100644
index 0000000000..d804f04101
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
@@ -0,0 +1,15 @@
+id,name
+
+
+100,John
+# ignore line 1
+
+# ignore line 2
+
+101,Jane
+102,Jerry
+
+103,Suzanne
+"105","Zack"
+\"106\",\"Ze\"
+107,Zu
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader.csv
new file mode 100644
index 0000000000..b402feba4a
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader.csv
@@ -0,0 +1,3 @@
+100,Jane
+101,John
+102,Sam
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
new file mode 100644
index 0000000000..ec7033d1d4
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
@@ -0,0 +1,3 @@
+"id","name"
+"100","Jane"
+"101","John"
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
new file mode 100644
index 0000000000..ffbcb091e9
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
@@ -0,0 +1,3 @@
+id,name
+"100","John"s"
+"101","Jane"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to