openinx opened a new issue #2730:
URL: https://github.com/apache/iceberg/issues/2730
If we write a few changelog records (INSERT, DELETE, UPDATE_BEFORE,
UPDATE_AFTER) into an Apache Iceberg table and then query the table with Flink
SQL, we encounter the following exception:
```txt
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source
-> SinkConversionToTuple2 -> Sink: Select table sink (1/1)
(03d66a7324d07ea31972ce3cc8d3f1df) switched from RUNNING to FAILED on
8e8e9f3c-67a5-4aaa-9cc8-18400ceee4a3 @ localhost (dataPort=-1).
java.lang.IllegalArgumentException: Row arity: 4, but serializer arity: 2
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
```
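
The failure reads as an arity mismatch: the serializer was configured for a 2-field schema, but the source handed it a 4-field row. The following is a minimal sketch of that kind of precondition check, using simplified stand-in classes — these are NOT the real Flink `RowData`/`RowDataSerializer` types, and the shapes here are assumptions for illustration only.

```java
import java.util.Arrays;

// Simplified stand-ins illustrating the arity check that fails above.
// Not the actual Flink classes; names and structure are hypothetical.
public class AritySketch {
    static class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
        int getArity() { return fields.length; }
    }

    static class RowSerializer {
        final int arity; // field count the serializer was built for
        RowSerializer(int arity) { this.arity = arity; }

        Row copy(Row from) {
            // The incoming row must match the serializer's configured
            // field count, otherwise copying would read/write out of range.
            if (from.getArity() != arity) {
                throw new IllegalArgumentException(
                    "Row arity: " + from.getArity()
                        + ", but serializer arity: " + arity);
            }
            return new Row(Arrays.copyOf(from.fields, from.fields.length));
        }
    }

    public static void main(String[] args) {
        RowSerializer serializer = new RowSerializer(2); // 2-field schema
        Row fourFieldRow = new Row(1, "a", 2, "b");      // 4-field row
        try {
            serializer.copy(fourFieldRow);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real job the mismatch suggests the downstream operator chain was built against a different row type than the one the changelog source actually emits.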
The bug is easy to reproduce: apply the following patch, then run the unit
test `org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLogOnIdKey`:
```patch
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
b/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index d44f45ab5..f4334b43c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -273,6 +273,8 @@ public class TestChangeLogTable extends
ChangeLogTableTestBase {
Table table = createTable(tableName, key, partitioned);
sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
+ sql("SELECT * FROM %s", tableName);
+
table.refresh();
List<Snapshot> snapshots = findValidSnapshots(table);
int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
```
I suspect other query engines such as Spark SQL, Presto SQL, and Hive SQL
will also hit this issue once a few changelog events have been written to the
Iceberg table.
I remember another issue that encountered this same exception (I have seen it
reported many times in our Asia user group). I will prepare a pull request
for this.