This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 05f27d276f [core] Fix that cannot read binlog table with projection 
(#6417)
05f27d276f is described below

commit 05f27d276ff272ed0eea00c19bc2312766b06707
Author: yuzelin <[email protected]>
AuthorDate: Thu Oct 16 21:13:17 2025 +0800

    [core] Fix that cannot read binlog table with projection (#6417)
---
 .../org/apache/paimon/table/system/BinlogTable.java  |  2 +-
 .../org/apache/paimon/flink/CatalogTableITCase.java  | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index 8488e3ff9b..cc8d1621a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -131,7 +131,7 @@ public class BinlogTable extends AuditLogTable {
                         (row1, row2) ->
                                 new AuditLogRow(
                                         readProjection, convertToArray(row1, 
row2, fieldGetters)),
-                        wrapped.rowType());
+                        this.wrappedReadType);
             } else {
                 return dataRead.createReader(split)
                         .transform(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index bcdf1e2ef6..3d7429177a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -1172,6 +1172,26 @@ public class CatalogTableITCase extends 
CatalogITCaseBase {
         iterator.close();
     }
 
+    @Test
+    public void testBinlogTableStreamReadWithProjection() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) 
with ('changelog-producer' = 'lookup', "
+                        + "'bucket' = '2')");
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter(
+                        "SELECT rowkind, a FROM T$binlog /*+ 
OPTIONS('scan.mode' = 'latest') */");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (1, 3)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        List<Row> rows = iterator.collect(3);
+        assertThat(rows)
+                .containsExactly(
+                        Row.of("+I", new Integer[] {1}),
+                        Row.of("+U", new Integer[] {1, 1}),
+                        Row.of("+I", new Integer[] {2}));
+        iterator.close();
+    }
+
     @Test
     public void testBinlogTableBatchRead() throws Exception {
         sql(

Reply via email to