snleee commented on code in PR #11487:
URL: https://github.com/apache/pinot/pull/11487#discussion_r1313692304
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -83,20 +105,34 @@ public void init(File dataFile, @Nullable Set<String>
fieldsToRead, @Nullable Re
}
char delimiter = config.getDelimiter();
format = format.withDelimiter(delimiter);
+
+ if (config.isSkipUnParseableLines()) {
+ _useLineIterator = true;
+ }
+
String csvHeader = config.getHeader();
if (csvHeader == null) {
format = format.withHeader();
} else {
- //validate header for the delimiter before splitting
- validateHeaderForDelimiter(delimiter, csvHeader, format);
- format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+ // do not validate header if using the line iterator
+ if (_useLineIterator) {
+ String[] header = StringUtils.split(csvHeader, delimiter);
+ setHeaderMap(header);
+ format = format.withHeader(header);
+ _isHeaderProvided = true;
+ } else {
+ // validate header for the delimiter before splitting
+ validateHeaderForDelimiter(delimiter, csvHeader, format);
+ format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+ }
}
format = format.withCommentMarker(config.getCommentMarker());
format = format.withEscape(config.getEscapeCharacter());
format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
format =
format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
format = format.withSkipHeaderRecord(config.isSkipHeader());
+ _skipHeaderRecord = config.isSkipHeader();
Review Comment:
Let's move lines 135 and 136 up to line 129 to clean this up:
```
_skipHeaderRecord = config.isSkipHeader();
format = format.withSkipHeaderRecord(config.isSkipHeader());
format = format.withCommentMarker(config.getCommentMarker());
format = format.withEscape(config.getEscapeCharacter());
format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
format =
format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
```
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -83,20 +105,34 @@ public void init(File dataFile, @Nullable Set<String>
fieldsToRead, @Nullable Re
}
char delimiter = config.getDelimiter();
format = format.withDelimiter(delimiter);
+
+ if (config.isSkipUnParseableLines()) {
+ _useLineIterator = true;
+ }
+
String csvHeader = config.getHeader();
if (csvHeader == null) {
format = format.withHeader();
} else {
- //validate header for the delimiter before splitting
- validateHeaderForDelimiter(delimiter, csvHeader, format);
- format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+ // do not validate header if using the line iterator
Review Comment:
I think that there's no harm in running `validateHeaderForDelimiter` here? It's
just reading the first line and checking the delimiter.
##########
pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java:
##########
@@ -216,4 +219,555 @@ public void testNullValueString()
Assert.assertNull(row.getValue("col2"));
Assert.assertNull(row.getValue("col3"));
}
+
+ @Test
Review Comment:
Instead of repeating the entire function for the default and line iterators, we
can keep one test and cover both modes within it. For all existing tests (e.g.
`testNullValueString`, `testHeaderDelimiterSingleColumn`, etc.), let's add
coverage for the line iterator as well.
I think that we can make this test very concise, like the following:
```
URI uri =
ClassLoader.getSystemResource("dataFileWithCommentedLines.csv").toURI();
File dataFile = new File(uri);
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
readerConfig.setCommentMarker('#');
List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig);
Assert.assertEquals(3, genericRows.size());
readerConfig.setSkipUnParseableLines(false);
genericRows = readCSVRecords(dataFile, readerConfig);
Assert.assertEquals(3, genericRows.size());
```
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -33,17 +41,31 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Record reader for CSV file.
*/
+@NotThreadSafe
Review Comment:
Do the other record readers have this annotation?
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
public Map<String, Integer> getCSVHeaderMap() {
// if header row is not configured and input file doesn't contain a valid
header record, the returned map would
// contain values from the first row in the input file.
- return _parser.getHeaderMap();
+ return _headerMap;
}
@Override
public boolean hasNext() {
+ if (_useLineIterator) {
+ // When line iterator is used, the call to this method won't throw an
exception. The default and the only iterator
+ // from commons-csv library can throw an exception upon calling the
hasNext() method. The line iterator overcomes
+ // this limitation.
+ return _nextLine != null;
+ }
return _iterator.hasNext();
}
@Override
- public GenericRow next() {
+ public GenericRow next()
+ throws IOException {
return next(new GenericRow());
}
@Override
- public GenericRow next(GenericRow reuse) {
- CSVRecord record = _iterator.next();
- _recordExtractor.extract(record, reuse);
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ if (_useLineIterator) {
+ readNextLine(reuse);
+ } else {
+ CSVRecord record = _iterator.next();
+ _recordExtractor.extract(record, reuse);
+ }
return reuse;
}
@Override
public void rewind()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+
init();
}
@Override
public void close()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
Review Comment:
`resetLineIteratorResources()` can throw an exception. In that case, we
won't close `_parser` properly.
Can we double-check how to close both resources properly?
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
public Map<String, Integer> getCSVHeaderMap() {
// if header row is not configured and input file doesn't contain a valid
header record, the returned map would
// contain values from the first row in the input file.
- return _parser.getHeaderMap();
+ return _headerMap;
}
@Override
public boolean hasNext() {
+ if (_useLineIterator) {
+ // When line iterator is used, the call to this method won't throw an
exception. The default and the only iterator
+ // from commons-csv library can throw an exception upon calling the
hasNext() method. The line iterator overcomes
+ // this limitation.
+ return _nextLine != null;
+ }
return _iterator.hasNext();
}
@Override
- public GenericRow next() {
+ public GenericRow next()
+ throws IOException {
return next(new GenericRow());
}
@Override
- public GenericRow next(GenericRow reuse) {
- CSVRecord record = _iterator.next();
- _recordExtractor.extract(record, reuse);
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ if (_useLineIterator) {
+ readNextLine(reuse);
+ } else {
+ CSVRecord record = _iterator.next();
+ _recordExtractor.extract(record, reuse);
+ }
return reuse;
}
@Override
public void rewind()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+
init();
}
@Override
public void close()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+ }
+
+ private void readNextLine(GenericRow reuse)
+ throws IOException {
+ while (_nextLine != null) {
+ try (Reader reader = new StringReader(_nextLine)) {
+ try (CSVParser csvParser = _format.parse(reader)) {
+ List<CSVRecord> csvRecords = csvParser.getRecords();
+ if (csvRecords != null && csvRecords.size() > 0) {
+ // There would be only one record as lines are read one after the
other
+ CSVRecord record = csvRecords.get(0);
+ _recordExtractor.extract(record, reuse);
+ break;
+ } else {
+ // Can be thrown on: 1) Empty lines 2) Commented lines
+ throw new NoSuchElementException("Failed to find any records");
+ }
+ } catch (Exception e) {
+ _skippedLinesCount++;
+ LOGGER.debug("Skipped input line: {} from file: {}", _nextLine,
_dataFile, e);
+ // Find the next line that can be parsed
+ _nextLine = _bufferedReader.readLine();
+ }
+ }
+ }
+ if (_nextLine != null) {
+ // Advance the pointer to the next line for future reading
+ _nextLine = _bufferedReader.readLine();
+ } else {
+ throw new RuntimeException("No more parseable lines. Line iterator
reached end of file.");
+ }
+ }
+
+ private void setHeaderMap(String[] header) {
+ int columnPos = 0;
+ for (String columnName : header) {
+ _headerMap.put(columnName, columnPos++);
+ }
+ }
+
+ private void initLineIteratorResources()
+ throws IOException {
+ _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 *
32); // 32KB buffer size
Review Comment:
Do we cover the (extreme) case where a single line is longer than 32KB?
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
public Map<String, Integer> getCSVHeaderMap() {
// if header row is not configured and input file doesn't contain a valid
header record, the returned map would
// contain values from the first row in the input file.
- return _parser.getHeaderMap();
+ return _headerMap;
}
@Override
public boolean hasNext() {
+ if (_useLineIterator) {
+ // When line iterator is used, the call to this method won't throw an
exception. The default and the only iterator
+ // from commons-csv library can throw an exception upon calling the
hasNext() method. The line iterator overcomes
+ // this limitation.
+ return _nextLine != null;
+ }
return _iterator.hasNext();
}
@Override
- public GenericRow next() {
+ public GenericRow next()
+ throws IOException {
return next(new GenericRow());
}
@Override
- public GenericRow next(GenericRow reuse) {
- CSVRecord record = _iterator.next();
- _recordExtractor.extract(record, reuse);
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ if (_useLineIterator) {
+ readNextLine(reuse);
+ } else {
+ CSVRecord record = _iterator.next();
+ _recordExtractor.extract(record, reuse);
+ }
return reuse;
}
@Override
public void rewind()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+
init();
}
@Override
public void close()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+ }
+
+ private void readNextLine(GenericRow reuse)
+ throws IOException {
+ while (_nextLine != null) {
+ try (Reader reader = new StringReader(_nextLine)) {
+ try (CSVParser csvParser = _format.parse(reader)) {
+ List<CSVRecord> csvRecords = csvParser.getRecords();
+ if (csvRecords != null && csvRecords.size() > 0) {
+ // There would be only one record as lines are read one after the
other
+ CSVRecord record = csvRecords.get(0);
+ _recordExtractor.extract(record, reuse);
+ break;
+ } else {
+ // Can be thrown on: 1) Empty lines 2) Commented lines
+ throw new NoSuchElementException("Failed to find any records");
+ }
+ } catch (Exception e) {
+ _skippedLinesCount++;
+ LOGGER.debug("Skipped input line: {} from file: {}", _nextLine,
_dataFile, e);
+ // Find the next line that can be parsed
+ _nextLine = _bufferedReader.readLine();
Review Comment:
Once we reach the end of the file, `_bufferedReader.readLine()` will return
`null`?
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -83,20 +105,34 @@ public void init(File dataFile, @Nullable Set<String>
fieldsToRead, @Nullable Re
}
char delimiter = config.getDelimiter();
format = format.withDelimiter(delimiter);
+
+ if (config.isSkipUnParseableLines()) {
+ _useLineIterator = true;
+ }
+
String csvHeader = config.getHeader();
if (csvHeader == null) {
format = format.withHeader();
} else {
- //validate header for the delimiter before splitting
- validateHeaderForDelimiter(delimiter, csvHeader, format);
- format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+ // do not validate header if using the line iterator
+ if (_useLineIterator) {
+ String[] header = StringUtils.split(csvHeader, delimiter);
+ setHeaderMap(header);
Review Comment:
This assumes that the header is well-formed. Why are we not doing
the validation in this case?
##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
public Map<String, Integer> getCSVHeaderMap() {
// if header row is not configured and input file doesn't contain a valid
header record, the returned map would
// contain values from the first row in the input file.
- return _parser.getHeaderMap();
+ return _headerMap;
}
@Override
public boolean hasNext() {
+ if (_useLineIterator) {
+ // When line iterator is used, the call to this method won't throw an
exception. The default and the only iterator
+ // from commons-csv library can throw an exception upon calling the
hasNext() method. The line iterator overcomes
+ // this limitation.
+ return _nextLine != null;
+ }
return _iterator.hasNext();
}
@Override
- public GenericRow next() {
+ public GenericRow next()
+ throws IOException {
return next(new GenericRow());
}
@Override
- public GenericRow next(GenericRow reuse) {
- CSVRecord record = _iterator.next();
- _recordExtractor.extract(record, reuse);
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ if (_useLineIterator) {
+ readNextLine(reuse);
+ } else {
+ CSVRecord record = _iterator.next();
+ _recordExtractor.extract(record, reuse);
+ }
return reuse;
}
@Override
public void rewind()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+
init();
}
@Override
public void close()
throws IOException {
- _parser.close();
+ if (_useLineIterator) {
+ resetLineIteratorResources();
+ }
+
+ if (_parser != null && !_parser.isClosed()) {
+ _parser.close();
+ }
+ }
+
+ private void readNextLine(GenericRow reuse)
+ throws IOException {
+ while (_nextLine != null) {
+ try (Reader reader = new StringReader(_nextLine)) {
+ try (CSVParser csvParser = _format.parse(reader)) {
+ List<CSVRecord> csvRecords = csvParser.getRecords();
+ if (csvRecords != null && csvRecords.size() > 0) {
+ // There would be only one record as lines are read one after the
other
+ CSVRecord record = csvRecords.get(0);
+ _recordExtractor.extract(record, reuse);
+ break;
+ } else {
+ // Can be thrown on: 1) Empty lines 2) Commented lines
+ throw new NoSuchElementException("Failed to find any records");
+ }
+ } catch (Exception e) {
+ _skippedLinesCount++;
+ LOGGER.debug("Skipped input line: {} from file: {}", _nextLine,
_dataFile, e);
+ // Find the next line that can be parsed
+ _nextLine = _bufferedReader.readLine();
+ }
+ }
+ }
+ if (_nextLine != null) {
+ // Advance the pointer to the next line for future reading
+ _nextLine = _bufferedReader.readLine();
+ } else {
+ throw new RuntimeException("No more parseable lines. Line iterator
reached end of file.");
+ }
+ }
+
+ private void setHeaderMap(String[] header) {
+ int columnPos = 0;
+ for (String columnName : header) {
+ _headerMap.put(columnName, columnPos++);
+ }
+ }
+
+ private void initLineIteratorResources()
+ throws IOException {
+ _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 *
32); // 32KB buffer size
+
+ // When header is supplied by the client
+ if (_isHeaderProvided) {
+ if (_skipHeaderRecord) {
+ // When skip header config is set and header is supplied – skip the
first line from the input file
+ _bufferedReader.readLine();
+ // turn off the property so that it doesn't interfere with further
parsing
+ _format = _format.withSkipHeaderRecord(false);
+ }
+ _nextLine = _bufferedReader.readLine();
+ } else {
+ // Read the first line and set the header
+ String headerLine = _bufferedReader.readLine();
+ String[] header = StringUtils.split(headerLine, _format.getDelimiter());
Review Comment:
Why don't we use the same approach as the regular CSV reader for reading the
header? I don't think that we need custom handling here. (The header is the
first line of the file anyway, so we can first consume the header using the
shared code and then diverge?)
Otherwise, we could see different header-parsing behavior between the two
modes in some edge cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]