15663671003 opened a new issue, #12017:
URL: https://github.com/apache/hudi/issues/12017

   **Describe the problem you faced**
   
The third upsert attempt fails with an error.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
    1. Upsert 10 million records for the first time: succeeded
    2. Upsert 10 million records again: succeeded, and compaction completed
    3. Upsert 1 billion records again: failed
   
   **Expected behavior**
   
    The upsert should succeed. Why did it fail, and how can I see the cause of the error in more detail?
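
    One way to get more detail is to list the timeline directory (e.g. `hdfs dfs -ls <base path>/.hoodie` and `<base path>/.hoodie/metadata/.hoodie`) and check which instant is left `requested` or `inflight`. The helper below is purely illustrative, not a Hudi API; it assumes the Hudi 0.x timeline filename layout `<timestamp>.<action>[.<state>]`:

```python
def parse_instant(filename: str):
    """Split a Hudi timeline filename such as
    '20240927172830490.deltacommit.inflight' into (timestamp, action, state).
    Completed instants usually carry no state suffix."""
    parts = filename.split(".")
    timestamp, action = parts[0], parts[1]
    state = parts[2] if len(parts) > 2 else "completed"
    return timestamp, action, state

# Example: classify filenames copied from `hdfs dfs -ls <base path>/.hoodie`
print(parse_instant("20240927172830490.deltacommit.inflight"))
```

    Raising the driver log level (`spark.sparkContext.setLogLevel("DEBUG")`) before the write may also surface the state that trips the `checkState` in the stacktrace below.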
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.2.2
   
   * Hive version : 2.1.1
   
   * Hadoop version : 3.0.0
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as SF
   
   
   
   def run():
       spark = SparkSession \
           .builder \
           .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
            .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
           .config("spark.sql.broadcastTimeout", 3600) \
           .enableHiveSupport() \
           .getOrCreate()
   
        df = spark.table("xx.test_data")

       db, table = "xx", "test_hudi_0_15_0_ri"
       path = "/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri"
        cleaner_commits_retained = 3
        keep_min_commits = cleaner_commits_retained + 1
        keep_max_commits = keep_min_commits + 1
        keep_max_file_version = keep_max_commits + 1
        hive_table = f"{db}.{table}"
    
        hudi_options = {
            'hoodie.table.name': hive_table,
            'hoodie.datasource.write.recordkey.field': "id",
            'hoodie.datasource.write.table.name': hive_table,
            'hoodie.datasource.write.operation': "upsert",
            'hoodie.datasource.write.precombine.field': "order_key",
            'hoodie.datasource.write.table.type': "MERGE_ON_READ",
            'hoodie.upsert.shuffle.parallelism': 2000,
            'hoodie.bulkinsert.shuffle.parallelism': 2000,
            'hoodie.insert.shuffle.parallelism': 2000,
            "hoodie.compact.inline": "true",
            "hoodie.compact.inline.max.delta.commits": 1,
            'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
            'hoodie.cleaner.fileversions.retained': keep_max_file_version,
            "hoodie.keep.min.commits": keep_min_commits,
            "hoodie.keep.max.commits": keep_max_commits,
            'hoodie.cleaner.commits.retained': cleaner_commits_retained,
           'hoodie.parquet.max.file.size': 1024 * 1024 * 100,
           'hoodie.parquet.small.file.limit': 1024 * 1024 * 60,
           'hoodie.parquet.compression.codec': 'snappy',
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
            'hoodie.datasource.write.keygenerator.class': "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
           'hoodie.datasource.write.hive_style_partitioning': "true",
           'hoodie.datasource.hive_sync.enable': 'false',
           'hoodie.datasource.hive_sync.database': db,
           'hoodie.datasource.hive_sync.table': table,
           'hoodie.datasource.hive_sync.mode': "hms",
           "hoodie.index.type": "RECORD_INDEX",
           "hoodie.metadata.enable": "true",
            'hoodie.datasource.write.payload.class': "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload",
       }
       df.write.format("hudi").options(**hudi_options).save(path, mode='append')
   
   
   
   if __name__ == "__main__":
       run()
   
   ```
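
    For reference, the cleaning/archival settings in the script form a dependent chain (Hudi generally requires `hoodie.keep.min.commits` to be greater than `hoodie.cleaner.commits.retained`). Restated with the derived values, mapped to the config keys they feed:

```python
# Derived retention values from the script above (comments give the config key).
cleaner_commits_retained = 3                        # hoodie.cleaner.commits.retained
keep_min_commits = cleaner_commits_retained + 1     # hoodie.keep.min.commits -> 4
keep_max_commits = keep_min_commits + 1             # hoodie.keep.max.commits -> 5
keep_max_file_version = keep_max_commits + 1        # hoodie.cleaner.fileversions.retained -> 6
print(keep_min_commits, keep_max_commits, keep_max_file_version)
```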
   
   **Stacktrace**
   
   ```
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 178.0 in 
stage 27.0 (TID 18856) in 117 ms on xx (executor 83) (194/200)
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 36.0 in stage 
27.0 (TID 18714) in 122 ms on xx (executor 260) (195/200)
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 180.0 in 
stage 27.0 (TID 18858) in 119 ms on xx (executor 197) (196/200)
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 
27.0 (TID 18684) in 129 ms on xx (executor 46) (197/200)
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 174.0 in 
stage 27.0 (TID 18852) in 124 ms on xx (executor 84) (198/200)
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 160.0 in 
stage 27.0 (TID 18838) in 129 ms on xx (executor 245) (199/200)
   24/09/27 18:24:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
27.0 (TID 18678) in 254 ms on xx (executor 192) (200/200)
   24/09/27 18:24:58 INFO cluster.YarnScheduler: Removed TaskSet 27.0, whose 
tasks have all completed, from pool
   24/09/27 18:24:58 INFO scheduler.DAGScheduler: ResultStage 27 (collect at 
HoodieJavaRDD.java:177) finished in 0.267 s
   24/09/27 18:24:58 INFO scheduler.DAGScheduler: Job 9 is finished. Cancelling 
potential speculative or zombie tasks for this job
   24/09/27 18:24:58 INFO cluster.YarnScheduler: Killing all running tasks in 
stage 27: Stage finished
   24/09/27 18:24:58 INFO scheduler.DAGScheduler: Job 9 finished: collect at 
HoodieJavaRDD.java:177, took 0.600983 s
    24/09/27 18:24:58 WARN marker.WriteMarkersFactory: Timeline-server-based markers are not supported for HDFS: base path /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri.  Falling back to direct markers.
   24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Loading 
HoodieTableMetaClient from /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri
   24/09/27 18:24:58 INFO table.HoodieTableConfig: Loading table properties 
from /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/hoodie.properties
   24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Finished Loading Table 
of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri
   24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Loading 
HoodieTableMetaClient from 
/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/metadata
   24/09/27 18:24:58 INFO table.HoodieTableConfig: Loading table properties 
from 
/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/metadata/.hoodie/hoodie.properties
    24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/metadata
   24/09/27 18:24:58 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
Option{val=[20240927172830490__deltacommit__COMPLETED__20240927174916003]}
   24/09/27 18:24:58 INFO metadata.HoodieBackedTableMetadataWriter: Async 
metadata indexing disabled and following partitions already initialized: [files]
   24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Loading 
HoodieTableMetaClient from /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri
   24/09/27 18:24:58 INFO table.HoodieTableConfig: Loading table properties 
from /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/hoodie.properties
   24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Finished Loading Table 
of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri
   24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Loading 
HoodieTableMetaClient from 
/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/metadata
   24/09/27 18:24:58 INFO table.HoodieTableConfig: Loading table properties 
from 
/user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/metadata/.hoodie/hoodie.properties
    24/09/27 18:24:58 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from /user/hive/warehouse/xx.db/test_hudi_0_15_0_ri/.hoodie/metadata
   24/09/27 18:24:58 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
Option{val=[20240927172830490__deltacommit__COMPLETED__20240927174916003]}
   24/09/27 18:24:58 INFO view.AbstractTableFileSystemView: Took 0 ms to read  
0 instants, 0 replaced file groups
   24/09/27 18:24:58 INFO util.ClusteringUtils: Found 0 files in pending 
clustering operations
   24/09/27 18:24:58 INFO rdd.MapPartitionsRDD: Removing RDD 28 from 
persistence list
   24/09/27 18:24:58 INFO storage.BlockManager: Removing RDD 28
   24/09/27 18:24:58 INFO rdd.MapPartitionsRDD: Removing RDD 38 from 
persistence list
   24/09/27 18:24:58 INFO storage.BlockManager: Removing RDD 38
   24/09/27 18:24:58 INFO hudi.HoodieSparkSqlWriterInternal: 
Config.inlineCompactionEnabled ? true
   24/09/27 18:24:58 INFO hudi.HoodieSparkSqlWriterInternal: 
Config.asyncClusteringEnabled ? false
   24/09/27 18:24:58 WARN hudi.HoodieSparkSqlWriterInternal: Closing write 
client
   24/09/27 18:24:58 INFO client.BaseHoodieClient: Stopping Timeline service !!
   24/09/27 18:24:58 INFO embedded.EmbeddedTimelineService: Closing Timeline 
server
   24/09/27 18:24:58 INFO service.TimelineService: Closing Timeline Service
   24/09/27 18:24:58 INFO javalin.Javalin: Stopping Javalin ...
   24/09/27 18:24:58 INFO javalin.Javalin: Javalin has stopped
   24/09/27 18:24:58 INFO service.TimelineService: Closed Timeline Service
   24/09/27 18:24:58 INFO embedded.EmbeddedTimelineService: Closed Timeline 
server
   Traceback (most recent call last):
     File "d.py", line xx, in run
       df.write.format("hudi").options(**hudi_options).save(path, mode='append')
     File 
"/opt/spark-3.2.2-bin-3.0.0-cdh6.2.1/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
 line 740, in save
     File 
"/opt/spark-3.2.2-bin-3.0.0-cdh6.2.1/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py",
 line 1322, in __call__
     File 
"/opt/spark-3.2.2-bin-3.0.0-cdh6.2.1/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 111, in deco
     File 
"/opt/spark-3.2.2-bin-3.0.0-cdh6.2.1/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py",
 line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o184.save.
   : org.apache.hudi.exception.HoodieException: Failed to update metadata
           at 
org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:262)
           at 
org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:294)
           at 
org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:239)
           at 
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:108)
           at 
org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1082)
           at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
           at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
           at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
           at 
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
           at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
           at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
           at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
           at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
           at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
           at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
           at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
           at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
           at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
           at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
           at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
           at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
           at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
           at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
           at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
           at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
           at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
           at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
           at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
           at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
           at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
           at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
           at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
           at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
           at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
           at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
           at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
           at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
           at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
           at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
           at py4j.Gateway.invoke(Gateway.java:282)
           at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
           at py4j.commands.CallCommand.execute(CallCommand.java:79)
           at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
           at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalStateException
           at 
org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
           at 
org.apache.hudi.metadata.HoodieMetadataPayload.lambda$createPartitionFilesRecord$2(HoodieMetadataPayload.java:335)
           at java.util.HashMap.forEach(HashMap.java:1289)
           at 
org.apache.hudi.metadata.HoodieMetadataPayload.createPartitionFilesRecord(HoodieMetadataPayload.java:332)
           at 
org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToFilesPartitionRecords$6(HoodieTableMetadataUtil.java:425)
           at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
           at 
java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
           at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
           at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
           at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
           at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
           at 
org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(HoodieTableMetadataUtil.java:428)
           at 
org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecords(HoodieTableMetadataUtil.java:356)
           at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.lambda$updateFromWriteStatuses$22(HoodieBackedTableMetadataWriter.java:920)
           at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:861)
           at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:918)
           at 
org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:257)
           ... 49 more
   
   24/09/27 18:24:58 INFO spark.SparkContext: Invoking stop() from shutdown hook
   24/09/27 18:24:58 INFO server.AbstractConnector: Stopped 
Spark@f5da179{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
   24/09/27 18:24:58 INFO ui.SparkUI: Stopped Spark web UI at http://xx:4040
   24/09/27 18:24:58 INFO cluster.YarnClientSchedulerBackend: Interrupting 
monitor thread
   24/09/27 18:24:58 INFO cluster.YarnClientSchedulerBackend: Shutting down all 
executors
   24/09/27 18:24:58 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: 
Asking each executor to shut down
   24/09/27 18:24:58 INFO cluster.YarnClientSchedulerBackend: YARN client 
scheduler backend Stopped
   24/09/27 18:24:58 INFO spark.MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
   24/09/27 18:24:58 WARN nio.NioEventLoop: Selector.select() returned 
prematurely 512 times in a row; rebuilding Selector 
io.netty.channel.nio.SelectedSelectionKeySetSelector@2d8ab97f.
   24/09/27 18:24:58 INFO nio.NioEventLoop: Migrated 37 channel(s) to the new 
Selector.
   24/09/27 18:24:58 WARN netty.Dispatcher: Message 
RemoteProcessDisconnected(xx.xx.xx.xx:50326) dropped. Could not find 
BlockManagerEndpoint1.
   24/09/27 18:24:58 INFO memory.MemoryStore: MemoryStore cleared
   24/09/27 18:24:58 INFO storage.BlockManager: BlockManager stopped
   24/09/27 18:24:58 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
   24/09/27 18:24:58 INFO 
scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
   24/09/27 18:24:58 INFO spark.SparkContext: Successfully stopped SparkContext
   24/09/27 18:24:58 INFO util.ShutdownHookManager: Shutdown hook called
   24/09/27 18:24:58 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-6d257678-0e94-43b4-9441-a5968d70417a/pyspark-ed116a34-04e9-4884-8f45-22572d7b2532
   24/09/27 18:24:58 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-6d257678-0e94-43b4-9441-a5968d70417a
   24/09/27 18:24:58 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-c472a44a-a82f-4498-b7df-1357b78b61aa
   24/09/27 18:24:58 INFO util.ShutdownHookManager: Deleting directory 
/tmp/localPyFiles-199d2db9-808d-489f-a4f8-23334eaeed93
   ```
   
![11](https://github.com/user-attachments/assets/96ee7288-2101-4eae-99a7-fc9f3719fd1b)
   
![22](https://github.com/user-attachments/assets/ea586363-4f0a-415c-bbeb-f83a318b26e3)
   
   
   

