sbernauer opened a new issue #3841:
URL: https://github.com/apache/hudi/issues/3841


   Hi Hudi team, we updated our patched Hudi 0.8.0 (commit 
https://github.com/apache/hudi/commit/8f7ad8b178ac3d394f88ac1c21f701138126fa04) 
to Hudi 0.9.0.
   Around 30% of our applications suddenly stopped working. We don't have an 
easy way of rolling back all of our Hudi tables to pre-0.9.0.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   We use the Hudi DeltaStreamer 0.9.0 ingesting from Kafka; our schema is 
attached at the end. This worked fine with 0.8.0.
   This may be related to https://github.com/apache/hudi/pull/2927.
   
   **Expected behavior**
   The application should continue working as it did with 0.8.0.
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   * Spark version : 3.2.0
   
   
   **Stacktrace**
   
   ```
    21/10/21 09:45:11 WARN TaskSetManager: Lost task 0.0 in stage 30.0 (TID 
668) (100.64.36.123 executor 1): 
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType 
UPDATE for partition :0
           at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:305)
           at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:311)
           at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:158)
           at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
           at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
           at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
           at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
           at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
           at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
           at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
           at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
           at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
           at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
           at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
           at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
           at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: operation has failed
           at 
org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
           at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:334)
           at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:325)
           at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:298)
           ... 29 more
   Caused by: org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: operation has failed
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
           at 
org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
           ... 32 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: operation has failed
           at java.util.concurrent.FutureTask.report(FutureTask.java:122)
           at java.util.concurrent.FutureTask.get(FutureTask.java:192)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
           ... 33 more
   Caused by: org.apache.hudi.exception.HoodieException: operation has failed
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           ... 3 more
   Caused by: org.apache.avro.SchemaParseException: Can't redefine: list
           at org.apache.avro.Schema$Names.put(Schema.java:1128)
           at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
           at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
           at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
           at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
           at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
           at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
           at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
           at org.apache.avro.Schema.toString(Schema.java:324)
           at 
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
           at 
org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
           at 
org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
           at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
           at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
           at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
           at 
org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
           at 
org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
           at 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
           at 
org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
           at 
org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
           at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
           at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           ... 4 more
   ```
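   The root cause is the final `SchemaParseException`. When Avro serializes a schema (`Schema.toString`), it registers every named type (record, enum, fixed) by its full name and refuses to register the same name twice. parquet-avro can synthesize wrapper records named `list` when converting a Parquet list type back to Avro, so a schema containing two such arrays trips the check. A simplified sketch of that name-registry check (not the actual Avro code; the class and method names below are only illustrative):

   ```python
   class SchemaParseException(Exception):
       pass


   class Names:
       """Simplified stand-in for Avro's internal Schema.Names registry."""

       def __init__(self):
           self._seen = {}

       def put(self, fullname, schema):
           # Avro refuses to register the same full name twice for two
           # distinct definitions -- this is what surfaces as
           # "Can't redefine: list" in the stacktrace above.
           if fullname in self._seen and self._seen[fullname] is not schema:
               raise SchemaParseException(f"Can't redefine: {fullname}")
           self._seen[fullname] = schema


   names = Names()
   # first array field converted to a wrapper record named "list": OK
   names.put("list", {"type": "record", "name": "list"})
   try:
       # second array field converted to a *distinct* record, also "list"
       names.put("list", {"type": "record", "name": "list"})
   except SchemaParseException as e:
       print(e)  # Can't redefine: list
   ```

   Re-registering the *same* schema object under its name is allowed; only a second, distinct definition under an already-used full name fails.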
   Our schema:
   ```
   {
     "type": "record",
     "name": "PredictionEvent",
     "namespace": "mycompany.sip.prediction",
     "doc": "<truncated>",
     "fields": [
       {
         "name": "affectedResource",
         "type": {
           "type": "record",
           "name": "Resource",
           "namespace": "mycompany.common",
           "doc": "<truncated>",
           "fields": [
             {
               "name": "type",
               "type": {
                 "type": "string",
                 "avro.java.string": "String"
               },
               "doc": "<truncated>"
             },
             {
               "name": "id",
               "type": {
                 "type": "string",
                 "avro.java.string": "String",
                 "PersonalInformation": "unspecified"
               },
               "doc": "<truncated>"
             },
             {
               "name": "brand",
               "type": [
                 "null",
                 {
                   "type": "record",
                   "name": "Brand",
                   "doc": "<truncated>",
                   "fields": [
                     {
                       "name": "name",
                       "type": {
                         "type": "string",
                         "avro.java.string": "String"
                       }
                     }
                   ]
                 }
               ],
               "doc": "<truncated>",
               "default": null
             }
           ]
         },
         "doc": "<truncated>",
         "mam_partitioning_key": "id"
       },
       {
         "name": "surrogateResourceId",
         "type": [
           "null",
           "mycompany.common.Resource"
         ],
         "doc": "<truncated>",
         "default": null
       },
       {
         "name": "objectType",
         "type": {
           "type": "string",
           "avro.java.string": "String"
         },
         "doc": "<truncated>"
       },
       {
         "name": "objectId",
         "type": {
           "type": "string",
           "avro.java.string": "String"
         },
         "doc": "<truncated>"
       },
       {
         "name": "objectAttributes",
         "type": {
           "type": "map",
           "values": {
             "type": "array",
             "items": {
               "type": "string",
               "avro.java.string": "String"
             }
           },
           "avro.java.string": "String"
         },
         "doc": "<truncated>",
         "default": {}
       },
       {
         "name": "source",
         "type": [
           "null",
           {
             "type": "string",
             "avro.java.string": "String"
           }
         ],
         "doc": "<truncated>",
         "default": null
       },
       {
         "name": "storeDate",
         "type": [
           "null",
           {
             "type": "string",
             "avro.java.string": "String"
           }
         ],
         "doc": "<truncated>",
         "default": null
       },
       {
         "name": "domain",
         "type": {
           "type": "string",
           "avro.java.string": "String"
         },
         "doc": "<truncated>"
       },
       {
         "name": "traits",
         "type": {
           "type": "map",
           "values": {
             "type": "array",
             "items": {
               "type": "string",
               "avro.java.string": "String"
             }
           },
           "avro.java.string": "String"
         },
         "doc": "<truncated>",
         "default": {}
       },
       {
         "name": "annotations",
         "type": {
           "type": "array",
           "items": {
             "type": "record",
             "name": "Annotation",
             "doc": "<truncated>",
             "fields": [
               {
                 "name": "type",
                 "type": {
                   "type": "string",
                   "avro.java.string": "String"
                 },
                 "doc": "<truncated>"
               },
               {
                 "name": "name",
                 "type": {
                   "type": "string",
                   "avro.java.string": "String"
                 },
                 "doc": "<truncated>"
               },
               {
                 "name": "values",
                 "type": {
                   "type": "array",
                   "items": {
                     "type": "string",
                     "avro.java.string": "String"
                   }
                 },
                 "doc": "<truncated>"
               },
               {
                 "name": "positionInText",
                 "type": [
                   "null",
                   {
                     "type": "array",
                     "items": {
                       "type": "string",
                       "avro.java.string": "String"
                     }
                   }
                 ],
                 "doc": "<truncated>",
                 "default": null
               },
               {
                 "name": "valuationTimeMillis",
                 "type": "long",
                 "doc": "<truncated>"
               },
               {
                 "name": "meta",
                 "type": {
                   "type": "map",
                   "values": {
                     "type": "string",
                     "avro.java.string": "String"
                   },
                   "avro.java.string": "String"
                 },
                 "doc": "<truncated>"
               },
               {
                 "name": "source",
                 "type": [
                   "null",
                   {
                     "type": "record",
                     "name": "AnnotationSource",
                     "doc": "<truncated>",
                     "fields": [
                       {
                         "name": "name",
                         "type": {
                           "type": "string",
                           "avro.java.string": "String"
                         },
                         "doc": "<truncated>"
                       },
                       {
                         "name": "version",
                         "type": {
                           "type": "string",
                           "avro.java.string": "String"
                         },
                         "doc": "<truncated>"
                       }
                     ]
                   }
                 ],
                 "doc": "<truncated>",
                 "default": null
               }
             ]
           }
         },
         "doc": "<truncated>"
       }
     ],
     "version": "1.0.0"
   }
   
   ```
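   Notably, the schema above never defines a type named `list` itself; its only named records are `PredictionEvent`, `Resource`, `Brand`, `Annotation`, and `AnnotationSource`. The colliding `list` name appears to be synthesized by parquet-avro while reading the array fields back from Parquet. A rough, hypothetical helper (not part of Hudi or Avro) that walks an Avro schema dict and collects named-type definitions can confirm the duplicate is not in the source schema:

   ```python
   def collect_named_types(schema, namespace=None, found=None):
       """Recursively collect full names of record/enum/fixed *definitions*.

       String references to already-defined types (e.g. "mycompany.common.Resource"
       inside a union) are not definitions and are deliberately skipped.
       """
       if found is None:
           found = []
       if isinstance(schema, list):  # union: recurse into each branch
           for branch in schema:
               collect_named_types(branch, namespace, found)
       elif isinstance(schema, dict):
           ns = schema.get("namespace", namespace)
           t = schema.get("type")
           if t in ("record", "enum", "fixed"):
               name = schema["name"]
               found.append(name if "." in name else (f"{ns}.{name}" if ns else name))
           if t == "record":
               for field in schema.get("fields", []):
                   collect_named_types(field["type"], ns, found)
           elif t == "array":
               collect_named_types(schema["items"], ns, found)
           elif t == "map":
               collect_named_types(schema["values"], ns, found)
       return found


   # minimal example: one definition plus a by-name reference to it
   example = {
       "type": "record", "name": "Outer", "namespace": "demo",
       "fields": [
           {"name": "a", "type": {"type": "record", "name": "Inner",
                                  "fields": [{"name": "x", "type": "long"}]}},
           {"name": "b", "type": ["null", "demo.Inner"]},  # reference, not a redefinition
       ],
   }
   names = collect_named_types(example)
   dupes = {n for n in names if names.count(n) > 1}
   print(names)  # ['demo.Outer', 'demo.Inner']
   print(dupes)  # set()
   ```

   Running the same walk over a schema where two array fields each carry a wrapper record named `list` (the shape parquet-avro can produce) would report `list` twice, which is exactly the condition Avro rejects.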

