This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit b4dc68922fa6354844ddaf5093482e38a39aeb97
Author: JingsongLi <[email protected]>
AuthorDate: Mon Nov 10 19:25:04 2025 +0800

    [format] Refactor split in FormatTableScan
---
 .../org/apache/paimon/io/DataFileRecordReader.java | 56 ++++++++++-------
 .../paimon/table/format/FormatDataSplit.java       |  1 +
 .../paimon/table/format/FormatReadBuilder.java     | 18 ++++--
 .../paimon/table/format/FormatTableScan.java       | 71 +++++++++++-----------
 .../paimon/format/text/HadoopCompressionUtils.java | 16 +++--
 .../apache/paimon/format/text/TextLineReader.java  | 15 +++--
 6 files changed, 106 insertions(+), 71 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 3f9e377a6b..2584aef00f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -39,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
@@ -68,13 +67,32 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
             long maxSequenceNumber,
             Map<String, Integer> systemFields)
             throws IOException {
+        this(
+                tableRowType,
+                createReader(readerFactory, context),
+                indexMapping,
+                castMapping,
+                partitionInfo,
+                rowTrackingEnabled,
+                firstRowId,
+                maxSequenceNumber,
+                systemFields,
+                context.selection());
+    }
+
+    public DataFileRecordReader(
+            RowType tableRowType,
+            FileRecordReader<InternalRow> reader,
+            @Nullable int[] indexMapping,
+            @Nullable CastFieldGetter[] castMapping,
+            @Nullable PartitionInfo partitionInfo,
+            boolean rowTrackingEnabled,
+            @Nullable Long firstRowId,
+            long maxSequenceNumber,
+            Map<String, Integer> systemFields,
+            @Nullable RoaringBitmap32 selection) {
         this.tableRowType = tableRowType;
-        try {
-            this.reader = readerFactory.createReader(context);
-        } catch (Exception e) {
-            FileUtils.checkExists(context.fileIO(), context.filePath());
-            throw e;
-        }
+        this.reader = reader;
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
@@ -82,24 +100,18 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
         this.firstRowId = firstRowId;
         this.maxSequenceNumber = maxSequenceNumber;
         this.systemFields = systemFields;
-        this.selection = context.selection();
+        this.selection = selection;
     }
 
-    public DataFileRecordReader(
-            RowType tableRowType,
-            FileRecordReader<InternalRow> reader,
-            @Nullable PartitionInfo partitionInfo)
+    private static FileRecordReader<InternalRow> createReader(
+            FormatReaderFactory readerFactory, FormatReaderFactory.Context context)
             throws IOException {
-        this.tableRowType = tableRowType;
-        this.reader = reader;
-        this.indexMapping = null;
-        this.partitionInfo = partitionInfo;
-        this.castMapping = null;
-        this.rowTrackingEnabled = false;
-        this.firstRowId = null;
-        this.maxSequenceNumber = 0L;
-        this.systemFields = Collections.emptyMap();
-        this.selection = null;
+        try {
+            return readerFactory.createReader(context);
+        } catch (Exception e) {
+            FileUtils.checkExists(context.fileIO(), context.filePath());
+            throw e;
+        }
     }
 
     @Nullable
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index e228b043f8..e446337641 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -70,6 +70,7 @@ public class FormatDataSplit implements Split {
         return offset;
     }
 
+    @Nullable
     public Long length() {
         return length;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 54bba0d6a3..449ae74029 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -45,6 +45,7 @@ import org.apache.paimon.utils.Pair;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +56,7 @@ import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
 import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
 import static org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link ReadBuilder} for {@link FormatTable}. */
 public class FormatReadBuilder implements ReadBuilder {
@@ -175,17 +177,25 @@ public class FormatReadBuilder implements ReadBuilder {
                         table.partitionKeys(), readType().getFields(), table.partitionType());
         try {
             FileRecordReader<InternalRow> reader;
-            if (dataSplit.length() != null) {
+            Long length = dataSplit.length();
+            if (length != null) {
                 reader =
-                        readerFactory.createReader(
-                                formatReaderContext, dataSplit.offset(), dataSplit.length());
+                        readerFactory.createReader(formatReaderContext, dataSplit.offset(), length);
             } else {
+                checkArgument(dataSplit.offset() == 0, "Offset must be 0.");
                 reader = readerFactory.createReader(formatReaderContext);
             }
             return new DataFileRecordReader(
                     readType(),
                     reader,
-                    PartitionUtils.create(partitionMapping, dataSplit.partition()));
+                    null,
+                    null,
+                    PartitionUtils.create(partitionMapping, dataSplit.partition()),
+                    false,
+                    null,
+                    0,
+                    Collections.emptyMap(),
+                    null);
         } catch (Exception e) {
             FileUtils.checkExists(formatReaderContext.fileIO(), formatReaderContext.filePath());
             throw e;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 96c3805470..20cbb9b31d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.partition.PartitionPredicate.DefaultPartitionPredicate;
 import org.apache.paimon.partition.PartitionPredicate.MultiplePartitionPredicate;
@@ -49,12 +50,15 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.paimon.format.text.HadoopCompressionUtils.isCompressed;
+import static org.apache.paimon.format.text.TextLineReader.isDefaultDelimiter;
 import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
 import static org.apache.paimon.utils.PartitionPathUtils.searchPartSpecAndPaths;
 
@@ -65,6 +69,8 @@ public class FormatTableScan implements InnerTableScan {
     private final CoreOptions coreOptions;
     @Nullable private PartitionPredicate partitionFilter;
     @Nullable private final Integer limit;
+    private final long targetSplitSize;
+    private final FormatTable.Format format;
 
     public FormatTableScan(
             FormatTable table,
@@ -74,6 +80,8 @@
         this.coreOptions = new CoreOptions(table.options());
         this.partitionFilter = partitionFilter;
         this.limit = limit;
+        this.targetSplitSize = coreOptions.splitTargetSize();
+        this.format = table.format();
     }
 
     @Override
@@ -242,56 +250,51 @@
         FileStatus[] files = fileIO.listFiles(path, true);
         for (FileStatus file : files) {
             if (isDataFileName(file.getPath().getName())) {
-                List<FormatDataSplit> fileSplits =
-                        tryToSplitLargeFile(
-                                table.format(), file, coreOptions.splitTargetSize(), partition);
+                List<FormatDataSplit> fileSplits = tryToSplitLargeFile(file, partition);
                 splits.addAll(fileSplits);
             }
         }
         return splits;
     }
 
-    private List<FormatDataSplit> tryToSplitLargeFile(
-            FormatTable.Format format, FileStatus file, long maxSplitBytes, BinaryRow partition) {
-        boolean isSplittableFile =
-                ((format == FormatTable.Format.CSV
-                                && !table.options()
-                                        .containsKey(CsvOptions.LINE_DELIMITER.key()))
-                        || (format == FormatTable.Format.JSON
-                                && !table.options()
-                                        .containsKey(JsonOptions.LINE_DELIMITER.key())))
-                        && isTextFileUncompressed(file.getPath().getName());
+    private List<FormatDataSplit> tryToSplitLargeFile(FileStatus file, BinaryRow partition) {
+        if (!preferToSplitFile(file)) {
+            return Collections.singletonList(
+                    new FormatDataSplit(file.getPath(), file.getLen(), partition));
+        }
         List<FormatDataSplit> splits = new ArrayList<>();
-        if (isSplittableFile && file.getLen() > maxSplitBytes) {
-            long remainingBytes = file.getLen();
-            long currentStart = 0;
+        long remainingBytes = file.getLen();
+        long currentStart = 0;
 
-            while (remainingBytes > 0) {
-                long splitSize = Math.min(maxSplitBytes, remainingBytes);
+        while (remainingBytes > 0) {
+            long splitSize = Math.min(targetSplitSize, remainingBytes);
 
-                FormatDataSplit split =
-                        new FormatDataSplit(
-                                file.getPath(), file.getLen(), currentStart, splitSize, partition);
-                splits.add(split);
-                currentStart += splitSize;
-                remainingBytes -= splitSize;
-            }
-        } else {
-            splits.add(new FormatDataSplit(file.getPath(), file.getLen(), partition));
+            FormatDataSplit split =
+                    new FormatDataSplit(
+                            file.getPath(), file.getLen(), currentStart, splitSize, partition);
+            splits.add(split);
+            currentStart += splitSize;
+            remainingBytes -= splitSize;
         }
         return splits;
     }
 
-    private static boolean isTextFileUncompressed(String fileName) {
-        if (fileName == null || fileName.trim().isEmpty()) {
+    private boolean preferToSplitFile(FileStatus file) {
+        if (file.getLen() <= targetSplitSize) {
             return false;
         }
-        String[] parts = fileName.split("\\.");
-        if (parts.length < 2) {
-            return false;
+
+        Options options = coreOptions.toConfiguration();
+        switch (format) {
+            case CSV:
+                return !isCompressed(file.getPath())
+                        && isDefaultDelimiter(options.get(CsvOptions.LINE_DELIMITER));
+            case JSON:
+                return !isCompressed(file.getPath())
+                        && isDefaultDelimiter(options.get(JsonOptions.LINE_DELIMITER));
+            default:
+                return false;
         }
-        String lastExt = parts[parts.length - 1].toLowerCase();
-        return "csv".equals(lastExt) || "json".equals(lastExt);
     }
 
     public static Map<String, String> extractLeadingEqualityPartitionSpecWhenOnlyAnd(
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
index 0a292c4d8b..6ab7734872 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
@@ -69,11 +69,7 @@
                 return inputStream;
             }
 
-            CompressionCodecFactory codecFactory =
-                    new CompressionCodecFactory(new Configuration(false));
-
-            CompressionCodec codec =
-                    codecFactory.getCodec(new org.apache.hadoop.fs.Path(filePath.toString()));
+            CompressionCodec codec = getCompressionCodec(filePath);
             if (codec != null) {
                 return codec.createInputStream(inputStream);
             }
@@ -83,6 +79,16 @@
         }
     }
 
+    public static boolean isCompressed(Path filePath) {
+        return getCompressionCodec(filePath) != null;
+    }
+
+    public static CompressionCodec getCompressionCodec(Path filePath) {
+        CompressionCodecFactory codecFactory =
+                new CompressionCodecFactory(new Configuration(false));
+        return codecFactory.getCodec(new org.apache.hadoop.fs.Path(filePath.toString()));
+    }
+
     /**
      * Gets a compression codec by compression type.
      *
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
index 50fb00c66b..e65d647ee2 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
@@ -34,18 +34,21 @@ public interface TextLineReader extends Closeable {
     static TextLineReader create(
             InputStream inputStream, String delimiter, long offset, @Nullable Long length)
             throws IOException {
-        byte[] delimiterBytes =
-                delimiter != null && !"\n".equals(delimiter)
-                        ? delimiter.getBytes(StandardCharsets.UTF_8)
-                        : null;
-        if (delimiterBytes == null || delimiterBytes.length == 0) {
+        if (isDefaultDelimiter(delimiter)) {
             return new StandardLineReader(inputStream, offset, length);
         } else {
             if (offset != 0 || length != null) {
                 throw new UnsupportedOperationException(
                         "Custom line text file does not support offset and length.");
             }
-            return new CustomLineReader(inputStream, delimiterBytes);
+            return new CustomLineReader(inputStream, delimiter.getBytes(StandardCharsets.UTF_8));
        }
     }
+
+    static boolean isDefaultDelimiter(String delimiter) {
+        return delimiter == null
+                || "\n".equals(delimiter)
+                || "\r\n".equals(delimiter)
+                || "\r".equals(delimiter);
+    }
 }
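
For illustration, the refactored scan cuts a large, uncompressed CSV/JSON text file with a default line delimiter into consecutive byte ranges of at most the target split size, and leaves every other file as a single split read from offset 0. Below is a minimal, self-contained sketch of that range arithmetic only; the class name and the sizes used are hypothetical and it does not call any Paimon API.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSketch {
        // Mirrors the loop in tryToSplitLargeFile: consecutive (offset, length) ranges,
        // each at most targetSplitSize bytes, covering the whole file.
        static List<long[]> splitRanges(long fileLen, long targetSplitSize) {
            List<long[]> ranges = new ArrayList<>();
            if (fileLen <= targetSplitSize) {
                // Single split: the whole file, read from offset 0.
                ranges.add(new long[] {0, fileLen});
                return ranges;
            }
            long remainingBytes = fileLen;
            long currentStart = 0;
            while (remainingBytes > 0) {
                long splitSize = Math.min(targetSplitSize, remainingBytes);
                ranges.add(new long[] {currentStart, splitSize});
                currentStart += splitSize;
                remainingBytes -= splitSize;
            }
            return ranges;
        }

        public static void main(String[] args) {
            // Hypothetical 300 MiB file with a 128 MiB target split size produces
            // three splits: (0, 128 MiB), (128 MiB, 128 MiB), (256 MiB, 44 MiB).
            for (long[] r : splitRanges(300L << 20, 128L << 20)) {
                System.out.println("offset=" + r[0] + " length=" + r[1]);
            }
        }
    }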
