This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 05ad2a058aa12364069aa420c865ab9a9943ebd1 Author: yuzelin <[email protected]> AuthorDate: Thu Oct 16 21:13:17 2025 +0800 [core] Fix that cannot read binlog table with projection (#6417) --- .../org/apache/paimon/table/system/BinlogTable.java | 2 +- .../org/apache/paimon/flink/CatalogTableITCase.java | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java index 8488e3ff9b..cc8d1621a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java @@ -131,7 +131,7 @@ public class BinlogTable extends AuditLogTable { (row1, row2) -> new AuditLogRow( readProjection, convertToArray(row1, row2, fieldGetters)), - wrapped.rowType()); + this.wrappedReadType); } else { return dataRead.createReader(split) .transform( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index bcdf1e2ef6..3d7429177a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -1172,6 +1172,26 @@ public class CatalogTableITCase extends CatalogITCaseBase { iterator.close(); } + @Test + public void testBinlogTableStreamReadWithProjection() throws Exception { + sql( + "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', " + + "'bucket' = '2')"); + BlockingIterator<Row, Row> iterator = + streamSqlBlockIter( + "SELECT rowkind, a FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */"); + sql("INSERT INTO T VALUES (1, 2)"); + sql("INSERT INTO T VALUES (1, 3)"); + sql("INSERT INTO T VALUES (2, 2)"); + List<Row> rows = iterator.collect(3); + assertThat(rows) + .containsExactly( + Row.of("+I", new Integer[] {1}), + Row.of("+U", new Integer[] {1, 1}), + Row.of("+I", new Integer[] {2})); + iterator.close(); + } + @Test public void testBinlogTableBatchRead() throws Exception { sql(
