This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f6384b993 [core] Fix computed column and projection of BinlogTable (#5566)
0f6384b993 is described below

commit 0f6384b99369685f1c8f17d27b0e9f7d91d578d7
Author: yuzelin <[email protected]>
AuthorDate: Wed May 7 19:04:06 2025 +0800

    [core] Fix computed column and projection of BinlogTable (#5566)
---
 .../apache/paimon/table/system/BinlogTable.java    | 10 ++++++++--
 .../apache/paimon/flink/SystemCatalogTable.java    | 22 ++++++++++++---------
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 23 ++++++++++++++++++++++
 .../paimon/spark/sql/PaimonSystemTableTest.scala   | 10 ++++++++++
 4 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index eafd37f1d7..a023028bd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -90,6 +90,8 @@ public class BinlogTable extends AuditLogTable {
 
     private class BinlogRead extends AuditLogRead {
 
+        private RowType wrappedReadType = wrapped.rowType();
+
         private BinlogRead(InnerTableRead dataRead) {
             super(dataRead);
         }
@@ -97,20 +99,24 @@ public class BinlogTable extends AuditLogTable {
         @Override
         public InnerTableRead withReadType(RowType readType) {
             List<DataField> fields = new ArrayList<>();
+            List<DataField> wrappedReadFields = new ArrayList<>();
             for (DataField field : readType.getFields()) {
                 if (field.name().equals(SpecialFields.ROW_KIND.name())) {
                     fields.add(field);
                 } else {
-                    fields.add(field.newType(((ArrayType) field.type()).getElementType()));
+                    DataField origin = field.newType(((ArrayType) field.type()).getElementType());
+                    fields.add(origin);
+                    wrappedReadFields.add(origin);
                 }
             }
+            this.wrappedReadType = this.wrappedReadType.copy(wrappedReadFields);
             return super.withReadType(readType.copy(fields));
         }
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
             DataSplit dataSplit = (DataSplit) split;
-            InternalRow.FieldGetter[] fieldGetters = wrapped.rowType().fieldGetters();
+            InternalRow.FieldGetter[] fieldGetters = wrappedReadType.fieldGetters();
 
             if (dataSplit.isStreaming()) {
                 return new PackChangelogReader(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index 7fe26e5607..5878b25ce7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.AuditLogTable;
+import org.apache.paimon.table.system.BinlogTable;
 
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -67,15 +68,18 @@ public class SystemCatalogTable implements CatalogTable {
                 deserializeWatermarkSpec(newOptions, builder);
             }
 
-            // add non-physical columns
-            List<String> physicalColumns = table.rowType().getFieldNames();
-            int columnCount =
-                    physicalColumns.size() + nonPhysicalColumnsCount(newOptions, physicalColumns);
-            for (int i = 0; i < columnCount; i++) {
-                String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
-                if (optionalName != null && !physicalColumns.contains(optionalName)) {
-                    // build non-physical column from options
-                    deserializeNonPhysicalColumn(newOptions, i, builder);
+            if (!(table instanceof BinlogTable)) {
+                // add non-physical columns
+                List<String> physicalColumns = table.rowType().getFieldNames();
+                int columnCount =
+                        physicalColumns.size()
+                                + nonPhysicalColumnsCount(newOptions, physicalColumns);
+                for (int i = 0; i < columnCount; i++) {
+                    String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
+                    if (optionalName != null && !physicalColumns.contains(optionalName)) {
+                        // build non-physical column from options
+                        deserializeNonPhysicalColumn(newOptions, i, builder);
+                    }
                 }
             }
             return builder.build();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 144a2d5a8f..2b846b68bc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -792,4 +792,27 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT * FROM `test_table$audit_log`"))
                 .containsExactly(Row.of("+I", 1, 1, 2));
     }
+
+    @Test
+    public void testBinlogTableWithComputedColumn() {
+        sql("CREATE TABLE test_table (a int, b int, c AS a + b);");
+        String ddl = sql("SHOW CREATE TABLE `test_table$binlog`").get(0).getFieldAs(0);
+        assertThat(ddl).doesNotContain("`c` AS `a` + `b`");
+
+        sql("INSERT INTO test_table VALUES (1, 1)");
+        assertThat(sql("SELECT * FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new Integer[] {1}, new Integer[] {1}));
+    }
+
+    @Test
+    public void testBinlogTableWithProjection() {
+        sql("CREATE TABLE test_table (a int, b string);");
+        sql("INSERT INTO test_table VALUES (1, 'A')");
+        assertThat(sql("SELECT * FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new Integer[] {1}, new String[] {"A"}));
+        assertThat(sql("SELECT b FROM `test_table$binlog`"))
+                .containsExactly(Row.of((Object) new String[] {"A"}));
+        assertThat(sql("SELECT rowkind, b FROM `test_table$binlog`"))
+                .containsExactly(Row.of("+I", new String[] {"A"}));
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
index 0c645191f6..de9a4b040b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
@@ -114,5 +114,15 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
       sql("SELECT * FROM `T$binlog`"),
       Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2)))
     )
+
+    checkAnswer(
+      sql("SELECT b FROM `T$binlog`"),
+      Seq(Row(Array(3)), Row(Array(2)))
+    )
+
+    checkAnswer(
+      sql("SELECT rowkind, b FROM `T$binlog`"),
+      Seq(Row("+I", Array(3)), Row("+I", Array(2)))
+    )
   }
 }
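
For reference, a minimal Flink SQL sketch of the scenarios the new tests exercise (table names t1 and t2 are placeholders; the expected rows follow the assertions added above):

    -- Computed column: `c` is not exposed through the binlog system table.
    CREATE TABLE t1 (a INT, b INT, c AS a + b);
    INSERT INTO t1 VALUES (1, 1);
    SELECT * FROM `t1$binlog`;           -- +I, [1], [1]

    -- Projection on the binlog system table, with and without rowkind.
    CREATE TABLE t2 (a INT, b STRING);
    INSERT INTO t2 VALUES (1, 'A');
    SELECT b FROM `t2$binlog`;           -- ['A']
    SELECT rowkind, b FROM `t2$binlog`;  -- +I, ['A']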
