yzeng1618 commented on code in PR #10332:
URL: https://github.com/apache/seatunnel/pull/10332#discussion_r2693072645
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java:
##########
@@ -78,41 +93,71 @@ List<FileSourceSplit> splitByRowGroups(
             return splits;
         }
         long currentStart = 0;
-        long currentLength = 0;
+        long currentEnd = 0;
         boolean hasOpenSplit = false;
         for (BlockMetaData block : rowGroups) {
             long rgStart = block.getStartingPos();
             long rgSize = block.getCompressedSize();
+            long rgEnd = rgStart + rgSize;
             // start a new split
             if (!hasOpenSplit) {
                 currentStart = rgStart;
-                currentLength = rgSize;
+                currentEnd = rgEnd;
                 hasOpenSplit = true;
                 continue;
             }
             // exceeds threshold, close current split
-            if (currentLength + rgSize > splitSizeBytes) {
-                splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
+            if (rgEnd - currentStart > splitSizeBytes) {
+                splits.add(
+                        new FileSourceSplit(
+                                tableId, filePath, currentStart, currentEnd - currentStart));
                 // start next split
                 currentStart = rgStart;
-                currentLength = rgSize;
+                currentEnd = rgEnd;
             } else {
-                currentLength += rgSize;
+                currentEnd = rgEnd;
             }
         }
         // last split
-        if (hasOpenSplit && currentLength > 0) {
-            splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
+        if (hasOpenSplit && currentEnd > currentStart) {
+            splits.add(
+                    new FileSourceSplit(
+                            tableId, filePath, currentStart, currentEnd - currentStart));
         }
         return splits;
     }
     private List<BlockMetaData> readRowGroups(String filePath) throws IOException {
         Path path = new Path(filePath);
-        Configuration conf = new Configuration();
-        try (ParquetFileReader reader =
-                ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
-            return reader.getFooter().getBlocks();
+        if (hadoopFileSystemProxy == null) {
+            Configuration conf = new Configuration();
+            try (ParquetFileReader reader =
+                    ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+                return reader.getFooter().getBlocks();
+            }
+        }
+        try {
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try (ParquetFileReader reader =
+                                ParquetFileReader.open(
+                                        HadoopInputFile.fromPath(path, configuration))) {
+                            return reader.getFooter().getBlocks();
+                        }
+                    });
+        } catch (Exception e) {
+            if (e instanceof IOException) {
Review Comment:
   Thanks for the feedback. I updated ParquetFileSplitStrategy to rethrow
runtime exceptions as-is (so non-IO failures are no longer masked as
IOExceptions) and added an HDFS Parquet split E2E case to cover the previously
untested Parquet-on-HDFS scenario.
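   For reference, here is a minimal sketch of the rethrow pattern described
above. The class, interface, and method names are hypothetical placeholders,
not the actual ParquetFileSplitStrategy code:

```java
import java.io.IOException;

final class RethrowSketch {

    // Hypothetical callback standing in for the doWithHadoopAuth lambda.
    @FunctionalInterface
    interface AuthCallback<T> {
        T run() throws Exception;
    }

    static <T> T callWithAuth(AuthCallback<T> callback) throws IOException {
        try {
            return callback.run();
        } catch (Exception e) {
            if (e instanceof IOException) {
                // Preserve the original type for I/O failures.
                throw (IOException) e;
            }
            if (e instanceof RuntimeException) {
                // Rethrow runtime exceptions as-is so non-IO failures
                // are not masked as IOExceptions.
                throw (RuntimeException) e;
            }
            // Wrap anything else, keeping the original cause attached.
            throw new IOException(e);
        }
    }
}
```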
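   And a toy walk-through of why the split length is now computed as
`currentEnd - currentStart` rather than a running sum of compressed sizes:
row groups are not necessarily byte-adjacent, so a split must cover the full
contiguous range from the first group's start to the last group's end. All
offsets below are made up for illustration:

```java
public class SplitByEndOffsetDemo {
    public static void main(String[] args) {
        long splitSizeBytes = 100;
        // (startingPos, compressedSize) of three hypothetical row groups;
        // note the 10-byte gap between offset 50 (end of group 0) and 60.
        long[][] rowGroups = {{4, 46}, {60, 30}, {95, 40}};

        long currentStart = rowGroups[0][0]; // 4
        long currentEnd = rowGroups[0][0] + rowGroups[0][1]; // 50
        for (int i = 1; i < rowGroups.length; i++) {
            long rgEnd = rowGroups[i][0] + rowGroups[i][1];
            if (rgEnd - currentStart > splitSizeBytes) {
                // Close the split: it spans [currentStart, currentEnd),
                // including any gaps between row groups.
                System.out.printf(
                        "split: start=%d length=%d%n",
                        currentStart, currentEnd - currentStart);
                currentStart = rowGroups[i][0];
            }
            currentEnd = rgEnd;
        }
        System.out.printf(
                "split: start=%d length=%d%n",
                currentStart, currentEnd - currentStart);
        // Prints start=4 length=86 (46 + 30 plus the 10-byte gap) and
        // start=95 length=40; summing compressed sizes would have
        // under-counted the first split's file range as 76 bytes.
    }
}
```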
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]