ZeyuQiu-Rinze opened a new issue, #7520:
URL: https://github.com/apache/hudi/issues/7520

   I'm trying Hudi with a simple demo, just read some parquet files and then 
write to a new directory, but Hudi stuck in the "Getting small files from 
partitions" stage for a very long time (more than 2 hours) and then failed.
   <img width="1418" alt="image" 
src="https://user-images.githubusercontent.com/99316147/208644475-a178523b-3052-4343-8696-3ba6445e298a.png";>
   <img width="1437" alt="image" 
src="https://user-images.githubusercontent.com/99316147/208644585-a669695b-6b7a-4910-b2ea-ccbe664e2f1c.png";>
   My code was like:
   ```
           df = spark.read.parquet(
               's3://my_bucket/flink_test/users_activity/')
           hudi_options = {
               'hoodie.table.name': 'users_activity',
               'hoodie.datasource.write.recordkey.field': 'users_activity_id',
               'hoodie.datasource.write.partitionpath.field': 
'users_activity_create_date',
               'hoodie.datasource.write.table.name': 'users_activity_result',
               'hoodie.datasource.write.operation': 'insert',
               'hoodie.datasource.write.precombine.field': 
'users_activity_update_date',
           }
   
           
(df.write.format('org.apache.hudi').options(**hudi_options).mode('overwrite')
            .save('s3://my_bucket/flink_test/hudi_test_result/users_activity/'))
   ```
   
   **Environment Description**
   
   EMR Version: emr-6.8.0
   
   Hudi version : 0.11.1
   
   Spark version : Spark 3.3.0
   
   Hive version : Hive 3.1.3
   
   Storage (HDFS/S3/GCS..) : S3
   
   Running on Docker? (yes/no) : No
   
   **Stacktrace**
   ```
   Traceback (most recent call last):
     File "spark_hudi.py", line 37, in <module>
       process()
     File "spark_hudi.py", line 27, in process
       .save('s3://htm-hawk-data-lake-test/flink_test/copy/users_activity/'))
     File 
"/mnt/yarn/usercache/hadoop/appcache/application_1671439304688_0001/container_1671439304688_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py",
 line 968, in save
     File 
"/mnt/yarn/usercache/hadoop/appcache/application_1671439304688_0001/container_1671439304688_0001_01_000001/py4j-0.10.9.5-src.zip/py4j/java_gateway.py",
 line 1322, in __call__
     File 
"/mnt/yarn/usercache/hadoop/appcache/application_1671439304688_0001/container_1671439304688_0001_01_000001/pyspark.zip/pyspark/sql/utils.py",
 line 190, in deco
     File 
"/mnt/yarn/usercache/hadoop/appcache/application_1671439304688_0001/container_1671439304688_0001_01_000001/py4j-0.10.9.5-src.zip/py4j/protocol.py",
 line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o89.save.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for 
commit time 20221219084414984
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:307)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:173)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Total size of serialized results of 1409413 tasks (1024.0 MiB) is 
bigger than spark.driver.maxResultSize (1024.0 MiB)
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:137)
        at 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions(HoodieIndexUtils.java:87)
        at 
org.apache.hudi.index.bloom.HoodieBloomIndex.loadColumnRangesFromFiles(HoodieBloomIndex.java:166)
        at 
org.apache.hudi.index.bloom.HoodieBloomIndex.getBloomIndexFileInfoForPartitions(HoodieBloomIndex.java:151)
        at 
org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:125)
        at 
org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:91)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:49)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:32)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
        ... 54 more
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to