This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5744268157 [csv] Fix projection read for Csv File Format (#6147)
5744268157 is described below

commit 57442681571027697518afa79867b7ce1b79ab36
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Wed Aug 27 11:04:52 2025 +0800

    [csv] Fix projection read for Csv File Format (#6147)
---
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  4 +-
 .../apache/paimon/format/csv/CsvFileFormat.java    | 18 ++++--
 .../apache/paimon/format/csv/CsvFileReader.java    | 70 +++++++++++++++-----
 .../paimon/format/csv/CsvFileFormatTest.java       | 75 +++++++++++++++++++---
 4 files changed, 136 insertions(+), 31 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index f576010fc3..066eeb31ca 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -89,7 +89,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                 "CREATE TABLE TEXT_NONE (a INT, b INT, c INT) WITH 
('file.format'='%s', 'file.compression'='none')",
                 format);
         sql("INSERT INTO TEXT_NONE VALUES (1, 2, 3)");
-        assertThat(sql("SELECT * FROM TEXT_NONE")).containsExactly(Row.of(1, 
2, 3));
+        assertThat(sql("SELECT a FROM TEXT_NONE")).containsExactly(Row.of(1));
         List<String> files =
                 sql("select file_path from `TEXT_NONE$files`").stream()
                         .map(r -> r.getField(0).toString())
@@ -100,7 +100,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                 "CREATE TABLE TEXT_GZIP (a INT, b INT, c INT) WITH 
('file.format'='%s', 'file.compression'='gzip')",
                 format);
         sql("INSERT INTO TEXT_GZIP VALUES (1, 2, 3)");
-        assertThat(sql("SELECT * FROM TEXT_GZIP")).containsExactly(Row.of(1, 
2, 3));
+        assertThat(sql("SELECT b FROM TEXT_GZIP")).containsExactly(Row.of(2));
         files =
                 sql("select file_path from `TEXT_GZIP$files`").stream()
                         .map(r -> r.getField(0).toString())
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index 393d4dc050..5ee4483b70 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -54,7 +54,7 @@ public class CsvFileFormat extends FileFormat {
             RowType dataSchemaRowType,
             RowType projectedRowType,
             @Nullable List<Predicate> filters) {
-        return new CsvReaderFactory(projectedRowType, options);
+        return new CsvReaderFactory(dataSchemaRowType, projectedRowType, 
options);
     }
 
     @Override
@@ -101,17 +101,25 @@ public class CsvFileFormat extends FileFormat {
     /** CSV {@link FormatReaderFactory} implementation. */
     private static class CsvReaderFactory implements FormatReaderFactory {
 
-        private final RowType rowType;
+        private final RowType dataSchemaRowType;
+        private final RowType projectedRowType;
         private final CsvOptions options;
 
-        public CsvReaderFactory(RowType rowType, CsvOptions options) {
-            this.rowType = rowType;
+        public CsvReaderFactory(
+                RowType dataSchemaRowType, RowType projectedRowType, 
CsvOptions options) {
+            this.dataSchemaRowType = dataSchemaRowType;
+            this.projectedRowType = projectedRowType;
             this.options = options;
         }
 
         @Override
         public FileRecordReader<InternalRow> createReader(Context context) 
throws IOException {
-            return new CsvFileReader(context.fileIO(), context.filePath(), 
rowType, options);
+            return new CsvFileReader(
+                    context.fileIO(),
+                    context.filePath(),
+                    dataSchemaRowType,
+                    projectedRowType,
+                    options);
         }
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 36c1043095..0e215aa3fb 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -51,12 +51,23 @@ public class CsvFileReader extends BaseTextFileReader {
 
     private final CsvOptions formatOptions;
     private final CsvSchema schema;
+    private final RowType dataSchemaRowType;
+    private final RowType projectedRowType;
+    private final int[] projectionMapping;
     private boolean headerSkipped = false;
 
-    public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, 
CsvOptions options)
+    public CsvFileReader(
+            FileIO fileIO,
+            Path filePath,
+            RowType rowReadType,
+            RowType projectedRowType,
+            CsvOptions options)
             throws IOException {
-        super(fileIO, filePath, rowType);
+        super(fileIO, filePath, projectedRowType);
+        this.dataSchemaRowType = rowReadType;
+        this.projectedRowType = projectedRowType;
         this.formatOptions = options;
+        this.projectionMapping = createProjectionMapping(rowReadType, 
projectedRowType);
         this.schema =
                 CsvSchema.emptySchema()
                         
.withQuoteChar(formatOptions.quoteCharacter().charAt(0))
@@ -99,25 +110,52 @@ public class CsvFileReader extends BaseTextFileReader {
         return 
CSV_MAPPER.readerFor(String[].class).with(schema).readValue(line);
     }
 
+    /**
+     * Creates a mapping array from read schema to projected schema. Returns 
indices of projected
+     * columns in the read schema.
+     */
+    private static int[] createProjectionMapping(RowType rowReadType, RowType 
projectedRowType) {
+        int[] mapping = new int[projectedRowType.getFieldCount()];
+        for (int i = 0; i < projectedRowType.getFieldCount(); i++) {
+            String projectedFieldName = 
projectedRowType.getFieldNames().get(i);
+            int readIndex = 
rowReadType.getFieldNames().indexOf(projectedFieldName);
+            if (readIndex == -1) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Projected field '%s' not found in read 
schema",
+                                projectedFieldName));
+            }
+            mapping[i] = readIndex;
+        }
+        return mapping;
+    }
+
     private InternalRow parseCsvLine(String line, CsvSchema schema) throws 
IOException {
         String[] fields = parseCsvLineToArray(line, schema);
-        int fieldCount = Math.min(fields.length, rowType.getFieldCount());
-        Object[] values = new Object[fieldCount]; // Pre-allocated array
-
-        for (int i = 0; i < fieldCount; i++) {
-            String field = fields[i];
-
-            // Fast path for null values
-            if (field == null || field.equals(formatOptions.nullLiteral()) || 
field.isEmpty()) {
-                values[i] = null;
-                continue;
+        int fieldCount = fields.length;
+
+        // Directly parse only projected fields to avoid unnecessary parsing
+        Object[] projectedValues = new 
Object[projectedRowType.getFieldCount()];
+        for (int i = 0; i < projectedRowType.getFieldCount(); i++) {
+            int readIndex = projectionMapping[i];
+            // Check if the field exists in the CSV line
+            if (readIndex < fieldCount) {
+                String field = fields[readIndex];
+                // Fast path for null values - check if field is null or empty 
first
+                if (field == null || field.isEmpty() || 
field.equals(formatOptions.nullLiteral())) {
+                    projectedValues[i] = null;
+                    continue;
+                }
+
+                // Optimized field parsing with cached cast executors
+                projectedValues[i] =
+                        parseFieldOptimized(field.trim(), 
dataSchemaRowType.getTypeAt(readIndex));
+            } else {
+                projectedValues[i] = null; // Field not present in the CSV line
             }
-
-            // Optimized field parsing with cached cast executors
-            values[i] = parseFieldOptimized(field.trim(), 
rowType.getTypeAt(i));
         }
 
-        return GenericRow.of(values);
+        return GenericRow.of(projectedValues);
     }
 
     /** Optimized field parsing with caching and fast paths for common types. 
*/
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index c66094e4b7..a1ae1a2772 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -181,7 +181,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
 
             List<InternalRow> result =
                     writeThenRead(
-                            options, rowType, testData, "test_field_delim_" + 
delimiter.hashCode());
+                            options,
+                            rowType,
+                            rowType,
+                            testData,
+                            "test_field_delim_" + delimiter.hashCode());
 
             // Verify results
             assertThat(result).hasSize(3);
@@ -216,7 +220,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
 
             List<InternalRow> result =
                     writeThenRead(
-                            options, rowType, testData, "test_line_delim_" + 
delimiter.hashCode());
+                            options,
+                            rowType,
+                            rowType,
+                            testData,
+                            "test_line_delim_" + delimiter.hashCode());
 
             // Verify results
             assertThat(result).hasSize(3);
@@ -248,7 +256,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
 
             List<InternalRow> result =
                     writeThenRead(
-                            options, rowType, testData, "test_quote_char_" + 
quoteChar.hashCode());
+                            options,
+                            rowType,
+                            rowType,
+                            testData,
+                            "test_quote_char_" + quoteChar.hashCode());
 
             // Verify results
             assertThat(result).hasSize(3);
@@ -282,6 +294,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
                     writeThenRead(
                             options,
                             rowType,
+                            rowType,
                             testData,
                             "test_escape_char_" + escapeChar.hashCode());
 
@@ -314,7 +327,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
 
             List<InternalRow> result =
                     writeThenRead(
-                            options, rowType, testData, "test_include_header_" 
+ includeHeader);
+                            options,
+                            rowType,
+                            rowType,
+                            testData,
+                            "test_include_header_" + includeHeader);
 
             // Verify results
             assertThat(result).hasSize(3);
@@ -352,6 +369,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
                     writeThenRead(
                             options,
                             rowType,
+                            rowType,
                             testData,
                             "test_null_literal_" + nullLiteral.hashCode());
 
@@ -396,7 +414,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
                         GenericRow.of(4, BinaryString.fromString("Normal"), 
400.0, null));
 
         List<InternalRow> result =
-                writeThenRead(options, rowType, testData, 
"test_csv_combination");
+                writeThenRead(options, rowType, rowType, testData, 
"test_csv_combination");
 
         // Verify results
         assertThat(result).hasSize(4);
@@ -494,6 +512,43 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
         return true;
     }
 
+    @Test
+    public void testProjectionPushdown() throws IOException {
+        RowType fullRowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT().notNull())
+                        .field("name", DataTypes.STRING())
+                        .field("score", DataTypes.DOUBLE())
+                        .field("active", DataTypes.BOOLEAN())
+                        .build();
+
+        RowType projectedRowType =
+                RowType.builder()
+                        .field("score", DataTypes.DOUBLE())
+                        .field("name", DataTypes.STRING())
+                        .build();
+
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("Alice"), 
null, true),
+                        GenericRow.of(2, null, 87.2, false),
+                        GenericRow.of(3, BinaryString.fromString("Charlie"), 
92.8, null));
+
+        List<InternalRow> result =
+                writeThenRead(
+                        new Options(), fullRowType, projectedRowType, 
testData, "test_projection");
+
+        assertThat(result).hasSize(3);
+        assertThat(result.get(0).isNullAt(0)).isTrue(); // score is null
+        assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+
+        assertThat(result.get(1).getDouble(0)).isEqualTo(87.2);
+        assertThat(result.get(1).isNullAt(1)).isTrue(); // name is null
+
+        assertThat(result.get(2).getDouble(0)).isEqualTo(92.8);
+        assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+    }
+
     private String[] parse(String csvLine) throws IOException {
         CsvSchema schema =
                 CsvSchema.emptySchema()
@@ -509,13 +564,17 @@ public class CsvFileFormatTest extends 
FormatReadWriteTest {
      * that was read back for further verification.
      */
     private List<InternalRow> writeThenRead(
-            Options options, RowType rowType, List<InternalRow> testData, 
String testPrefix)
+            Options options,
+            RowType fullRowType,
+            RowType rowType,
+            List<InternalRow> testData,
+            String testPrefix)
             throws IOException {
         FileFormat format =
                 new CsvFileFormatFactory().create(new FormatContext(options, 
1024, 1024));
         Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() 
+ ".csv");
 
-        FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
+        FormatWriterFactory writerFactory = 
format.createWriterFactory(fullRowType);
         try (PositionOutputStream out = fileIO.newOutputStream(testFile, 
false);
                 FormatWriter writer = writerFactory.create(out, "none")) {
             for (InternalRow row : testData) {
@@ -523,7 +582,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
             }
         }
         try (RecordReader<InternalRow> reader =
-                format.createReaderFactory(rowType, rowType, new ArrayList<>())
+                format.createReaderFactory(fullRowType, rowType, new 
ArrayList<>())
                         .createReader(
                                 new FormatReaderContext(
                                         fileIO, testFile, 
fileIO.getFileSize(testFile)))) {

Reply via email to