openinx opened a new issue #2730:
URL: https://github.com/apache/iceberg/issues/2730


   Say we write a few change log records (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) into an Apache Iceberg table and then query it using Flink SQL. We will encounter the following exception:
   
   ```txt
   [flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> SinkConversionToTuple2 -> Sink: Select table sink (1/1) 
(03d66a7324d07ea31972ce3cc8d3f1df) switched from RUNNING to FAILED on 
8e8e9f3c-67a5-4aaa-9cc8-18400ceee4a3 @ localhost (dataPort=-1).
   java.lang.IllegalArgumentException: Row arity: 4, but serializer arity: 2
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
        at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
   ```
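   The message `Row arity: 4, but serializer arity: 2` means the `RowDataSerializer` handed to the operator chain was built for a 2-field schema while the source emitted 4-field rows. A minimal sketch of that mismatch, assuming the Flink 1.12.x `RowDataSerializer` varargs constructor (the field types here are made up for illustration):
   
   ```java
   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.StringData;
   import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
   import org.apache.flink.table.types.logical.IntType;
   import org.apache.flink.table.types.logical.VarCharType;
   
   public class ArityMismatchSketch {
     public static void main(String[] args) {
       // Serializer created for a 2-field schema (INT, VARCHAR).
       RowDataSerializer serializer =
           new RowDataSerializer(new IntType(), new VarCharType());
   
       // A 4-field row, as when the source emits rows for a wider schema
       // than the one the downstream serializer was planned with.
       GenericRowData row = GenericRowData.of(
           1, StringData.fromString("a"), 2, StringData.fromString("b"));
   
       // copy() checks arity first, so this throws:
       // java.lang.IllegalArgumentException: Row arity: 4, but serializer arity: 2
       serializer.copy(row);
     }
   }
   ```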
   
   Reproducing this bug is quite easy: just apply the following patch and then run the unit test `org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLogOnIdKey`.
   
   ```patch
   diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java 
b/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
   index d44f45ab5..f4334b43c 100644
   --- a/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
   +++ b/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
   @@ -273,6 +273,8 @@ public class TestChangeLogTable extends 
ChangeLogTableTestBase {
        Table table = createTable(tableName, key, partitioned);
        sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
    
   +    sql("SELECT * FROM %s", tableName);
   +
        table.refresh();
        List<Snapshot> snapshots = findValidSnapshots(table);
        int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
   ```
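   
   The added `SELECT` simply reads the table back right after the change log rows were written, which is enough to hit the arity mismatch. For reference, the same read can be triggered outside the test harness; a minimal sketch, assuming Flink 1.12 table APIs and hypothetical names `iceberg_catalog`, `db`, and `tbl` for a catalog and table that already hold change log records:
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class SelectChangeLogTable {
     public static void main(String[] args) {
       TableEnvironment tEnv = TableEnvironment.create(
           EnvironmentSettings.newInstance().inStreamingMode().build());
   
       // "iceberg_catalog", "db" and "tbl" are placeholders; point them at a
       // catalog and table that already contain change log records.
       tEnv.executeSql("USE CATALOG iceberg_catalog");
       tEnv.executeSql("USE db");
   
       // Reading the table back fails with the arity mismatch shown above.
       tEnv.executeSql("SELECT * FROM tbl").print();
     }
   }
   ```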
   
   I guess other query engines such as Spark SQL, Presto SQL, and Hive SQL will also encounter this issue once a few change log events have been written into the Iceberg table.
   
   I remember another issue that hit this same exception (I have encountered it many times in our Asia user group). I will prepare a pull request for this.
   
   

