FelixKJose opened a new issue #4719:
URL: https://github.com/apache/hudi/issues/4719


   I am volume testing my data ingestion pipeline. The pipeline consists of 
Kafka for streaming and Spark Structured Streaming (continuous mode) + Hudi for 
ingesting data from Kafka into AWS S3. The Spark job runs on AWS EMR 6.5.0. The 
Hudi table is non-partitioned and the metadata table is enabled. The Spark job 
keeps failing with a HoodieMetadataException after it has been running for a 
certain time. 
   
   Note: The Hudi table is MOR; Hive sync is enabled, and the external Hive 
metastore is MySQL.
   
   Exception Stacktrace:
   
   
   
   My hoodie config:
   
   >  {
         "hoodie.table.name": "patient_flfinal",
         "hoodie.datasource.write.table.type": "MERGE_ON_READ",
         "hoodie.datasource.write.operation": "upsert",
         "hoodie.datasource.write.recordkey.field": "originatingSystemIdentifier_value",
         "hoodie.datasource.write.precombine.field": "eventDateTime",
         "hoodie.payload.ordering.field": "eventDateTime",
         "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
         "hoodie.datasource.write.hive_style_partitioning": "true",
         "hoodie.datasource.hive_sync.enable": "true",
         "hoodie.datasource.hive_sync.table": "patient_flfinal",
         "hoodie.datasource.write.streaming.retry.count": 3,
         "hoodie.datasource.write.streaming.retry.interval.ms": 2000,
         "hoodie.datasource.write.streaming.ignore.failed.batch": "false",
         "hoodie.metadata.enable": "true",
         "hoodie.upsert.shuffle.parallelism": 12,
         "hoodie.insert.shuffle.parallelism": 12,
         "hoodie.consistency.check.enabled": "false",
         "hoodie.index.type": "BLOOM",
         "hoodie.bloom.index.filter.type": "DYNAMIC_V0",
         "hoodie.index.bloom.num_entries": 60000,
         "hoodie.index.bloom.fpp": 1e-09,
         "hoodie.parquet.max.file.size": "134217728",
         "hoodie.parquet.block.size": "134217728",
         "hoodie.parquet.page.size": "1048576",
         "hoodie.cleaner.commits.retained": 1,
         "hoodie.keep.min.commits": 2,
         "hoodie.compact.inline": "true",
         "hoodie.compact.inline.max.delta.commits": 10
       }
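
   For context, the stack trace in this report shows each batch being written from a `foreachBatch` callback that ends in `.save(sink_config["output_path"])`. A minimal sketch of how a config like the one above might be handed to the Hudi writer follows. The helper names `to_writer_options` and `write_hudi_batch` and their parameters are hypothetical, and the `append` save mode is an assumption; this is not the actual framework code behind the trace.

```python
from typing import Any, Mapping


def to_writer_options(hudi_config: Mapping[str, Any]) -> dict:
    """Stringify config values before passing them as Spark writer options."""
    options = {}
    for key, value in hudi_config.items():
        # Booleans become "true"/"false"; numbers keep their Python string form.
        options[key] = str(value).lower() if isinstance(value, bool) else str(value)
    return options


def write_hudi_batch(batch_df: Any, batch_id: int,
                     hudi_config: Mapping[str, Any], output_path: str) -> None:
    """Append one micro-batch to the Hudi table (hypothetical helper)."""
    (
        batch_df.write.format("hudi")
        .options(**to_writer_options(hudi_config))
        .mode("append")  # assumed save mode for a streaming upsert
        .save(output_path)
    )
```

   With a streaming DataFrame `stream_df` (also a placeholder name), this could be wired up as `stream_df.writeStream.foreachBatch(lambda df, batch_id: write_hudi_batch(df, batch_id, hudi_config, output_path)).start()`.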
   
   
   **Expected behavior**
   The job runs continuously and ingests data from the configured Kafka topic 
into the non-partitioned Hudi MOR table in near real time.
   
   **Environment Description**
   
   * Hudi version : 0.9.0-amzn-1
   
   * Spark version :
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   * Running on AWS EMR? (yes/no) : Yes
   * EMR Version: 6.5.0
   
   **Additional context**
   
   
   **Stacktrace**
   
   ``` 
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","id":"72800a7d-3043-4b0f-aeea-ff2fbdb626fe","runId":"158a904e-f9ba-4596-9ce9-a10339790e29","exception":"py4j.Py4JException:
 An exception was raised by the Python Proxy. Return Message: Traceback (most 
recent call last):\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py\",
 line 2442, in _call_proxy\n    return_value = getattr(self.pool[obj_id], 
method)(*params)\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/utils.py\",
 line 196, in call\n    raise e\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/utils.py\",
 line 193, in call\n    self.func(DataFrame(jdf, self.sql_ctx), batch_id)\n  
File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/sparketlframework.zip/sparketlframework/streaming_etl.py\",
 line 109, in <lambda>\n    output_df,\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/sparketlframework.zip/sparketlframework/streaming_etl.py\",
 line 135, in _batch_write\n    super()._save(sink_config, 
output_partition_cols, output_df)\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/sparketlframework.zip/sparketlframework/etl.py\",
 line 196, in _save\n    .save(sink_config[\"output_path\"])\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/readwriter.py\",
 line 1109, in save\n    self._jwrite.save(path)\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py\",
 line 1305, in 
__call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/utils.py\",
 line 111, in deco\n    return f(*a, **kw)\n  File 
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/py4j-0.10.9-src.zip/py4j/protocol.py\",
 line 328, in get_return_value\n    format(target_id, \".\", name), 
value)\npy4j.protocol.Py4JJavaError: An error occurred while calling 
o1190.save.\n: org.apache.hudi.exception.HoodieUpsertException: Failed to 
upsert for commit time 20220129135141\n\tat 
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)\n\tat
 
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)\n\tat
 org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:82)\n\tat
 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:74)\n\tat
 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)\n\tat
 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)\n\tat
 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:265)\n\tat
 org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)\n\tat 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat
 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat
 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat
 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat
 org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)\n\tat 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)\n\tat
 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat
 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)\n\tat
 org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)\n\tat 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)\n\tat
 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)\n\tat
 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat
 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat
 org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat
 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat
 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat
 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat
 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)\n\tat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat
 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat
 java.lang.reflect.Method.invoke(Method.java:498)\n\tat 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat 
py4j.Gateway.invoke(Gateway.java:282)\n\tat 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat 
py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat 
py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat 
java.lang.Thread.run(Thread.java:750)\nCaused by: 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 
40) (ip-172-23-10-119.ec2.internal executor 2): 
org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition 
s3://cf-s3-649a93ec-2b5f-42ff-9f5e-a64b2035/patient_flfinal/data from 
metadata\n\tat 
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:128)\n\tat
 
org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:66)\n\tat
 
org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:292)\n\tat
 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)\n\tat
 
org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:281)\n\tat
 
org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:467)\n\tat
 
org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:101)\n\tat
 org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:134)\n\tat
 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:54)\n\tat
 
org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:74)\n\tat
 
org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78)\n\tat
 
org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)\n\tat
 scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)\n\tat 
scala.collection.Iterator.foreach(Iterator.scala:941)\n\tat 
scala.collection.Iterator.foreach$(Iterator.scala:941)\n\tat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)\n\tat 
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)\n\tat 
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)\n\tat 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)\n\tat 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)\n\tat 
scala.collection.TraversableOnce.to(TraversableOnce.scala:315)\n\tat 
scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)\n\tat 
scala.collection.AbstractIterator.to(Iterator.scala:1429)\n\tat 
scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)\n\tat 
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)\n\tat 
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)\n\tat 
scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)\n\tat 
scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)\n\tat 
scala.collection.AbstractIterator.toArray(Iterator.scala:1429)\n\tat 
org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)\n\tat 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2281)\n\tat 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat 
org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat
 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:750)\nCaused by: 
java.lang.IllegalArgumentException: must be at-least one valid metadata file 
slice\n\tat 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)\n\tat
 
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:197)\n\tat
 
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeededOrThrow(HoodieBackedTableMetadata.java:177)\n\tat
 org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKeyFromMetadata(HoodieBackedTableMetadata.java:129)\n\tat 
org.apache.hudi.metadata.BaseTableMetadata.getMergedRecordByKey(BaseTableMetadata.java:280)\n\tat
 
org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:217)\n\tat
 
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:126)\n\t...
 39 more\n\nDriver stacktrace:\n\tat 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)\n\tat
 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)\n\tat
 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)\n\tat
 scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n\tat 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n\tat 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n\tat 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)\n\tat 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)\n\tat
 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)\n\tat
 scala.Option.foreach(Option.scala:407)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)\n\tat
 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)\n\tat
 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)\n\tat
 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)\n\tat
 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2262)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2281)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2306)\n\tat 
org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)\n\tat 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat
 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat
 org.apache.spark.rdd.RDD.withScope(RDD.scala:414)\n\tat 
org.apache.spark.rdd.RDD.collect(RDD.scala:1029)\n\tat 
org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)\n\tat 
org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)\n\tat 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)\n\tat
 
org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:78)\n\tat
 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions(HoodieIndexUtils.java:72)\n\tat
 
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.loadInvolvedFiles(SparkHoodieBloomIndex.java:169)\n\tat
 org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:119)\n\tat 
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)\n\tat
 
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)\n\tat
 
org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)\n\tat
 
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)\n\t...
 45 more\nCaused by: org.apache.hudi.exception.HoodieMetadataException: Failed 
to retrieve files in partition 
s3://cf-s3-649a93ec-2b5f-42ff-9f5e-a64b20315/patient_flfinal/data from 
metadata\n\tat 
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:128)\n\tat
 
org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:66)\n\tat
 
org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:292)\n\tat 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)\n\tat
 
org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:281)\n\tat
 
org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:467)\n\tat
 
org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:101)\n\tat
 
org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:134)\n\tat
 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:54)\n\tat
 
org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:74)\n\tat
 
org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78)\n\tat
 org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)\n\tat 
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)\n\tat 
scala.collection.Iterator.foreach(Iterator.scala:941)\n\tat 
scala.collection.Iterator.foreach$(Iterator.scala:941)\n\tat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)\n\tat 
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)\n\tat 
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)\n\tat 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)\n\tat 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)\n\tat 
scala.collection.TraversableOnce.to(TraversableOnce.scala:315)\n\tat 
scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)\n\tat 
scala.collection.AbstractIterator.to(Iterator.scala:1429)\n\tat 
scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)\n\tat 
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)\n\tat 
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)\n\tat 
scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)\n\tat 
scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)\n\tat 
scala.collection.AbstractIterator.toArray(Iterator.scala:1429)\n\tat 
org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)\n\tat 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2281)\n\tat 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat 
org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat
 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t... 1 more\nCaused by: 
java.lang.IllegalArgumentException: must be at-least one valid metadata file 
slice\n\tat 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)\n\tat
 
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:197)\n\tat
 
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeededOrThrow(HoodieBackedTableMetadata.java:177)\n\tat
 
org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKeyFromMetadata(HoodieBackedTableMetadata.java:129)\n\tat
 
org.apache.hudi.metadata.BaseTableMetadata.getMergedRecordByKey(BaseTableMetadata.java:280)\n\tat
 
org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:217)\n\tat
 
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:126)\n\t...
 39 more\n\n\n\tat py4j.Protocol.getReturnValue(Protocol.java:476)\n\tat 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)\n\tat 
com.sun.proxy.$Proxy33.call(Unknown 
Source)\n\tat 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)\n\tat
 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)\n\tat
 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)\n\tat
 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)\n\tat
 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat
 org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat
 org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat
 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)\n\tat
 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)\n\tat
 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)\n\tat
 org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)\n\tat
 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)\n\tat
 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)\n\tat
 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)\n\tat
 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)\n\tat
 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)\n\tat
 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)\n\tat
 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)\n\tat
 org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)\n\tat 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)\n\tat
 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)\n\tat
 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)\n"}
   ```
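
   The event above nests several `Caused by:` sections, and the innermost one in this trace is `java.lang.IllegalArgumentException: must be at-least one valid metadata file slice`. As a reading aid, here is a small sketch that pulls the distinct `Caused by:` messages out of such an event. It assumes the event is available as one unwrapped JSON string with an `exception` field, as in the listener output above; the function name `cause_chain` is hypothetical.

```python
import json


def cause_chain(event_json: str) -> list:
    """Return the distinct 'Caused by:' messages of an event, outermost first."""
    # strict=False tolerates raw control characters inside the JSON string.
    exception_text = json.loads(event_json, strict=False)["exception"]
    causes = []
    for line in exception_text.splitlines():
        line = line.strip()
        if line.startswith("Caused by: "):
            cause = line[len("Caused by: "):]
            if cause not in causes:  # skip causes already seen earlier in the trace
                causes.append(cause)
    return causes
```

   The last element of the returned list is the innermost cause, which is usually the most useful line to search for in existing issues.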
   
   



