yzeng1618 commented on code in PR #10332:
URL: https://github.com/apache/seatunnel/pull/10332#discussion_r2693072645


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java:
##########
@@ -78,41 +93,71 @@ List<FileSourceSplit> splitByRowGroups(
             return splits;
         }
         long currentStart = 0;
-        long currentLength = 0;
+        long currentEnd = 0;
         boolean hasOpenSplit = false;
         for (BlockMetaData block : rowGroups) {
             long rgStart = block.getStartingPos();
             long rgSize = block.getCompressedSize();
+            long rgEnd = rgStart + rgSize;
             // start a new split
             if (!hasOpenSplit) {
                 currentStart = rgStart;
-                currentLength = rgSize;
+                currentEnd = rgEnd;
                 hasOpenSplit = true;
                 continue;
             }
             // exceeds threshold, close current split
-            if (currentLength + rgSize > splitSizeBytes) {
-                splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
+            if (rgEnd - currentStart > splitSizeBytes) {
+                splits.add(
+                        new FileSourceSplit(
+                                tableId, filePath, currentStart, currentEnd - currentStart));
                 // start next split
                 currentStart = rgStart;
-                currentLength = rgSize;
+                currentEnd = rgEnd;
             } else {
-                currentLength += rgSize;
+                currentEnd = rgEnd;
             }
         }
         // last split
-        if (hasOpenSplit && currentLength > 0) {
-            splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
+        if (hasOpenSplit && currentEnd > currentStart) {
+            splits.add(
+                    new FileSourceSplit(
+                            tableId, filePath, currentStart, currentEnd - currentStart));
         }
         return splits;
     }
 
     private List<BlockMetaData> readRowGroups(String filePath) throws IOException {
         Path path = new Path(filePath);
-        Configuration conf = new Configuration();
-        try (ParquetFileReader reader =
-                ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
-            return reader.getFooter().getBlocks();
+        if (hadoopFileSystemProxy == null) {
+            Configuration conf = new Configuration();
+            try (ParquetFileReader reader =
+                    ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+                return reader.getFooter().getBlocks();
+            }
+        }
+        try {
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try (ParquetFileReader reader =
+                                ParquetFileReader.open(
+                                        HadoopInputFile.fromPath(path, configuration))) {
+                            return reader.getFooter().getBlocks();
+                        }
+                    });
+        } catch (Exception e) {
+            if (e instanceof IOException) {

Review Comment:
   Thanks for the feedback. I updated ParquetFileSplitStrategy to rethrow runtime exceptions (so we don't mask non-IO failures as IOExceptions) and added an HDFS Parquet split E2E case to cover the previously missing Parquet scenario. A rough sketch of the intended rethrow behavior is shown below.
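   
   Since the diff above is truncated at the `catch` block, here is a minimal sketch of what the rethrow logic could look like. The method shape and the `doWithHadoopAuth` call mirror the diff; the body of the `catch` block is illustrative only, not necessarily the exact code merged in the PR:
   
   ```java
   // Sketch only: hypothetical error handling for readRowGroups, based on
   // the truncated diff above. The catch body is an assumption.
   private List<BlockMetaData> readRowGroups(String filePath) throws IOException {
       Path path = new Path(filePath);
       try {
           return hadoopFileSystemProxy.doWithHadoopAuth(
                   (configuration, userGroupInformation) -> {
                       try (ParquetFileReader reader =
                               ParquetFileReader.open(
                                       HadoopInputFile.fromPath(path, configuration))) {
                           return reader.getFooter().getBlocks();
                       }
                   });
       } catch (Exception e) {
           if (e instanceof IOException) {
               // Keep the original type so callers still see an IOException.
               throw (IOException) e;
           }
           if (e instanceof RuntimeException) {
               // Rethrow runtime exceptions unchanged so non-IO failures
               // are not masked as IOExceptions.
               throw (RuntimeException) e;
           }
           // Wrap any remaining checked exception, preserving the cause.
           throw new IOException(e);
       }
   }
   ```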
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
