SabyasachiDasTR opened a new issue, #5422:
URL: https://github.com/apache/hudi/issues/5422
**Describe the problem you faced**
We incrementally upsert data into our Hudi table(s) every 5 minutes.
While processing this data we see the error below and the upserts fail. The only
operation we execute is upsert; we never call bulk insert/insert, and we use a
single writer. The failures started when we enabled the metadata table, with the
rest of the properties unchanged.
Hudi table: MOR table
Compaction: inline compaction
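For context, here is a minimal sketch (in Scala) of the kind of write we issue every 5 minutes. The record key, precombine and partition fields, table name, and base path are placeholders rather than the exact values from the attached hudiOptions.txt; only the options relevant to the description above (MOR table, inline compaction, metadata table enabled, upsert only) are shown.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Illustrative only: field names, table name and base path are placeholders.
def upsertBatch(df: DataFrame, basePath: String): Unit = {
  df.write
    .format("hudi")
    .option("hoodie.table.name", "my_table")                            // placeholder
    .option("hoodie.datasource.write.operation", "upsert")              // the only operation we run
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")      // MOR table
    .option("hoodie.datasource.write.recordkey.field", "id")            // placeholder
    .option("hoodie.datasource.write.precombine.field", "ts")           // placeholder
    .option("hoodie.datasource.write.partitionpath.field", "partition") // placeholder
    .option("hoodie.compact.inline", "true")                            // inline compaction
    .option("hoodie.metadata.enable", "true")                           // failures started after enabling this
    .mode(SaveMode.Append)
    .save(basePath)
}
```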
**StackTrace**
```
2022-04-21T12:31:34.433+0000 [WARN] [001qa_correlation_id] [org.apache.spark.scheduler.TaskSetManager] [TaskSetManager]: Lost task 181.0 in stage 769.0 (TID 177843) (ip-10-1------.aws-int.-----------.com executor 54): org.apache.hudi.exception.HoodieIOException: Failed to read footer for parquet s3://bucket/table/partition/7404f6ba-4b10-4d64-8d85-a7f855af18f3-1_15-273-66858_20220421115823.parquet
    at org.apache.hudi.common.util.ParquetUtils.readMetadata(ParquetUtils.java:178)
    at org.apache.hudi.common.util.ParquetUtils.readFooter(ParquetUtils.java:194)
    at org.apache.hudi.common.util.BaseFileUtils.readMinMaxRecordKeys(BaseFileUtils.java:109)
    at org.apache.hudi.io.storage.HoodieParquetReader.readMinMaxRecordKeys(HoodieParquetReader.java:49)
    at org.apache.hudi.io.HoodieRangeInfoHandle.getMinMaxKeys(HoodieRangeInfoHandle.java:39)
    at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lambda$loadInvolvedFiles$dac7877d$1(SparkHoodieBloomIndex.java:179)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at scala.collection.AbstractIterator.to(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2281)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.FileNotFoundException: No such file or directory 's3://bucket/table/partition/7404f6ba-4b10-4d64-8d85-a7f855af18f3-1_15-273-66858_20220421115823.parquet'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:61)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:441)
    at org.apache.hudi.common.util.ParquetUtils.readMetadata(ParquetUtils.java:176)
    ... 33 more
```
"throwable": [
"Failed to upsert for commit time 20220421123126",
"at
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)",
"at
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)",
"at
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:82)",
"at
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:74)",
"at
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)",
"at
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)",
"at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:265)",
"at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)",
"at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)",
"at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)",
"at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)",
"at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)",
"at
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)",
"at
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)",
"at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)",
"at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)",
"at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)",
"at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)",
"at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)",
"at
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)",
"at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)",
"at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)",
"at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)",
"at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)",
"at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)",
"at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)",
"at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)",
"at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)",
"at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)",
"at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)",
"at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)",
"at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)",
"at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)",
"at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)",
"at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)",
"at
com.myapp.utils.HudiOperations$.$anonfun$upsert$2(HudiOperations.scala:288)",
"at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)",
"at scala.util.Try$.apply(Try.scala:213)",
"at com.myapp.utils.HudiOperations$.retry(HudiOperations.scala:373)",
"at
com.myapp.utils.HudiOperations$.$anonfun$upsert$1(HudiOperations.scala:278)",
"at
legal.publishing.shared.utils.time.RuntimeMetrics$.measureTime(RuntimeMetrics.scala:22)",
"at com.myapp.utils.HudiOperations$.upsert(HudiOperations.scala:274)",
"at
com.myapp.caseclasses.KinesisInputMessage$.writePromoteAndDemoteMessages(KinesisInputMessage.scala:196)",
"at
com.myapp.caseclasses.KinesisInputMessage$.processPromoteAndDemoteMessages(KinesisInputMessage.scala:172)",
"at
com.myapp.caseclasses.KinesisInputMessage$.processNovusCollection(KinesisInputMessage.scala:438)",
"at
com.myapp.MyStreamingApp$.$anonfun$processInterval$15(MyStreamingApp.scala:205)",
"at
legal.publishing.shared.utils.time.RuntimeMetrics$.measureTime(RuntimeMetrics.scala:22)",
"at
com.myapp.MyStreamingApp$.$anonfun$processInterval$11(MyStreamingApp.scala:205)",
"at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)",
"at scala.util.Try$.apply(Try.scala:213)",
"at
com.myapp.MyStreamingApp$.$anonfun$processInterval$10(MyStreamingApp.scala:189)",
"at
scala.collection.parallel.AugmentedIterableIterator.map2combiner(RemainsIterator.scala:116)",
"at
scala.collection.parallel.AugmentedIterableIterator.map2combiner$(RemainsIterator.scala:113)",
"at
scala.collection.parallel.immutable.ParHashMap$ParHashMapIterator.map2combiner(ParHashMap.scala:80)",
"at
scala.collection.parallel.ParIterableLike$Map.leaf(ParIterableLike.scala:1056)",
"at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)",
"at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)",
"at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)",
"at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)",
"at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)",
"at
scala.collection.parallel.ParIterableLike$Map.tryLeaf(ParIterableLike.scala:1053)",
"at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)",
"at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)",
"at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)",
"at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)",
"at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)",
"at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)",
"at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)",
"at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)"
*Upsert query (attached)*
[upsertQuery.txt](https://github.com/apache/hudi/files/8552354/upsertQuery.txt)
*Options used during Upsert*
[hudiOptions.txt](https://github.com/apache/hudi/files/8552369/hudiOptions.txt)
**Environment Description**
* Hudi version : 0.9.0
* Spark version : 3.1.2
* Hive version : Hive is not installed on the EMR cluster; if it were installed, the version would be 3.1.2 (based on EMR 6.5)
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : s3
* Running on Docker? (yes/no) : no