This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0a07ebc315 [spark] Fix the build of read type in binlog table (#4689)
0a07ebc315 is described below
commit 0a07ebc3157da23c91428f80667b2a722c5f6da5
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Dec 12 09:09:34 2024 +0800
[spark] Fix the build of read type in binlog table (#4689)
---
.../apache/paimon/table/system/BinlogTable.java | 22 +++++++++++++++-------
.../paimon/spark/sql/PaimonSystemTableTest.scala | 16 ++++++++++++++++
2 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index b17d61d44e..08eea468ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -72,13 +72,8 @@ public class BinlogTable extends AuditLogTable {
List<DataField> fields = new ArrayList<>();
fields.add(SpecialFields.ROW_KIND);
for (DataField field : wrapped.rowType().getFields()) {
- DataField newField =
- new DataField(
- field.id(),
- field.name(),
- new ArrayType(field.type().nullable()), // convert to nullable
- field.description());
- fields.add(newField);
+ // convert to nullable
+ fields.add(field.newType(new ArrayType(field.type().nullable())));
}
return new RowType(fields);
}
@@ -99,6 +94,19 @@ public class BinlogTable extends AuditLogTable {
super(dataRead);
}
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ List<DataField> fields = new ArrayList<>();
+ for (DataField field : readType.getFields()) {
+ if (field.name().equals(SpecialFields.ROW_KIND.name())) {
+ fields.add(field);
+ } else {
+ fields.add(field.newType(((ArrayType) field.type()).getElementType()));
+ }
+ }
+ return super.withReadType(readType.copy(fields));
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
DataSplit dataSplit = (DataSplit) split;
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
index 64baf6232f..7baa57a54d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala
@@ -81,4 +81,20 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
spark.sql("select partition,bucket from `T$buckets`"),
Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: Row("[2024-10-10, 01]", 2) :: Nil)
}
+
+ test("system table: binlog table") {
+ sql("""
+ |CREATE TABLE T (a INT, b INT)
+ |TBLPROPERTIES ('primary-key'='a', 'changelog-producer' = 'lookup', 'bucket' = '2')
+ |""".stripMargin)
+
+ sql("INSERT INTO T VALUES (1, 2)")
+ sql("INSERT INTO T VALUES (1, 3)")
+ sql("INSERT INTO T VALUES (2, 2)")
+
+ checkAnswer(
+ sql("SELECT * FROM `T$binlog`"),
+ Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2)))
+ )
+ }
}