lukecwik commented on code in PR #23196:
URL: https://github.com/apache/beam/pull/23196#discussion_r972235373


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java:
##########
@@ -152,131 +160,260 @@ protected void startReading(ReadableByteChannel 
channel) throws IOException {
           // all the bytes of the delimiter in the call to 
findDelimiterBounds() below
           requiredPosition = startOffset - delimiter.length;
         }
-        ((SeekableByteChannel) channel).position(requiredPosition);
-        findDelimiterBounds();
-        buffer = buffer.substring(endOfDelimiterInBuffer);
-        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
-        endOfDelimiterInBuffer = 0;
-        startOfDelimiterInBuffer = 0;
+
+        // Handle the case where the requiredPosition is at the beginning of 
the file so we can
+        // skip over UTF8_BOM if present.
+        if (requiredPosition < UTF8_BOM.size()) {
+          ((SeekableByteChannel) channel).position(0);
+          if (fileStartsWithBom()) {
+            startOfNextRecord = bufferPosn = UTF8_BOM.size();
+          } else {
+            startOfNextRecord = bufferPosn = (int) requiredPosition;
+          }
+        } else {
+          ((SeekableByteChannel) channel).position(requiredPosition);
+          startOfNextRecord = requiredPosition;
+        }
+
+        // Read and discard the next record ensuring that startOfNextRecord 
and bufferPosn point
+        // to the beginning of the next record.
+        readNextRecord();
+        currentValue = null;
+      } else {
+        // Check to see if we start with the UTF_BOM bytes skipping them if 
present.
+        if (fileStartsWithBom()) {
+          startOfNextRecord = bufferPosn = UTF8_BOM.size();
+        }
+      }
+    }
+
+    private boolean fileStartsWithBom() throws IOException {
+      for (; ; ) {
+        int bytesRead = inChannel.read(byteBuffer);
+        if (bytesRead == -1) {
+          return false;
+        } else {
+          bufferLength += bytesRead;
+        }
+        if (bufferLength >= UTF8_BOM.size()) {
+          int i;
+          for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); 
++i) {}
+          if (i == UTF8_BOM.size()) {
+            return true;
+          }
+          return false;
+        }
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      startOfRecord = startOfNextRecord;
+
+      // If we have reached EOF file last time around then we will mark that 
we don't have an
+      // element and return false.
+      if (eof) {
+        currentValue = null;
+        return false;
+      }
+
+      if (delimiter == null) {
+        return readDefaultLine();
+      } else {
+        return readCustomLine();
       }
     }
 
     /**
-     * Locates the start position and end position of the next delimiter. Will 
consume the channel
-     * till either EOF or the delimiter bounds are found.
+     * Loosely based upon <a
+     * 
href="https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/util/LineReader.java">Hadoop
+     * LineReader.java</a>
      *
-     * <p>This fills the buffer and updates the positions as follows:
+     * <p>We're reading data from inChannel, but the head of the stream may be 
already buffered in
+     * buffer, so we have several cases: 1. No newline characters are in the 
buffer, so we need to
+     * copy everything and read another buffer from the stream. 2. An 
unambiguously terminated line
+     * is in buffer, so we just create currentValue 3. Ambiguously terminated 
line is in buffer,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to