liubo1022126 opened a new issue #2219:
URL: https://github.com/apache/iceberg/issues/2219


   I am testing Flink CDC writes and the Flink `rewriteDataFiles` action on Iceberg 0.11.
When I write an append message (`+I,1,aaa,20210128`), everything works. But after I
write a row-level delete by id (`-D,1,20210128`), `rewriteDataFiles` fails with the
error below, and a DataStream streaming read of the table fails with the same error.
   
   > Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.UnsupportedOperationException
   Serialization trace:
   nullValueCounts (org.apache.iceberg.GenericDataFile)
   file (org.apache.iceberg.BaseFileScanTask)
   fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   task (org.apache.iceberg.flink.source.FlinkInputSplit)
        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
        at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
        at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 34 more
   
   
   ### Flink CDC write code
       // Project-specific settings helper (defined outside this snippet); it supplies the
       // TableLoader handed to the sink and the table whose schema is converted below.
       private static Settings.TableBuilder tableBuilder = new 
Settings.TableBuilder();
       // Loader passed to FlinkSink in runV2().
       private static TableLoader tableLoader = tableBuilder.tableLoader();
       // Flink TableSchema derived (via FlinkSchemaUtil) from the schema returned by
       // tableBuilder.load(). NOTE: load() runs during static initialization, i.e. as soon as
       // this class is loaded. These three initializers depend on their declaration order.
       public static TableSchema tableSchema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(tableBuilder.load().getSchema()));
   
       // Maps the op prefix of each input line to the Flink RowKind stamped on the emitted
       // Row; looked up by row(...).
       private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
               "+I", RowKind.INSERT,
               "-D", RowKind.DELETE,
               "-U", RowKind.UPDATE_BEFORE,
               "+U", RowKind.UPDATE_AFTER);
   
       // Created in main() and used by runV2() to execute the job.
       private static StreamExecutionEnvironment env;
   
       public static void main(String[] args) throws Exception {
           env = StreamExecutionEnvironment.getExecutionEnvironment()
                   .enableCheckpointing(60000)
                   .setParallelism(1)
                   .setMaxParallelism(1);
           env.getConfig().enableSysoutLogging();
   
           DataStream<String> dataStream = env.socketTextStream("localhost", 
9999);   //nc -lk 9999
   
           DataStream<Row> rowStream = dataStream
                   .flatMap(new FlatMapFunction<String, Row>() {
                       @Override
                       public void flatMap(String rowContent, Collector<Row> 
collector) throws Exception {
                           try {
                               String[] rowElements = rowContent.split(",");
                               String ops = rowElements[0];
                               Integer id = Integer.parseInt(rowElements[1]);
   
                               if ("+I".equals(ops)) {
                                   collector.collect(row("+I", id, 
rowElements[2], rowElements[3]));
                               } else if ("-D".equals(ops)) {
                                   collector.collect(row("-D", id, "", 
rowElements[2]));
                               } else if ("-U".equals(ops) || "+U".equals(ops)) 
{
                                   collector.collect(row("-D", id, 
rowElements[2], rowElements[3]));
                                   collector.collect(row("+I", id, 
rowElements[2], rowElements[3]));
                               } else {
                                   throw new IllegalArgumentException("cdc type 
unknown: " + ops);
                               }
                           } catch (NumberFormatException e) {
                               e.printStackTrace();
                           }
                       }
                   });
   
           /*
            * +I,1,aaa,20201228
            * -D,1,20201228
            * +I,1,bbb,20201228
            * -U,1,ccc,20201228
            */
           runV2(rowStream);
       }
   
       /**
        * Writes the changelog stream to the Iceberg table and runs the job.
        *
        * <p>Rows are keyed by their first field (the id) and written with {@code id} as the
        * equality field column.
        *
        * @param rowStream changelog rows of (id, data, pt), as built by {@code row(...)}
        * @throws Exception if the Flink job fails
        */
       private static void runV2(DataStream<Row> rowStream) throws Exception {
           DataStream<Row> keyedById = rowStream.keyBy(r -> Row.of(r.getField(0)));

           FlinkSink
                   .forRow(keyedById, tableSchema)
                   .tableLoader(tableLoader)
                   .tableSchema(tableSchema)
                   .writeParallelism(1)
                   .equalityFieldColumns(ImmutableList.of("id"))
                   .build();

           env.execute("Test Iceberg DataStream");
       }
   
       /**
        * Builds a Flink {@link Row} of (id, data, pt) tagged with the RowKind for {@code rowKind}.
        *
        * @param rowKind op prefix, one of the keys of {@code ROW_KIND_MAP}
        * @param id      value of the id column
        * @param data    value of the data column
        * @param pt      value of the pt column
        * @return the tagged row
        * @throws IllegalArgumentException if {@code rowKind} is not a known op prefix
        */
       private static Row row(String rowKind, int id, String data, String pt) {
           RowKind resolved = ROW_KIND_MAP.get(rowKind);
           if (resolved != null) {
               return Row.ofKind(resolved, id, data, pt);
           }
           throw new IllegalArgumentException("Unknown row kind: " + rowKind);
       }
   
   
   ### Flink DataStream streaming read code
       // Project-specific settings helper (defined outside this snippet); supplies the
       // TableLoader and the Table used below.
       private static Settings.TableBuilder tableBuilder = new 
Settings.TableBuilder();
       // Loader passed to FlinkSource in main().
       private static TableLoader tableLoader = tableBuilder.tableLoader();
       // Table loaded during static initialization; main() converts its schema to a RowType.
       // These initializers depend on their declaration order.
       private static Table table = tableBuilder.load().getTable();
   
       /**
        * Reads the Iceberg table as a streaming source and prints every record as a Flink
        * {@link Row}.
        *
        * @param args unused
        * @throws Exception if building or executing the Flink job fails
        */
       public static void main(String[] args) throws Exception {
           RowType rowType = FlinkSchemaUtil.convert(table.schema());
           DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(
                   TypeConversions.fromLogicalToDataType(rowType));

           StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
           DataStream<RowData> stream = FlinkSource.forRowData()
                   .env(env)
                   .tableLoader(tableLoader)
                   .streaming(true)
                   .build();

           // The converter is serialized with the function and must be opened on the task
           // (DataStructureConverter#open) before toExternal(...) is used. A rich function gives
           // us the open() hook and the user-code class loader to do that.
           stream.map(new org.apache.flink.api.common.functions.RichMapFunction<RowData, Row>() {
               @Override
               public void open(org.apache.flink.configuration.Configuration parameters)
                       throws Exception {
                   super.open(parameters);
                   converter.open(getRuntimeContext().getUserCodeClassLoader());
               }

               @Override
               public Row map(RowData rowData) throws Exception {
                   return (Row) converter.toExternal(rowData);
               }
           }).print();

           env.execute("Test Iceberg Read");
       }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to