stankiewicz commented on code in PR #29202:
URL: https://github.com/apache/beam/pull/29202#discussion_r1381837714


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java:
##########
@@ -171,21 +194,42 @@ protected void startReading(ReadableByteChannel channel) 
throws IOException {
           } else {
             startOfNextRecord = bufferPosn = (int) requiredPosition;
           }
+          skipHeader(skipHeaderLines, true);
         } else {
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          startOfNextRecord = requiredPosition;
+          skipHeader(skipHeaderLines, false);
+          if (requiredPosition > startOfNextRecord) {

Review Comment:
   good point



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java:
##########
@@ -171,21 +194,42 @@ protected void startReading(ReadableByteChannel channel) 
throws IOException {
           } else {
             startOfNextRecord = bufferPosn = (int) requiredPosition;
           }
+          skipHeader(skipHeaderLines, true);
         } else {
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          startOfNextRecord = requiredPosition;
+          skipHeader(skipHeaderLines, false);
+          if (requiredPosition > startOfNextRecord) {
+            ((SeekableByteChannel) channel).position(requiredPosition);
+            startOfNextRecord = requiredPosition;
+            bufferLength = bufferPosn = 0;
+            // Read and discard the next record ensuring that 
startOfNextRecord and bufferPosn point
+            // to the beginning of the next record.
+            readNextRecord();
+            currentValue = null;
+          }
         }
 
-        // Read and discard the next record ensuring that startOfNextRecord 
and bufferPosn point
-        // to the beginning of the next record.
-        readNextRecord();
-        currentValue = null;
       } else {
         // Check to see if we start with the UTF_BOM bytes skipping them if 
present.
         if (fileStartsWithBom()) {
           startOfNextRecord = bufferPosn = UTF8_BOM.size();
         }
+        skipHeader(skipHeaderLines, false);
+      }
+    }
+
+    private void skipHeader(int headerLines, boolean skipFirstLine) throws 
IOException {
+      if (headerLines == 1) {
+        readNextRecord();
+      } else if (headerLines > 1) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to