snleee commented on code in PR #11487:
URL: https://github.com/apache/pinot/pull/11487#discussion_r1316187574


##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -161,36 +202,140 @@ private void init()
   public Map<String, Integer> getCSVHeaderMap() {
     // if header row is not configured and input file doesn't contain a valid header record, the returned map would
     // contain values from the first row in the input file.
-    return _parser.getHeaderMap();
+    return _headerMap;
   }
 
   @Override
   public boolean hasNext() {
+    if (_useLineIterator) {
+      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
+      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
+      // this limitation.
+      return _nextLine != null;
+    }
     return _iterator.hasNext();
   }
 
   @Override
-  public GenericRow next() {
+  public GenericRow next()
+      throws IOException {
     return next(new GenericRow());
   }
 
   @Override
-  public GenericRow next(GenericRow reuse) {
-    CSVRecord record = _iterator.next();
-    _recordExtractor.extract(record, reuse);
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (_useLineIterator) {
+      readNextLine(reuse);
+    } else {
+      CSVRecord record = _iterator.next();
+      _recordExtractor.extract(record, reuse);
+    }
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+
     init();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parser.close();
+    if (_useLineIterator) {
+      resetLineIteratorResources();
+    }
+
+    if (_parser != null && !_parser.isClosed()) {
+      _parser.close();
+    }
+  }
+
+  private void readNextLine(GenericRow reuse)
+      throws IOException {
+    while (_nextLine != null) {
+      try (Reader reader = new StringReader(_nextLine)) {
+        try (CSVParser csvParser = _format.parse(reader)) {
+          List<CSVRecord> csvRecords = csvParser.getRecords();
+          if (csvRecords != null && csvRecords.size() > 0) {
+            // There would be only one record as lines are read one after the 
other
+            CSVRecord record = csvRecords.get(0);
+            _recordExtractor.extract(record, reuse);
+            break;
+          } else {
+            // Can be thrown on: 1) Empty lines 2) Commented lines
+            throw new NoSuchElementException("Failed to find any records");
+          }
+        } catch (Exception e) {
+          _skippedLinesCount++;
+          LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, 
_dataFile, e);
+          // Find the next line that can be parsed
+          _nextLine = _bufferedReader.readLine();
+        }
+      }
+    }
+    if (_nextLine != null) {
+      // Advance the pointer to the next line for future reading
+      _nextLine = _bufferedReader.readLine();
+    } else {
+      throw new RuntimeException("No more parseable lines. Line iterator reached end of file.");
+    }
+  }
+
+  private void setHeaderMap(String[] header) {
+    int columnPos = 0;
+    for (String columnName : header) {
+      _headerMap.put(columnName, columnPos++);
+    }
+  }
+
+  private void initLineIteratorResources()
+      throws IOException {
+    _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 
32); // 32KB buffer size
+
+    // When header is supplied by the client
+    if (_isHeaderProvided) {
+      if (_skipHeaderRecord) {
+        // When skip header config is set and header is supplied – skip the 
first line from the input file
+        _bufferedReader.readLine();
+        // turn off the property so that it doesn't interfere with further 
parsing
+        _format = _format.withSkipHeaderRecord(false);
+      }
+      _nextLine = _bufferedReader.readLine();
+    } else {
+      // Read the first line and set the header
+      String headerLine = _bufferedReader.readLine();
+      String[] header = StringUtils.split(headerLine, _format.getDelimiter());

Review Comment:
   Let's investigate the following and address it in a follow-up PR:
   
   - Check whether a newer version of the library has the proper wiring for these options.
   - We should keep the same behavior across the two approaches.
   - Do the same investigation for `validateHeaderForDelimiter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to