MikeTipico opened a new issue, #5683:
URL: https://github.com/apache/hudi/issues/5683

   **Problem Faced**
   
   We currently have a series of pipelines doing the following:
   A. NiFi
        1. consume Avro data from Kafka
        2. attach schema information (downloaded from the registry)
        3. save to S3 'raw'
   B. Spark application (Structured Streaming)
        4. consume from S3 'raw'
        5. do some basic cleaning and schema flattening
        6. persist to Hudi on S3 'data'
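   Step 5 is not shown in the issue. However, the logical plan further down suggests that a nested struct such as `MessageKey` is flattened into `messagekey_*` columns. The following is a minimal sketch of that flattening on plain Python dicts. It assumes lowercased, underscore-joined column names. `flatten_record` and the sample record are hypothetical, and this is not the reporters' actual Spark code.

```python
from typing import Any, Dict


def flatten_record(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into one level of lowercased, "_"-joined keys.

    Example: {"MessageKey": {"OperationType": "I"}} -> {"messagekey_operationtype": "I"}
    """
    flat: Dict[str, Any] = {}
    for name, value in record.items():
        column = f"{prefix}_{name}".lower() if prefix else name.lower()
        if isinstance(value, dict):
            # Recurse into a nested struct, using the parent column as prefix.
            flat.update(flatten_record(value, column))
        else:
            flat[column] = value
    return flat


raw = {"UserID": "u-1", "MessageKey": {"OperationType": "I", "DomainID": 7}}
print(flatten_record(raw))
# {'userid': 'u-1', 'messagekey_operationtype': 'I', 'messagekey_domainid': 7}
```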
   
   For one pipeline, the source schema changed and some of the data was replayed in the new schema format. Unfortunately, this broke our Hudi pipeline with the exception `org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file` (full stack trace below). What we found strange is that we had prepared for the change and were no longer selecting the now-dropped integer field for insertion into Hudi.

   Kindly advise us whether something was missed, and what can be done in such cases so that the data can be processed with the new schema, regardless of the schema it was originally sent and processed with.
        
   Schema change: `[string, integer]` -> `string`
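   One way to catch a change like this before it reaches Hudi is to compare the old and new Avro schemas field by field. Any field that becomes required (non-nullable) gets flagged. The following is a minimal sketch on plain schema dicts. `newly_required_fields` is a hypothetical helper, not a Hudi or Avro API. It deliberately ignores nested records and field defaults, which keeps it conservative. The "before" schema in the usage is made up for illustration, while the "after" fields mirror the `writerSchema` in the trace below.

```python
from typing import Any, Dict, List


def is_nullable(avro_type: Any) -> bool:
    """An Avro type admits null if it is "null" or a union that contains "null"."""
    return avro_type == "null" or (isinstance(avro_type, list) and "null" in avro_type)


def newly_required_fields(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> List[str]:
    """Return fields that are required in new_schema but nullable or absent in old_schema.

    Rows written under old_schema may hold null (or no value) for such fields,
    so rewriting them under new_schema can fail. Defaults are ignored on purpose.
    """
    old_types = {field["name"]: field["type"] for field in old_schema["fields"]}
    flagged = []
    for field in new_schema["fields"]:
        if is_nullable(field["type"]):
            continue  # the new schema still accepts null, so old rows are fine
        old_type = old_types.get(field["name"])
        if old_type is None or is_nullable(old_type):
            flagged.append(field["name"])
    return flagged


# Hypothetical "before" schema; the "after" fields mirror the writerSchema in the trace.
old = {"fields": [
    {"name": "userid_string", "type": ["null", "string"]},
    {"name": "userid_integer", "type": ["null", "int"]},
]}
new = {"fields": [
    {"name": "userid_string", "type": ["null", "string"]},
    {"name": "userid_integer", "type": "int"},
]}
print(newly_required_fields(old, new))  # ['userid_integer']
```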
   Stack trace:
   ```
   Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key xyz_topic from old file s3a://data_raw/xyz_topic/calendardate=2022-05-19/6a7a5f41-b43f-47ac-8109-6215c6119ff5-0_0-8956-834609_20220519080508668.parquet to new file s3a://data_raw/xyz_topic/calendardate=2022-05-19/6a7a5f41-b43f-47ac-8109-6215c6119ff5-0_0-27-385_20220519145322564.parquet with writerSchema {
     "type" : "record",
     "name" : "em_dm_v1_topic_record",
     "namespace" : "hoodie.em_dm_v1_topic",
     "fields" : [ {
       "name" : "_hoodie_commit_time",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_commit_seqno",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_record_key",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_partition_path",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_file_name",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "key",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "calendardate",
       "type" : [ "null", {
         "type" : "int",
         "logicalType" : "date"
       } ],
       "default" : null
     }, {
       "name" : "eventtime",
       "type" : [ "null", "long" ],
       "default" : null
     }, {
       "name" : "userid_string",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "userid_integer",
       "type" : "int"
     }]
   }
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
        at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
        ... 31 more
   }
        at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:356)
        at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
        at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
   Caused by: java.lang.RuntimeException: Null-value for required field: userid_integer
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:194)
        at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
        at org.apache.hudi.io.storage.HoodieParquetWriter.writeAvro(HoodieParquetWriter.java:95)
        at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
        ... 8 more
   
   Driver stacktrace:
   === Streaming Query ===
   Identifier: [dpl-spark-streaming] query for hudi 'dm_v1_GmUserInfo_GmCoreSelfExclusionMessage' [id = 3fb941dc-f450-4306-8eab-6af98062c0b6, runId = afc85301-938a-4141-9e31-a84bc24094e0]
   Current Committed Offsets: {FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*]: {"logOffset":170}}
   Current Available Offsets: {FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*]: {"logOffset":171}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   Project [key#74, calendardate#75, eventtime#76L, userid_string#77, userid_integer#78, domainid#27, exclusiontype#28, expirydate#29, exclusionperiod#30, cooloffreason#31, cooloffdescription#32, unsatisfiedreason#33, unsatisfieddescription#34, selfexclusionreason#35, messagekey_operationtype#36, messagekey_messageversion#37, messagekey_messagetype#38, messagekey_domainid#39, messagekey_sourcename#40, messagekey_sourcecategory#41, messagekey_processingdate#42, messagekey_streamingdate#43, selfexclusionid#44]
   +- SubqueryAlias t
      +- Project [userid#26 AS key#74, cast(messagekey_streamingdate#43 as date) AS calendardate#75, unix_micros(cast(messagekey_processingdate#42 as timestamp)) AS eventtime#76L, userid#26 AS userid_string#77, -1 AS userid_integer#78, domainid#27, exclusiontype#28, expirydate#29, exclusionperiod#30, cooloffreason#31, cooloffdescription#32, unsatisfiedreason#33, unsatisfieddescription#34, selfexclusionreason#35, messagekey_operationtype#36, messagekey_messageversion#37, messagekey_messagetype#38, messagekey_domainid#39, messagekey_sourcename#40, messagekey_sourcecategory#41, messagekey_processingdate#42, messagekey_streamingdate#43, selfexclusionid#44]
         +- SubqueryAlias view_dm_v1_gmuserinfo_gmcoreselfexclusionmessage
            +- Project [userid#0 AS userid#26, domainid#1 AS domainid#27, exclusiontype#2 AS exclusiontype#28, expirydate#3 AS expirydate#29, exclusionperiod#4 AS exclusionperiod#30, cooloffreason#5 AS cooloffreason#31, cooloffdescription#6 AS cooloffdescription#32, unsatisfiedreason#7 AS unsatisfiedreason#33, unsatisfieddescription#8 AS unsatisfieddescription#34, selfexclusionreason#9 AS selfexclusionreason#35, messagekey#10.operationtype AS messagekey_operationtype#36, messagekey#10.messageversion AS messagekey_messageversion#37, messagekey#10.messagetype AS messagekey_messagetype#38, messagekey#10.domainid AS messagekey_domainid#39, messagekey#10.sourcename AS messagekey_sourcename#40, messagekey#10.sourcecategory AS messagekey_sourcecategory#41, messagekey#10.processingdate AS messagekey_processingdate#42, messagekey#10.streamingdate AS messagekey_streamingdate#43, selfexclusionid#11 AS selfexclusionid#44, username#12 AS username#45]
               +- StreamingExecutionRelation FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*], [UserID#0, DomainID#1, ExclusionType#2, ExpiryDate#3, ExclusionPeriod#4, CoolOffReason#5, CoolOffDescription#6, UnsatisfiedReason#7, UnsatisfiedDescription#8, SelfExclusionReason#9, MessageKey#10, SelfExclusionID#11, Username#12]
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 11 times, most recent failure: Lost task 0.10 in stage 27.0 (TID 385) (ip-10-102-9-167.eu-central-1.compute.internal executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:320)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:172)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ```
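
   The innermost cause above is a nullability check in the Parquet writer. The `writerSchema` declares `userid_integer` as bare `"int"` rather than `["null", "int"]`, probably because the projection fills it with the non-nullable literal `-1` (`-1 AS userid_integer#78` in the logical plan). As a result, any record handed to the writer without a value for that field is rejected. Rows from the new batch do carry `-1`, so one plausible reading is that the null comes from an existing row being rewritten during the merge (`HoodieMergeHandle.write`); the trace alone does not confirm this. The following is a minimal Python model of that check. `check_required_fields` is a hypothetical name, and this is a simplification, not the parquet-avro implementation.

```python
from typing import Any, Dict


def check_required_fields(schema: Dict[str, Any], record: Dict[str, Any]) -> None:
    """Raise if any non-nullable top-level field of `schema` is null or missing in `record`.

    Simplified model of the check that produces "Null-value for required field: <name>";
    nested records are not inspected.
    """
    for field in schema["fields"]:
        field_type = field["type"]
        nullable = field_type == "null" or (isinstance(field_type, list) and "null" in field_type)
        if not nullable and record.get(field["name"]) is None:
            raise RuntimeError(f"Null-value for required field: {field['name']}")


writer_schema = {"fields": [
    {"name": "userid_string", "type": ["null", "string"]},
    {"name": "userid_integer", "type": "int"},  # required, as in the trace
]}

# A row from the new batch: the projection fills userid_integer with -1, so it passes.
check_required_fields(writer_schema, {"userid_string": "u-1", "userid_integer": -1})

# A hypothetical existing row with no value for userid_integer is rejected.
try:
    check_required_fields(writer_schema, {"userid_string": "u-2", "userid_integer": None})
except RuntimeError as err:
    print(err)  # Null-value for required field: userid_integer
```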

