DontSingRus opened a new issue, #12751:
URL: https://github.com/apache/hudi/issues/12751

   **Describe the problem you faced**
   
   I have a source with protobuf, after parse them in pyspark df with fixed 
structure start writing in hudi table. First part of data write done, second 
write causes an error: org.apache.avro.SchemaParseException: Can't redefine: 
array
   
   df structure:
   ```
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- id: string (nullable = true)
    |-- col_1: string (nullable = true)
    |-- col_2: string (nullable = true)
    |-- col_3: string (nullable = true)
    |-- col_4: string (nullable = true)
    |-- col_5: string (nullable = true)
    |-- col_6: string (nullable = true)
    |-- col_7: boolean (nullable = true)
    |-- col_8: string (nullable = true)
    |-- col_9: string (nullable = true)
    |-- col_10: string (nullable = true)
    |-- col_11: boolean (nullable = true)
    |-- col_12: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- col_2: string (nullable = true)
    |    |    |-- col_8: string (nullable = true)
    |-- event_timestamp: timestamp (nullable = true)
    |-- event_idempotency: string (nullable = true)
    |-- col_13: string (nullable = true)
    |-- col_14: string (nullable = true)
    |-- body: struct (nullable = true)
    |    |-- col_15: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- col_16: string (nullable = true)
    |    |    |    |-- col_12: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- col_2: string (nullable = true)
    |    |    |    |    |    |-- col_8: string (nullable = true)
    |    |-- col_17: array (nullable = true)
    |    |    |-- element: string (containsNull = true)
    |    |-- col_18: boolean (nullable = true)
    |-- col_19: string (nullable = true)
    |-- col_20: string (nullable = true)
    |-- load_timestamp: timestamp (nullable = false)
    |-- date: string (nullable = true)
   ```
   
   write options:
   ```
   'hoodie.table.name': 'table_name',
   'hoodie.datasource.write.recordkey.field': 'id',
   'hoodie.datasource.write.partitionpath.field': 'date',
   'hoodie.datasource.write.table.name': 'table_name',
   'hoodie.datasource.write.operation': 'upsert',
   'hoodie.datasource.write.precombine.field': 'etl__ods_load_timestamp',
   'hoodie.upsert.shuffle.parallelism': 32,
   'hoodie.insert.shuffle.parallelism': 32,
   'hoodie.clustering.async.enabled': 'true'
   ```
   
   **To Reproduce**
   
   I couldn't reproduce the problem, I tried it locally, on a similar server 
with the same data, everything is recording successfully. On the same server 
where the error occurred with different data and a different schema, but with 
the same code, everything is still successful
   
   **Expected behavior**
   
   I was expecting to see a successful entry like it was before
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.0.3
   
   * Storage (HDFS/S3/GCS..) : s3
   
   * Running on Docker? (yes/no) : no
   
   * jars: parquet-common-1.10.1, parquet-hadoop-1.10.1, spark-avro_2.12-3.0.3
   
   **Stacktrace**
   ```
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1073)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
        at 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting 
bucketType UPDATE for partition :469
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:344)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:254)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   Caused by: org.apache.hudi.exception.HoodieException: 
org.apache.avro.SchemaParseException: Can't redefine: array
        at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151)
        at 
org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:148)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:376)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:371)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:337)
        ... 28 more
   Caused by: org.apache.avro.SchemaParseException: Can't redefine: array
        at org.apache.avro.Schema$Names.put(Schema.java:1128)
        at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
        at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema.toString(Schema.java:324)
        at org.apache.avro.Schema.toString(Schema.java:314)
        at 
org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
        at 
org.apache.hudi.io.hadoop.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:170)
        at 
org.apache.hudi.io.hadoop.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:101)
        at 
org.apache.hudi.io.hadoop.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:80)
        at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:128)
        ... 32 more
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to