xloya opened a new issue #3119:
URL: https://github.com/apache/iceberg/issues/3119
I write a changelog whose primary key is a DATE/TIMESTAMP column to an Iceberg
table through FlinkSQL, then read the data back through FlinkSQL and insert it
into another Iceberg table. The read throws an exception in the
`org.apache.iceberg.data.GenericRecord.get()` method.
I found that this happens because the DATE/TIMESTAMP values read from the
equality delete data actually have the type `LocalDate.class`, while the
`StructLikeComparator` expects `Integer.class` for the corresponding field.
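
To make the mismatch concrete, here is a minimal sketch of a typed getter that fails in the same way. `TypedGetSketch` is a hypothetical stand-in written for illustration, not Iceberg's actual `GenericRecord` code; it only assumes that the getter checks the stored value against the requested class.

```java
// Hypothetical stand-in for a record's typed getter (an assumption for
// illustration, not copied from Iceberg).
final class TypedGetSketch {
  private TypedGetSketch() {}

  // Returns the value cast to javaClass, or fails if the stored value
  // has a different runtime type.
  static <T> T get(Object value, Class<T> javaClass) {
    if (value == null || javaClass.isInstance(value)) {
      return javaClass.cast(value);
    }
    throw new IllegalStateException(
        "Not an instance of " + javaClass.getName() + ": " + value);
  }
}
```

Calling `TypedGetSketch.get(java.time.LocalDate.of(2021, 9, 10), Integer.class)` throws an `IllegalStateException`, because a `LocalDate` is passed where the caller asks for an `Integer`.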
So I think the fix may be to convert the data to the Integer type internally
in the `org.apache.iceberg.data.DeleteFilter.applyEqDeletes()` method, so that
the actual type of the data matches the type the comparator expects for the
field.
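
A rough sketch of the kind of conversion I mean, using only the JDK. `InternalValueSketch` and `toInternal` are hypothetical names for illustration, not existing Iceberg APIs, and the sketch assumes that the comparator wants DATE as an `Integer` counting days since 1970-01-01 and TIMESTAMP (without zone) as a `Long` counting microseconds since the epoch.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper (not part of Iceberg): maps java.time values to the
// primitive wrappers that the comparator is assumed to expect.
final class InternalValueSketch {
  private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

  private InternalValueSketch() {}

  static Object toInternal(Object value) {
    if (value instanceof LocalDate) {
      // DATE: days since 1970-01-01, boxed as Integer
      return Math.toIntExact(((LocalDate) value).toEpochDay());
    }
    if (value instanceof LocalDateTime) {
      // TIMESTAMP without zone: microseconds since the epoch, boxed as Long
      return ChronoUnit.MICROS.between(EPOCH, (LocalDateTime) value);
    }
    // Any other value is returned unchanged
    return value;
  }
}
```

Each key field of an equality delete record would go through a conversion like this before it is added to the set, so the comparator sees an `Integer` instead of a `LocalDate`.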
I have fixed the problem in my personal branch, but the corresponding unit
test fails. Can someone help? @rdblue @openinx Could you take a look? Thx!
The exception stack is as follows:
```
java.lang.IllegalStateException: Not an instance of java.lang.Integer: 2021-09-10
    at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
    at org.apache.iceberg.types.Comparators$StructLikeComparator.compare(Comparators.java:122)
    at org.apache.iceberg.types.Comparators$StructLikeComparator.compare(Comparators.java:102)
    at org.apache.iceberg.util.StructLikeWrapper.equals(StructLikeWrapper.java:76)
    at java.util.HashMap.putVal(HashMap.java:635)
    at java.util.HashMap.put(HashMap.java:612)
    at java.util.HashSet.add(HashSet.java:220)
    at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:103)
    at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:33)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:356)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
    at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:83)
    at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:137)
    at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:166)
    at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:112)
    at org.apache.iceberg.flink.source.RowDataIterator.openTaskIterator(RowDataIterator.java:74)
    at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:102)
    at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:84)
    at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:112)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:138)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
```