This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 05f27d276f [core] Fix that cannot read binlog table with projection (#6417)
05f27d276f is described below
commit 05f27d276ff272ed0eea00c19bc2312766b06707
Author: yuzelin <[email protected]>
AuthorDate: Thu Oct 16 21:13:17 2025 +0800
[core] Fix that cannot read binlog table with projection (#6417)
---
.../org/apache/paimon/table/system/BinlogTable.java | 2 +-
.../org/apache/paimon/flink/CatalogTableITCase.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index 8488e3ff9b..cc8d1621a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -131,7 +131,7 @@ public class BinlogTable extends AuditLogTable {
(row1, row2) ->
new AuditLogRow(
readProjection, convertToArray(row1,
row2, fieldGetters)),
- wrapped.rowType());
+ this.wrappedReadType);
} else {
return dataRead.createReader(split)
.transform(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index bcdf1e2ef6..3d7429177a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -1172,6 +1172,26 @@ public class CatalogTableITCase extends CatalogITCaseBase {
iterator.close();
}
+ @Test
+ public void testBinlogTableStreamReadWithProjection() throws Exception {
+ sql(
+ "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED)
with ('changelog-producer' = 'lookup', "
+ + "'bucket' = '2')");
+ BlockingIterator<Row, Row> iterator =
+ streamSqlBlockIter(
+ "SELECT rowkind, a FROM T$binlog /*+
OPTIONS('scan.mode' = 'latest') */");
+ sql("INSERT INTO T VALUES (1, 2)");
+ sql("INSERT INTO T VALUES (1, 3)");
+ sql("INSERT INTO T VALUES (2, 2)");
+ List<Row> rows = iterator.collect(3);
+ assertThat(rows)
+ .containsExactly(
+ Row.of("+I", new Integer[] {1}),
+ Row.of("+U", new Integer[] {1, 1}),
+ Row.of("+I", new Integer[] {2}));
+ iterator.close();
+ }
+
@Test
public void testBinlogTableBatchRead() throws Exception {
sql(