MikeTipico opened a new issue, #5683:
URL: https://github.com/apache/hudi/issues/5683
**Problem Faced**
We currently have a series of pipelines doing the following:

**A. NiFi**
1. consume Avro data from Kafka
2. attach schema information (downloaded from the schema registry)
3. save to S3 'raw'

**B. Spark application (structured streaming)**
4. consume from S3 'raw'
5. do some basic cleaning and schema flattening
6. persist to Hudi on S3 'data' (a rough sketch of this stage is shown below)
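For context, a minimal sketch of what stage B roughly looks like. The table name, S3 paths, schema, key/partition/precombine fields and the selected columns here are illustrative placeholders, not our exact job:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("raw-to-hudi").getOrCreate()

// Schema of the 'raw' Avro files, trimmed to a few fields for illustration.
val rawSchema = new StructType()
  .add("UserID", StringType)
  .add("MessageKey", new StructType()
    .add("processingdate", StringType)
    .add("streamingdate", StringType))

// 4. consume from S3 'raw'
val raw = spark.readStream
  .schema(rawSchema)
  .format("avro")
  .load("s3a://data_raw/xyz_topic/calendardate=2*")

// 5./6. flatten, then upsert each micro-batch into the Hudi table
def writeToHudi(batch: DataFrame, batchId: Long): Unit = {
  val cleaned = batch.selectExpr(
    "UserID AS key",
    "CAST(MessageKey.streamingdate AS DATE) AS calendardate",
    "unix_micros(CAST(MessageKey.processingdate AS TIMESTAMP)) AS eventtime",
    "UserID AS userid_string")

  cleaned.write
    .format("hudi")
    .option("hoodie.table.name", "xyz_topic")
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.recordkey.field", "key")
    .option("hoodie.datasource.write.partitionpath.field", "calendardate")
    .option("hoodie.datasource.write.precombine.field", "eventtime")
    .mode("append")
    .save("s3a://data/xyz_topic")
}

raw.writeStream
  .foreachBatch(writeToHudi _)
  .option("checkpointLocation", "s3a://checkpoints/xyz_topic")
  .start()
```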
For one pipeline we encountered a schema change at the source, with some of the
data replayed in the new schema format. This has unfortunately broken our Hudi
pipeline, which now fails with
`org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record
into new file` (full stack trace attached below). What we find strange is that
we had prepared for the change and were no longer selecting the now-dropped
integer field for insertion into Hudi.
Kindly advise us if something was missed, and what can be done in such cases
so that we can process the data with the new schema irrespective of how it was
originally sent and processed.
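For example, would enabling Hudi's schema reconciliation / schema-on-read evolution be the intended way to handle this? The snippet below is purely illustrative (we have not applied it) and assumes a Hudi release where these options exist; `cleaned` refers to the flattened micro-batch DataFrame from the sketch above.

```scala
// Illustrative only -- assumes these options are available in our Hudi version
// (comprehensive schema-on-read evolution was added in Hudi 0.11.0).
cleaned.write
  .format("hudi")
  .option("hoodie.table.name", "xyz_topic")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "key")
  .option("hoodie.datasource.write.partitionpath.field", "calendardate")
  .option("hoodie.datasource.write.precombine.field", "eventtime")
  .option("hoodie.datasource.write.reconcile.schema", "true") // reconcile incoming schema with the table schema
  .option("hoodie.schema.on.read.enable", "true")             // enable comprehensive schema evolution
  .mode("append")
  .save("s3a://data/xyz_topic")
```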
Schema change: `[string, integer]` -> `string`
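As far as we can tell from the writer schema in the trace below, `userid_integer` is still carried as a required `int` (no null branch, no default), so the Parquet writer rejects any record that arrives without a value for it ("Null-value for required field"). A small illustration of the two declarations (not our actual schemas):

```scala
import org.apache.avro.Schema

// Required field: records written without a value for it are rejected by the
// Parquet/Avro writer with "Null-value for required field: userid_integer".
val requiredField: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "r1", "fields": [
    |  {"name": "userid_integer", "type": "int"}
    |]}""".stripMargin)

// Nullable field with a default: a missing value is simply written as null,
// which is what the dropped column would need to tolerate after the change.
val nullableField: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "r2", "fields": [
    |  {"name": "userid_integer", "type": ["null", "int"], "default": null}
    |]}""".stripMargin)
```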
Stack trace:
```
Caused by: org.apache.hudi.exception.HoodieException:
java.util.concurrent.ExecutionException:
org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record
into new file for key xyz_topic from old
file
s3a://data_raw/xyz_topic/calendardate=2022-05-19/6a7a5f41-b43f-47ac-8109-6215c6119ff5-0_0-8956-834609_20220519080508668.parquet
to new file
s3a://data_raw/xyz_topic/calendardate=2022-05-19/6a7a5f41-b43f-47ac-8109-6215c6119ff5-0_0-27-385_20220519145322564.parquet
with writerSchema {
"type" : "record",
"name" : "em_dm_v1_topic_record",
"namespace" : "hoodie.em_dm_v1_topic",
"fields" : [ {
"name" : "_hoodie_commit_time",
"type" : [ "null", "string" ],
"doc" : "",
"default" : null
}, {
"name" : "_hoodie_commit_seqno",
"type" : [ "null", "string" ],
"doc" : "",
"default" : null
}, {
"name" : "_hoodie_record_key",
"type" : [ "null", "string" ],
"doc" : "",
"default" : null
}, {
"name" : "_hoodie_partition_path",
"type" : [ "null", "string" ],
"doc" : "",
"default" : null
}, {
"name" : "_hoodie_file_name",
"type" : [ "null", "string" ],
"doc" : "",
"default" : null
}, {
"name" : "key",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "calendardate",
"type" : [ "null", {
"type" : "int",
"logicalType" : "date"
} ],
"default" : null
}, {
"name" : "eventtime",
"type" : [ "null", "long" ],
"default" : null
}, {
"name" : "userid_string",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "userid_integer",
"type" : "int"
}]
}
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
at
org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
... 31 more
}
at
org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:356)
at
org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
at
org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.lang.RuntimeException: Null-value for required field:
userid_integer
at
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:194)
at
org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at
org.apache.hudi.io.storage.HoodieParquetWriter.writeAvro(HoodieParquetWriter.java:95)
at
org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
... 8 more
Driver stacktrace:
=== Streaming Query ===
Identifier: [dpl-spark-streaming] query for hudi
'dm_v1_GmUserInfo_GmCoreSelfExclusionMessage' [id =
3fb941dc-f450-4306-8eab-6af98062c0b6, runId =
afc85301-938a-4141-9e31-a84bc24094e0]
Current Committed Offsets:
{FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*]: {"logOffset":170}}
Current Available Offsets:
{FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*]: {"logOffset":171}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [key#74, calendardate#75, eventtime#76L, userid_string#77,
userid_integer#78, domainid#27, exclusiontype#28, expirydate#29,
exclusionperiod#30, cooloffreason#31, cooloffdescription#32,
unsatisfiedreason#33, unsatisfieddescription#34, selfexclusionreason#35,
messagekey_operationtype#36, messagekey_messageversion#37,
messagekey_messagetype#38, messagekey_domainid#39, messagekey_sourcename#40,
messagekey_sourcecategory#41, messagekey_processingdate#42,
messagekey_streamingdate#43, selfexclusionid#44]
+- SubqueryAlias t
+- Project [userid#26 AS key#74, cast(messagekey_streamingdate#43 as
date) AS calendardate#75, unix_micros(cast(messagekey_processingdate#42 as
timestamp)) AS eventtime#76L, userid#26 AS userid_string#77, -1 AS
userid_integer#78, domainid#27, exclusiontype#28, expirydate#29,
exclusionperiod#30, cooloffreason#31, cooloffdescription#32,
unsatisfiedreason#33, unsatisfieddescription#34, selfexclusionreason#35,
messagekey_operationtype#36, messagekey_messageversion#37,
messagekey_messagetype#38, messagekey_domainid#39, messagekey_sourcename#40,
messagekey_sourcecategory#41, messagekey_processingdate#42,
messagekey_streamingdate#43, selfexclusionid#44]
+- SubqueryAlias view_dm_v1_gmuserinfo_gmcoreselfexclusionmessage
+- Project [userid#0 AS userid#26, domainid#1 AS domainid#27,
exclusiontype#2 AS exclusiontype#28, expirydate#3 AS expirydate#29,
exclusionperiod#4 AS exclusionperiod#30, cooloffreason#5 AS cooloffreason#31,
cooloffdescription#6 AS cooloffdescription#32, unsatisfiedreason#7 AS
unsatisfiedreason#33, unsatisfieddescription#8 AS unsatisfieddescription#34,
selfexclusionreason#9 AS selfexclusionreason#35, messagekey#10.operationtype AS
messagekey_operationtype#36, messagekey#10.messageversion AS
messagekey_messageversion#37, messagekey#10.messagetype AS
messagekey_messagetype#38, messagekey#10.domainid AS messagekey_domainid#39,
messagekey#10.sourcename AS messagekey_sourcename#40,
messagekey#10.sourcecategory AS messagekey_sourcecategory#41,
messagekey#10.processingdate AS messagekey_processingdate#42,
messagekey#10.streamingdate AS messagekey_streamingdate#43, selfexclusionid#11
AS selfexclusionid#44, username#12 AS username#45]
+- StreamingExecutionRelation
FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*], [UserID#0,
DomainID#1, ExclusionType#2, ExpiryDate#3, ExclusionPeriod#4, CoolOffReason#5,
CoolOffDescription#6, UnsatisfiedReason#7, UnsatisfiedDescription#8,
SelfExclusionReason#9, MessageKey#10, SelfExclusionID#11, Username#12]
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 27.0 failed 11 times, most recent failure: Lost task
0.10 in stage 27.0 (TID 385) (ip-10-102-9-167.eu-central-1.compute.internal
executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting
bucketType UPDATE for partition :0
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:320)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:172)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]