This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 5744268157 [csv] Fix projection read for Csv File Format (#6147) 5744268157 is described below commit 57442681571027697518afa79867b7ce1b79ab36 Author: jerry <lining....@alibaba-inc.com> AuthorDate: Wed Aug 27 11:04:52 2025 +0800 [csv] Fix projection read for Csv File Format (#6147) --- .../apache/paimon/flink/BatchFileStoreITCase.java | 4 +- .../apache/paimon/format/csv/CsvFileFormat.java | 18 ++++-- .../apache/paimon/format/csv/CsvFileReader.java | 70 +++++++++++++++----- .../paimon/format/csv/CsvFileFormatTest.java | 75 +++++++++++++++++++--- 4 files changed, 136 insertions(+), 31 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index f576010fc3..066eeb31ca 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -89,7 +89,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { "CREATE TABLE TEXT_NONE (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='none')", format); sql("INSERT INTO TEXT_NONE VALUES (1, 2, 3)"); - assertThat(sql("SELECT * FROM TEXT_NONE")).containsExactly(Row.of(1, 2, 3)); + assertThat(sql("SELECT a FROM TEXT_NONE")).containsExactly(Row.of(1)); List<String> files = sql("select file_path from `TEXT_NONE$files`").stream() .map(r -> r.getField(0).toString()) @@ -100,7 +100,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { "CREATE TABLE TEXT_GZIP (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='gzip')", format); sql("INSERT INTO TEXT_GZIP VALUES (1, 2, 3)"); - assertThat(sql("SELECT * FROM TEXT_GZIP")).containsExactly(Row.of(1, 2, 3)); + assertThat(sql("SELECT b FROM TEXT_GZIP")).containsExactly(Row.of(2)); files = sql("select file_path from `TEXT_GZIP$files`").stream() .map(r -> r.getField(0).toString()) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java index 393d4dc050..5ee4483b70 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java @@ -54,7 +54,7 @@ public class CsvFileFormat extends FileFormat { RowType dataSchemaRowType, RowType projectedRowType, @Nullable List<Predicate> filters) { - return new CsvReaderFactory(projectedRowType, options); + return new CsvReaderFactory(dataSchemaRowType, projectedRowType, options); } @Override @@ -101,17 +101,25 @@ public class CsvFileFormat extends FileFormat { /** CSV {@link FormatReaderFactory} implementation. */ private static class CsvReaderFactory implements FormatReaderFactory { - private final RowType rowType; + private final RowType dataSchemaRowType; + private final RowType projectedRowType; private final CsvOptions options; - public CsvReaderFactory(RowType rowType, CsvOptions options) { - this.rowType = rowType; + public CsvReaderFactory( + RowType dataSchemaRowType, RowType projectedRowType, CsvOptions options) { + this.dataSchemaRowType = dataSchemaRowType; + this.projectedRowType = projectedRowType; this.options = options; } @Override public FileRecordReader<InternalRow> createReader(Context context) throws IOException { - return new CsvFileReader(context.fileIO(), context.filePath(), rowType, options); + return new CsvFileReader( + context.fileIO(), + context.filePath(), + dataSchemaRowType, + projectedRowType, + options); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java index 36c1043095..0e215aa3fb 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java @@ -51,12 +51,23 @@ public class CsvFileReader extends BaseTextFileReader { private final CsvOptions formatOptions; private final CsvSchema schema; + private final RowType dataSchemaRowType; + private final RowType projectedRowType; + private final int[] projectionMapping; private boolean headerSkipped = false; - public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, CsvOptions options) + public CsvFileReader( + FileIO fileIO, + Path filePath, + RowType rowReadType, + RowType projectedRowType, + CsvOptions options) throws IOException { - super(fileIO, filePath, rowType); + super(fileIO, filePath, projectedRowType); + this.dataSchemaRowType = rowReadType; + this.projectedRowType = projectedRowType; this.formatOptions = options; + this.projectionMapping = createProjectionMapping(rowReadType, projectedRowType); this.schema = CsvSchema.emptySchema() .withQuoteChar(formatOptions.quoteCharacter().charAt(0)) @@ -99,25 +110,52 @@ public class CsvFileReader extends BaseTextFileReader { return CSV_MAPPER.readerFor(String[].class).with(schema).readValue(line); } + /** + * Creates a mapping array from read schema to projected schema. Returns indices of projected + * columns in the read schema. + */ + private static int[] createProjectionMapping(RowType rowReadType, RowType projectedRowType) { + int[] mapping = new int[projectedRowType.getFieldCount()]; + for (int i = 0; i < projectedRowType.getFieldCount(); i++) { + String projectedFieldName = projectedRowType.getFieldNames().get(i); + int readIndex = rowReadType.getFieldNames().indexOf(projectedFieldName); + if (readIndex == -1) { + throw new IllegalArgumentException( + String.format( + "Projected field '%s' not found in read schema", + projectedFieldName)); + } + mapping[i] = readIndex; + } + return mapping; + } + private InternalRow parseCsvLine(String line, CsvSchema schema) throws IOException { String[] fields = parseCsvLineToArray(line, schema); - int fieldCount = Math.min(fields.length, rowType.getFieldCount()); - Object[] values = new Object[fieldCount]; // Pre-allocated array - - for (int i = 0; i < fieldCount; i++) { - String field = fields[i]; - - // Fast path for null values - if (field == null || field.equals(formatOptions.nullLiteral()) || field.isEmpty()) { - values[i] = null; - continue; + int fieldCount = fields.length; + + // Directly parse only projected fields to avoid unnecessary parsing + Object[] projectedValues = new Object[projectedRowType.getFieldCount()]; + for (int i = 0; i < projectedRowType.getFieldCount(); i++) { + int readIndex = projectionMapping[i]; + // Check if the field exists in the CSV line + if (readIndex < fieldCount) { + String field = fields[readIndex]; + // Fast path for null values - check if field is null or empty first + if (field == null || field.isEmpty() || field.equals(formatOptions.nullLiteral())) { + projectedValues[i] = null; + continue; + } + + // Optimized field parsing with cached cast executors + projectedValues[i] = + parseFieldOptimized(field.trim(), dataSchemaRowType.getTypeAt(readIndex)); + } else { + projectedValues[i] = null; // Field not present in the CSV line } - - // Optimized field parsing with cached cast executors - values[i] = parseFieldOptimized(field.trim(), rowType.getTypeAt(i)); } - return GenericRow.of(values); + return GenericRow.of(projectedValues); } /** Optimized field parsing with caching and fast paths for common types. */ diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java index c66094e4b7..a1ae1a2772 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java @@ -181,7 +181,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest { List<InternalRow> result = writeThenRead( - options, rowType, testData, "test_field_delim_" + delimiter.hashCode()); + options, + rowType, + rowType, + testData, + "test_field_delim_" + delimiter.hashCode()); // Verify results assertThat(result).hasSize(3); @@ -216,7 +220,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest { List<InternalRow> result = writeThenRead( - options, rowType, testData, "test_line_delim_" + delimiter.hashCode()); + options, + rowType, + rowType, + testData, + "test_line_delim_" + delimiter.hashCode()); // Verify results assertThat(result).hasSize(3); @@ -248,7 +256,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest { List<InternalRow> result = writeThenRead( - options, rowType, testData, "test_quote_char_" + quoteChar.hashCode()); + options, + rowType, + rowType, + testData, + "test_quote_char_" + quoteChar.hashCode()); // Verify results assertThat(result).hasSize(3); @@ -282,6 +294,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest { writeThenRead( options, rowType, + rowType, testData, "test_escape_char_" + escapeChar.hashCode()); @@ -314,7 +327,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest { List<InternalRow> result = writeThenRead( - options, rowType, testData, "test_include_header_" + includeHeader); + options, + rowType, + rowType, + testData, + "test_include_header_" + includeHeader); // Verify results assertThat(result).hasSize(3); @@ -352,6 +369,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest { writeThenRead( options, rowType, + rowType, testData, "test_null_literal_" + nullLiteral.hashCode()); @@ -396,7 +414,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest { GenericRow.of(4, BinaryString.fromString("Normal"), 400.0, null)); List<InternalRow> result = - writeThenRead(options, rowType, testData, "test_csv_combination"); + writeThenRead(options, rowType, rowType, testData, "test_csv_combination"); // Verify results assertThat(result).hasSize(4); @@ -494,6 +512,43 @@ public class CsvFileFormatTest extends FormatReadWriteTest { return true; } + @Test + public void testProjectionPushdown() throws IOException { + RowType fullRowType = + RowType.builder() + .field("id", DataTypes.INT().notNull()) + .field("name", DataTypes.STRING()) + .field("score", DataTypes.DOUBLE()) + .field("active", DataTypes.BOOLEAN()) + .build(); + + RowType projectedRowType = + RowType.builder() + .field("score", DataTypes.DOUBLE()) + .field("name", DataTypes.STRING()) + .build(); + + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Alice"), null, true), + GenericRow.of(2, null, 87.2, false), + GenericRow.of(3, BinaryString.fromString("Charlie"), 92.8, null)); + + List<InternalRow> result = + writeThenRead( + new Options(), fullRowType, projectedRowType, testData, "test_projection"); + + assertThat(result).hasSize(3); + assertThat(result.get(0).isNullAt(0)).isTrue(); // score is null + assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice"); + + assertThat(result.get(1).getDouble(0)).isEqualTo(87.2); + assertThat(result.get(1).isNullAt(1)).isTrue(); // name is null + + assertThat(result.get(2).getDouble(0)).isEqualTo(92.8); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie"); + } + private String[] parse(String csvLine) throws IOException { CsvSchema schema = CsvSchema.emptySchema() @@ -509,13 +564,17 @@ public class CsvFileFormatTest extends FormatReadWriteTest { * that was read back for further verification. */ private List<InternalRow> writeThenRead( - Options options, RowType rowType, List<InternalRow> testData, String testPrefix) + Options options, + RowType fullRowType, + RowType rowType, + List<InternalRow> testData, + String testPrefix) throws IOException { FileFormat format = new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024)); Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() + ".csv"); - FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + FormatWriterFactory writerFactory = format.createWriterFactory(fullRowType); try (PositionOutputStream out = fileIO.newOutputStream(testFile, false); FormatWriter writer = writerFactory.create(out, "none")) { for (InternalRow row : testData) { @@ -523,7 +582,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest { } } try (RecordReader<InternalRow> reader = - format.createReaderFactory(rowType, rowType, new ArrayList<>()) + format.createReaderFactory(fullRowType, rowType, new ArrayList<>()) .createReader( new FormatReaderContext( fileIO, testFile, fileIO.getFileSize(testFile)))) {