Toroidals commented on issue #11694:
URL: https://github.com/apache/hudi/issues/11694#issuecomment-2254148431

   > @Toroidals Thanks for reporting this; it would be very helpful if you could also paste the detailed error stack trace.
   
   java.io.IOException: Failed to deserialize consumer record due to
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
 ~[flink-connector-base-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist-1.15.2.jar:1.15.2]
           at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-dist-1.15.2.jar:1.15.2]
           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-dist-1.15.2.jar:1.15.2]
           at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_211]
   Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-core-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           ... 14 more
   Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-core-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           ... 14 more
   Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-core-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           ... 14 more
   Caused by: java.lang.reflect.InvocationTargetException
           at sun.reflect.GeneratedConstructorAccessor81.newInstance(Unknown 
Source) ~[?:?]
           at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_211]
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_211]
           at 
org.apache.hudi.sink.utils.PayloadCreation.createPayload(PayloadCreation.java:81)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:113)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:98)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:61)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:34)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-core-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           ... 14 more
   Caused by: java.lang.NullPointerException: null value for (non-nullable) 
string at hudi_user_cdc_record.role_id
           at 
org.apache.hudi.org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:184)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:176) 
~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) 
~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.EventTimeAvroPayload.<init>(EventTimeAvroPayload.java:40)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at sun.reflect.GeneratedConstructorAccessor81.newInstance(Unknown 
Source) ~[?:?]
           at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_211]
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_211]
           at 
org.apache.hudi.sink.utils.PayloadCreation.createPayload(PayloadCreation.java:81)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:113)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:98)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:61)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:34)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-core-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           ... 14 more
   Caused by: java.lang.NullPointerException
           at 
org.apache.hudi.org.apache.avro.io.Encoder.writeString(Encoder.java:130) 
~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:392)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:384)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:165)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:184)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:176) 
~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) 
~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.common.model.EventTimeAvroPayload.<init>(EventTimeAvroPayload.java:40)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at sun.reflect.GeneratedConstructorAccessor81.newInstance(Unknown 
Source) ~[?:?]
           at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_211]
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_211]
           at 
org.apache.hudi.sink.utils.PayloadCreation.createPayload(PayloadCreation.java:81)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:113)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:98)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:61)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:34)
 ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-core-1.15.2.jar:1.15.2]
           at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
           ... 14 more


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to