rajagopr commented on code in PR #11487:
URL: https://github.com/apache/pinot/pull/11487#discussion_r1313705239


##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -83,20 +105,34 @@ public void init(File dataFile, @Nullable Set<String> 
fieldsToRead, @Nullable Re
       }
       char delimiter = config.getDelimiter();
       format = format.withDelimiter(delimiter);
+
+      if (config.isSkipUnParseableLines()) {
+        _useLineIterator = true;
+      }
+
       String csvHeader = config.getHeader();
       if (csvHeader == null) {
         format = format.withHeader();
       } else {
-        //validate header for the delimiter before splitting
-        validateHeaderForDelimiter(delimiter, csvHeader, format);
-        format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+        // do not validate header if using the line iterator

Review Comment:
   Following problems exist with the current validation.
   1) It calls `iterator.hasNext()` which is what we are trying to avoid in the 
first place
   2) It checks if the record has multiple values. User can pass in header and 
the record can have a single line which is valid. Hence, this check is not 
valid.
   3) It checks if delimiter is present in header. This is also not valid if 
the file is like this.
   ```
   id
   100
   ```
   
   As all these checks are harmful, I have not made use of it.
   



##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
   public Map<String, Integer> getCSVHeaderMap() {
     // if header row is not configured and input file doesn't contain a valid 
header record, the returned map would
     // contain values from the first row in the input file.
-    return _parser.getHeaderMap();
+    return _headerMap;
   }
 
   @Override
   public boolean hasNext() {
+    if (_useLineIterator) {
+      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
+      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
+      // this limitation.
+      return _nextLine != null;
+    }
     return _iterator.hasNext();
   }
 
   @Override
-  public GenericRow next() {
+  public GenericRow next()
+      throws IOException {
     return next(new GenericRow());
   }
 
   @Override
-  public GenericRow next(GenericRow reuse) {
-    CSVRecord record = _iterator.next();
-    _recordExtractor.extract(record, reuse);
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (_useLineIterator) {
+      readNextLine(reuse);
+    } else {
+      CSVRecord record = _iterator.next();
+      _recordExtractor.extract(record, reuse);
+    }
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+
     init();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();

Review Comment:
   The line iterator does not make use of the `_parser` at all. Hence, this 
should not be a problem,



##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -33,17 +41,31 @@
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Record reader for CSV file.
  */
+@NotThreadSafe

Review Comment:
   No, they do not. I have added this as a good practice. This is purely for 
documentation.



##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -83,20 +105,34 @@ public void init(File dataFile, @Nullable Set<String> 
fieldsToRead, @Nullable Re
       }
       char delimiter = config.getDelimiter();
       format = format.withDelimiter(delimiter);
+
+      if (config.isSkipUnParseableLines()) {
+        _useLineIterator = true;
+      }
+
       String csvHeader = config.getHeader();
       if (csvHeader == null) {
         format = format.withHeader();
       } else {
-        //validate header for the delimiter before splitting
-        validateHeaderForDelimiter(delimiter, csvHeader, format);
-        format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+        // do not validate header if using the line iterator
+        if (_useLineIterator) {
+          String[] header = StringUtils.split(csvHeader, delimiter);
+          setHeaderMap(header);
+          format = format.withHeader(header);
+          _isHeaderProvided = true;
+        } else {
+          // validate header for the delimiter before splitting
+          validateHeaderForDelimiter(delimiter, csvHeader, format);
+          format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+        }
       }
 
       format = format.withCommentMarker(config.getCommentMarker());
       format = format.withEscape(config.getEscapeCharacter());
       format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
       format = 
format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
       format = format.withSkipHeaderRecord(config.isSkipHeader());
+      _skipHeaderRecord = config.isSkipHeader();

Review Comment:
   will do.



##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -83,20 +105,34 @@ public void init(File dataFile, @Nullable Set<String> 
fieldsToRead, @Nullable Re
       }
       char delimiter = config.getDelimiter();
       format = format.withDelimiter(delimiter);
+
+      if (config.isSkipUnParseableLines()) {
+        _useLineIterator = true;
+      }
+
       String csvHeader = config.getHeader();
       if (csvHeader == null) {
         format = format.withHeader();
       } else {
-        //validate header for the delimiter before splitting
-        validateHeaderForDelimiter(delimiter, csvHeader, format);
-        format = format.withHeader(StringUtils.split(csvHeader, delimiter));
+        // do not validate header if using the line iterator
+        if (_useLineIterator) {
+          String[] header = StringUtils.split(csvHeader, delimiter);
+          setHeaderMap(header);

Review Comment:
   Yet to determine what would be the correct validation for a header line. 
Existing validation does not look correct.



##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
   public Map<String, Integer> getCSVHeaderMap() {
     // if header row is not configured and input file doesn't contain a valid 
header record, the returned map would
     // contain values from the first row in the input file.
-    return _parser.getHeaderMap();
+    return _headerMap;
   }
 
   @Override
   public boolean hasNext() {
+    if (_useLineIterator) {
+      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
+      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
+      // this limitation.
+      return _nextLine != null;
+    }
     return _iterator.hasNext();
   }
 
   @Override
-  public GenericRow next() {
+  public GenericRow next()
+      throws IOException {
     return next(new GenericRow());
   }
 
   @Override
-  public GenericRow next(GenericRow reuse) {
-    CSVRecord record = _iterator.next();
-    _recordExtractor.extract(record, reuse);
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (_useLineIterator) {
+      readNextLine(reuse);
+    } else {
+      CSVRecord record = _iterator.next();
+      _recordExtractor.extract(record, reuse);
+    }
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+
     init();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+  }
+
+  private void readNextLine(GenericRow reuse)
+      throws IOException {
+    while (_nextLine != null) {
+      try (Reader reader = new StringReader(_nextLine)) {
+        try (CSVParser csvParser = _format.parse(reader)) {
+          List<CSVRecord> csvRecords = csvParser.getRecords();
+          if (csvRecords != null && csvRecords.size() > 0) {
+            // There would be only one record as lines are read one after the 
other
+            CSVRecord record = csvRecords.get(0);
+            _recordExtractor.extract(record, reuse);
+            break;
+          } else {
+            // Can be thrown on: 1) Empty lines 2) Commented lines
+            throw new NoSuchElementException("Failed to find any records");
+          }
+        } catch (Exception e) {
+          _skippedLinesCount++;
+          LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, 
_dataFile, e);
+          // Find the next line that can be parsed
+          _nextLine = _bufferedReader.readLine();
+        }
+      }
+    }
+    if (_nextLine != null) {
+      // Advance the pointer to the next line for future reading
+      _nextLine = _bufferedReader.readLine();
+    } else {
+      throw new RuntimeException("No more parseable lines. Line iterator 
reached end of file.");
+    }
+  }
+
+  private void setHeaderMap(String[] header) {
+    int columnPos = 0;
+    for (String columnName : header) {
+      _headerMap.put(columnName, columnPos++);
+    }
+  }
+
+  private void initLineIteratorResources()
+      throws IOException {
+    _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 
32); // 32KB buffer size

Review Comment:
   BufferedReader will fill the buffer until it can read the line fully. I have 
increased the buffer size from 8KB to 32KB as I noticed a performance gain when 
using slightly larger buffer.



##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
   public Map<String, Integer> getCSVHeaderMap() {
     // if header row is not configured and input file doesn't contain a valid 
header record, the returned map would
     // contain values from the first row in the input file.
-    return _parser.getHeaderMap();
+    return _headerMap;
   }
 
   @Override
   public boolean hasNext() {
+    if (_useLineIterator) {
+      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
+      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
+      // this limitation.
+      return _nextLine != null;
+    }
     return _iterator.hasNext();
   }
 
   @Override
-  public GenericRow next() {
+  public GenericRow next()
+      throws IOException {
     return next(new GenericRow());
   }
 
   @Override
-  public GenericRow next(GenericRow reuse) {
-    CSVRecord record = _iterator.next();
-    _recordExtractor.extract(record, reuse);
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (_useLineIterator) {
+      readNextLine(reuse);
+    } else {
+      CSVRecord record = _iterator.next();
+      _recordExtractor.extract(record, reuse);
+    }
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+
     init();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+  }
+
+  private void readNextLine(GenericRow reuse)
+      throws IOException {
+    while (_nextLine != null) {
+      try (Reader reader = new StringReader(_nextLine)) {
+        try (CSVParser csvParser = _format.parse(reader)) {
+          List<CSVRecord> csvRecords = csvParser.getRecords();
+          if (csvRecords != null && csvRecords.size() > 0) {
+            // There would be only one record as lines are read one after the 
other
+            CSVRecord record = csvRecords.get(0);
+            _recordExtractor.extract(record, reuse);
+            break;
+          } else {
+            // Can be thrown on: 1) Empty lines 2) Commented lines
+            throw new NoSuchElementException("Failed to find any records");
+          }
+        } catch (Exception e) {
+          _skippedLinesCount++;
+          LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, 
_dataFile, e);
+          // Find the next line that can be parsed
+          _nextLine = _bufferedReader.readLine();

Review Comment:
   Yes, that is the behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to