[
https://issues.apache.org/jira/browse/HUDI-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937249#comment-17937249
]
sivabalan narayanan commented on HUDI-9119:
-------------------------------------------
hey [~yc2523]: is this specific to EMR 7.8? I tried EMR 7.6 with the latest
master as of today and could not reproduce the issue.
> Hudi 1.0.1 cannot write MOR tables
> ----------------------------------
>
> Key: HUDI-9119
> URL: https://issues.apache.org/jira/browse/HUDI-9119
> Project: Apache Hudi
> Issue Type: Sub-task
> Affects Versions: 1.0.1
> Reporter: Shawn Chang
> Assignee: sivabalan narayanan
> Priority: Critical
> Fix For: 1.0.2
>
>
> When testing Hudi 1.0.1 on EMR 7.8, I see failures like the one below:
> {code:java}
> Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to update metadata
>   at org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:282)
>   at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:293)
>   at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:253)
>   at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:94)
>   at org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:999)
>   at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:538)
>   at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:193)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>   at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
>   at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:157)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$10(SQLExecution.scala:220)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>   at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:220)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:405)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:219)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:83)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
>   at org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
>   at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:215)
>   at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:130)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:185)
>   at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>   at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
>   at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:157)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$10(SQLExecution.scala:220)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>   at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:220)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:405)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:219)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:83)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:520)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:520)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:303)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:299)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:496)
>   at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
>   at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
>   at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
>   at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
>   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:884)
>   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:405)
>   at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:365)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
>   at code.amazonaws.emr.examples.hudi.metadata_listing.TestHudiFileListMetadataCOW$.runFileListMetadataTest(TestHudiFileListMetadataCOW.scala:87)
>   at code.amazonaws.emr.examples.hudi.metadata_listing.TestHudiFileListMetadataCOW$.main(TestHudiFileListMetadataCOW.scala:148)
>   at code.amazonaws.emr.examples.hudi.metadata_listing.TestHudiFileListMetadataCOW.main(TestHudiFileListMetadataCOW.scala)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:569)
>   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1112)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:200)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:223)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:92)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1204)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1213)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 92.0 failed 4 times, most recent failure: Lost task 0.3 in stage 92.0 (TID 125) (ip-10-10-18-194.ec2.internal executor 2): org.apache.hudi.exception.HoodieException: Error occurs when executing map
>   at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
>   at org.apache.hudi.common.data.HoodieListData.lambda$flatMap$0(HoodieListData.java:135)
>   at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
>   at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:960)
>   at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:934)
>   at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
>   at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
>   at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>   at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:686)
>   at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:927)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>   at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>   at org.apache.hudi.common.data.HoodieBaseListData.<init>(HoodieBaseListData.java:46)
>   at org.apache.hudi.common.data.HoodieListData.<init>(HoodieListData.java:69)
>   at org.apache.hudi.common.data.HoodieListData.flatMap(HoodieListData.java:136)
>   at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeyPrefixes(HoodieBackedTableMetadata.java:214)
>   at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToPartitionStatRecords$b0d2b4b0$1(HoodieTableMetadataUtil.java:2664)
>   at org.apache.hudi.data.HoodieJavaRDD.lambda$mapToPair$aa72055d$1(HoodieJavaRDD.java:173)
>   at org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:157)
>   at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
>   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:69)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
>   at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
>   at org.apache.spark.scheduler.Task.run(Task.scala:152)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
>   at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
>   at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
>   at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scanInternalV1(AbstractHoodieLogRecordScanner.java:388)
>   at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scanInternal(AbstractHoodieLogRecordScanner.java:250)
>   at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByKeyPrefixes(HoodieMergedLogRecordScanner.java:196)
>   at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeyPrefixes(HoodieMetadataLogRecordReader.java:87)
>   at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:379)
>   at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeyPrefixes$7539c171$1(HoodieBackedTableMetadata.java:234)
>   at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
>   ... 39 more
> Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @5b2ea718)
>   at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:169)
>   at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:124)
>   at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:678)
>   at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scanInternalV1(AbstractHoodieLogRecordScanner.java:378)
>   ... 45 more
> {code}
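> The ClassCastException above points to a classloader mismatch: the generic Avro record class comes from the app loader while HoodieDeleteRecordList comes from Spark's MutableURLClassLoader. One plausible mechanism, shown as an illustrative sketch only (not a confirmed fix path; assumes the Hudi bundle is on the shell classpath): Avro's SpecificData resolves the generated class for a schema by name through its own classloader, and when that lookup fails it falls back to GenericData.Record, which then breaks the cast in HoodieDeleteBlock.deserialize.
> {code:java}
> import org.apache.avro.specific.SpecificData
> import org.apache.hudi.avro.model.HoodieDeleteRecordList
>
> // SpecificData looks up the generated class for a schema by name via the
> // classloader it was constructed with; a null result means readers fall
> // back to GenericData.Record instead of the specific class.
> val appLoader = classOf[org.apache.avro.generic.GenericData].getClassLoader
> val specificData = new SpecificData(appLoader)
> val resolved = specificData.getClass(HoodieDeleteRecordList.getClassSchema())
> // If appLoader cannot see HoodieDeleteRecordList, `resolved` is null here and
> // the delete block deserializes to a generic record, matching the cast failure.
> println(resolved)
> {code}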
> Reproduction steps:
> # Start an EMR 7.8 cluster
> # Start spark-shell with the command below:
> {code:java}
> spark-shell \
>   --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.1 \
>   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
>   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
>   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
>   --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
> {code}
> # Run the script below:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.spark.sql.SaveMode
>
> val df1 = Seq(
>   (100, "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   (101, "2015-01-01", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   (102, "2015-01-01", "event_name_345", "2015-01-01T13:51:40.417052Z", "type3"),
>   (103, "2015-01-01", "event_name_234", "2015-01-01T13:51:40.519832Z", "type4"),
>   (104, "2015-01-01", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   (105, "2015-01-01", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2"),
>   (106, "2015-01-01", "event_name_890", "2015-01-01T13:51:44.735360Z", "type3"),
>   (107, "2015-01-01", "event_name_944", "2015-01-01T13:51:45.019544Z", "type4"),
>   (108, "2015-01-01", "event_name_456", "2015-01-01T13:51:45.208007Z", "type1"),
>   (109, "2015-01-01", "event_name_567", "2015-01-01T13:51:45.369689Z", "type2"),
>   (110, "2015-01-01", "event_name_789", "2015-01-01T12:15:05.664947Z", "type3"),
>   (111, "2015-01-01", "event_name_322", "2015-01-01T13:51:47.388239Z", "type4")
> ).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
>
> val r = scala.util.Random
> val num = r.nextInt(99999)
> var tableName = "yxchang_hudi_cow_simple_14_" + num
> var tablePath = "s3://<yourbucket>/hudi10/" + tableName + "/"
>
> df1.write.format("hudi")
>   .option("hoodie.metadata.enable", "true")
>   .option("hoodie.table.name", tableName)
>   .option("hoodie.datasource.write.operation", "insert") // use insert
>   .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
>   .option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
>   .option("hoodie.datasource.write.partitionpath.field", "event_type")
>   .option("hoodie.datasource.write.precombine.field", "event_ts")
>   .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
>   .option("hoodie.datasource.hive_sync.enable", "true")
>   .option("hoodie.datasource.meta.sync.enable", "true")
>   .option("hoodie.datasource.hive_sync.mode", "hms")
>   .option("hoodie.datasource.hive_sync.table", tableName)
>   .option("hoodie.datasource.hive_sync.partition_fields", "event_type")
>   .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Append)
>   .save(tablePath)
> {code}
> The script above uses a COW table with the metadata table (MDT) enabled,
> which also reproduces the issue.
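> Since the issue title concerns MOR tables, the MOR case would presumably differ only in the table type; a minimal sketch, assuming the same shell session and reusing df1, tableName, and tablePath from above:
> {code:java}
> // Hypothetical MOR variant: identical to the write above except for the table type.
> df1.write.format("hudi")
>   .option("hoodie.metadata.enable", "true")
>   .option("hoodie.table.name", tableName)
>   .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
>   .option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
>   .option("hoodie.datasource.write.partitionpath.field", "event_type")
>   .option("hoodie.datasource.write.precombine.field", "event_ts")
>   .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
>   .mode(SaveMode.Append)
>   .save(tablePath)
> {code}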
>
> Additional context:
> # This exception looks the same as
> [https://github.com/apache/hudi/issues/10609]
> # The same script runs without issue on OSS Hudi 1.0.0
> # This script does not always fail; sometimes the exception shows up but the
> job still completes
> Class loader information in spark-shell:
> {code:java}
> scala> classOf[org.apache.hudi.avro.model.HoodieDeleteRecordList].getClassLoader
> res1: ClassLoader = scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@4d174189
>
> scala> classOf[org.apache.avro.generic.GenericData.Record].getClassLoader
> res2: ClassLoader = jdk.internal.loader.ClassLoaders$AppClassLoader@5ffd2b27
> {code}
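> A quick follow-up check (hypothetical, not from the original report) is to resolve both classes through a single loader and compare the loaders directly:
> {code:java}
> // Resolve both classes via the context classloader and compare their loaders;
> // getting two different loaders is consistent with the ClassCastException above.
> val ctx = Thread.currentThread().getContextClassLoader
> val del = Class.forName("org.apache.hudi.avro.model.HoodieDeleteRecordList", false, ctx)
> val rec = Class.forName("org.apache.avro.generic.GenericData$Record", false, ctx)
> println(del.getClassLoader == rec.getClassLoader)
> {code}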