FelixKJose opened a new issue #4719:
URL: https://github.com/apache/hudi/issues/4719
I am doing some volume testing of my data ingestion pipeline. The pipeline comprises Kafka for streaming and Spark Structured Streaming (continuous mode) + Hudi for ingesting data from Kafka into AWS S3. The Spark job runs on AWS EMR 6.5.0. The Hudi table is non-partitioned and the metadata table is enabled. The Spark job keeps failing with a HoodieMetadataException after running for a certain amount of time.
Note: the Hudi table is MOR; Hive sync is enabled and the external Hive metastore is MySQL.
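A rough sketch of how the pipeline is wired is below (simplified; the topic, broker, schema, and path names are placeholders, not the real values used by the job, which lives in an internal framework as seen in the stacktrace):

```python
# Simplified wiring sketch -- topic, brokers, schema, and paths are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("kafka-to-hudi-ingest").getOrCreate()

event_schema = StructType([
    StructField("originatingSystemIdentifier_value", StringType()),
    StructField("eventDateTime", StringType()),
])

# Stream records from the Kafka topic and parse the JSON payload.
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker-1:9092")   # placeholder
    .option("subscribe", "patient-events")                 # placeholder
    .load()
    .select(from_json(col("value").cast("string"), event_schema).alias("e"))
    .select("e.*")
)

def write_batch(batch_df, batch_id):
    # Each micro-batch is upserted into the non-partitioned Hudi MOR table;
    # the actual write call, using the option dict listed below, is sketched
    # after the config section.
    ...

(events.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "s3://<bucket>/checkpoints/patient_flfinal/")  # placeholder
    .start()
    .awaitTermination())
```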
Exception stacktrace: see the **Stacktrace** section below.
My hoodie config:
```
{
  "hoodie.table.name": "patient_flfinal",
  "hoodie.datasource.write.table.type": "MERGE_ON_READ",
  "hoodie.datasource.write.operation": "upsert",
  "hoodie.datasource.write.recordkey.field": "originatingSystemIdentifier_value",
  "hoodie.datasource.write.precombine.field": "eventDateTime",
  "hoodie.payload.ordering.field": "eventDateTime",
  "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
  "hoodie.datasource.write.hive_style_partitioning": "true",
  "hoodie.datasource.hive_sync.enable": "true",
  "hoodie.datasource.hive_sync.table": "patient_flfinal",
  "hoodie.datasource.write.streaming.retry.count": 3,
  "hoodie.datasource.write.streaming.retry.interval.ms": 2000,
  "hoodie.datasource.write.streaming.ignore.failed.batch": "false",
  "hoodie.metadata.enable": "true",
  "hoodie.upsert.shuffle.parallelism": 12,
  "hoodie.insert.shuffle.parallelism": 12,
  "hoodie.consistency.check.enabled": "false",
  "hoodie.index.type": "BLOOM",
  "hoodie.bloom.index.filter.type": "DYNAMIC_V0",
  "hoodie.index.bloom.num_entries": 60000,
  "hoodie.index.bloom.fpp": 1e-09,
  "hoodie.parquet.max.file.size": "134217728",
  "hoodie.parquet.block.size": "134217728",
  "hoodie.parquet.page.size": "1048576",
  "hoodie.cleaner.commits.retained": 1,
  "hoodie.keep.min.commits": 2,
  "hoodie.compact.inline": "true",
  "hoodie.compact.inline.max.delta.commits": 10
}
```
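Assuming the options above are held in a Python dict (call it `hoodie_config`), the write inside the `write_batch` handler sketched earlier looks roughly like this:

```python
def write_batch(batch_df, batch_id):
    # Upsert the micro-batch into the non-partitioned MOR table on S3,
    # applying the Hudi options listed above ("hoodie_config" is that dict;
    # the bucket name is a placeholder).
    (batch_df.write.format("hudi")
        .options(**hoodie_config)
        .mode("append")
        .save("s3://<bucket>/patient_flfinal/"))
```

Per the stacktrace below, the failure is raised during exactly this upsert: with `hoodie.metadata.enable` set to `true`, the bloom-index lookup lists partition files through the metadata table, and that listing is what throws the HoodieMetadataException.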
**Expected behavior**
The job runs continuously and ingests data from the configured Kafka topic into the
non-partitioned Hudi MOR table in near real time.
**Environment Description**
* Hudi version : 0.9.0-amzn-1
* Spark version : 3.1.2 (EMR 6.5.0)
* Hive version : 3.1.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
* Running on AWS EMR? (yes/no) : Yes
* EMR Version: 6.5.0
**Additional context**
**Stacktrace**
```
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","id":"72800a7d-3043-4b0f-aeea-ff2fbdb626fe","runId":"158a904e-f9ba-4596-9ce9-a10339790e29","exception":"py4j.Py4JException:
An exception was raised by the Python Proxy. Return Message: Traceback (most
recent call last):\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py\",
line 2442, in _call_proxy\n return_value = getattr(self.pool[obj_id],
method)(*params)\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/utils.py\",
line 196, in call\n raise e\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/utils.py\",
line 193, in call\n self.func(DataFrame(jdf, self.sql_ctx), batch_id)\n
File \"/mnt3/yarn/use
rcache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/sparketlframework.zip/sparketlframework/streaming_etl.py\",
line 109, in <lambda>\n output_df,\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/sparketlframework.zip/sparketlframework/streaming_etl.py\",
line 135, in _batch_write\n super()._save(sink_config,
output_partition_cols, output_df)\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/sparketlframework.zip/sparketlframework/etl.py\",
line 196, in _save\n .save(sink_config[\"output_path\"])\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/readwriter.py\",
line 1109, in save\n self._jwrite.save(path)\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_00
10_02_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in
__call__\n answer, self.gateway_client, self.target_id, self.name)\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/pyspark.zip/pyspark/sql/utils.py\",
line 111, in deco\n return f(*a, **kw)\n File
\"/mnt3/yarn/usercache/hadoop/appcache/application_1643390814970_0010/container_1643390814970_0010_02_000001/py4j-0.10.9-src.zip/py4j/protocol.py\",
line 328, in get_return_value\n format(target_id, \".\", name),
value)\npy4j.protocol.Py4JJavaError: An error occurred while calling
o1190.save.\n: org.apache.hudi.exception.HoodieUpsertException: Failed to
upsert for commit time 20220129135141\n\tat
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)\n\tat
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)\n\tat
org.apache.hudi
.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:82)\n\tat
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:74)\n\tat
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)\n\tat
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)\n\tat
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:265)\n\tat
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)\n\tat
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(Spark
Plan.scala:194)\n\tat
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)\n\tat
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)\n\tat
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)\n\tat
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)\n\tat
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)\n\tat
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.sca
la:135)\n\tat
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)\n
\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat
java.lang.reflect.Method.invoke(Method.java:498)\n\tat
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat
py4j.Gateway.invoke(Gateway.java:282)\n\tat
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat
py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat
py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat
java.lang.Thread.run(Thread.java:750)\nCaused by:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID
40) (ip-172-23-10-119.ec2.internal executor 2):
org.apache.hudi.exception.HoodieMetadataException: Failed to
retrieve files in partition
s3://cf-s3-649a93ec-2b5f-42ff-9f5e-a64b2035/patient_flfinal/data from
metadata\n\tat
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:128)\n\tat
org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:66)\n\tat
org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:292)\n\tat
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)\n\tat
org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:281)\n\tat
org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:467)\n\tat
org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:101)\n\tat
org.apache.hudi.common.table.view.PriorityBasedFi
leSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:134)\n\tat
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:54)\n\tat
org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:74)\n\tat
org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78)\n\tat
org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)\n\tat
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)\n\tat
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)\n\tat
scala.collection.Iterator.foreach(Iterator.scala:941)\n\tat
scala.collection.Iterator.foreach$(Iterator.scala:941)\n\tat
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)\n\tat
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)\n\tat
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)\n\tat sca
la.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)\n\tat
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)\n\tat
scala.collection.TraversableOnce.to(TraversableOnce.scala:315)\n\tat
scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)\n\tat
scala.collection.AbstractIterator.to(Iterator.scala:1429)\n\tat
scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)\n\tat
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)\n\tat
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)\n\tat
scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)\n\tat
scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)\n\tat
scala.collection.AbstractIterator.toArray(Iterator.scala:1429)\n\tat
org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)\n\tat
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2281)\n\tat
org.apache.spark.scheduler.ResultTask.runTask(ResultTa
sk.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:750)\nCaused by:
java.lang.IllegalArgumentException: must be at-least one valid metadata file
slice\n\tat
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)\n\tat
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:197)\n\tat
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeededOrThrow(HoodieBackedTableMetadata.java:177)\n\tat
org.apache.hudi.metadata.HoodieBackedTableMetadata.g
etRecordByKeyFromMetadata(HoodieBackedTableMetadata.java:129)\n\tat
org.apache.hudi.metadata.BaseTableMetadata.getMergedRecordByKey(BaseTableMetadata.java:280)\n\tat
org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:217)\n\tat
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:126)\n\t...
39 more\n\nDriver stacktrace:\n\tat
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)\n\tat
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)\n\tat
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)\n\tat
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n\tat
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n\tat
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n\tat
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.sca
la:2418)\n\tat
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)\n\tat
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)\n\tat
scala.Option.foreach(Option.scala:407)\n\tat
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)\n\tat
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)\n\tat
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)\n\tat
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)\n\tat
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)\n\tat
org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)\n\tat
org.apache.spark.SparkContext.runJob(SparkContext.scala:2262)\n\tat
org.apache.spark.SparkContext.runJob(SparkContext.scala:22
81)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2306)\n\tat
org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)\n\tat
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat
org.apache.spark.rdd.RDD.withScope(RDD.scala:414)\n\tat
org.apache.spark.rdd.RDD.collect(RDD.scala:1029)\n\tat
org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)\n\tat
org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)\n\tat
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)\n\tat
org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:78)\n\tat
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions(HoodieIndexUtils.java:72)\n\tat
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.loadInvolvedFiles(SparkHoodieBloomIndex.java:169)\n\tat
org.apache.hudi.index.bl
oom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:119)\n\tat
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)\n\tat
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)\n\tat
org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)\n\tat
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)\n\t...
45 more\nCaused by: org.apache.hudi.exception.HoodieMetadataException: Failed
to retrieve files in partition
s3://cf-s3-649a93ec-2b5f-42ff-9f5e-a64b20315/patient_flfinal/data from
metadata\n\tat
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:128)\n\tat
org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:66)\n\tat
org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSyst
emView.java:292)\n\tat
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)\n\tat
org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:281)\n\tat
org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:467)\n\tat
org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:101)\n\tat
org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:134)\n\tat
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:54)\n\tat
org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:74)\n\tat
org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78)\n\tat
org.apache.spark
.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)\n\tat
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)\n\tat
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)\n\tat
scala.collection.Iterator.foreach(Iterator.scala:941)\n\tat
scala.collection.Iterator.foreach$(Iterator.scala:941)\n\tat
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)\n\tat
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)\n\tat
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)\n\tat
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)\n\tat
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)\n\tat
scala.collection.TraversableOnce.to(TraversableOnce.scala:315)\n\tat
scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)\n\tat
scala.collection.AbstractIterator.to(Iterator.scala:1429)\n\tat
scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)\n\tat
scala.collectio
n.TraversableOnce.toBuffer$(TraversableOnce.scala:307)\n\tat
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)\n\tat
scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)\n\tat
scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)\n\tat
scala.collection.AbstractIterator.toArray(Iterator.scala:1429)\n\tat
org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)\n\tat
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2281)\n\tat
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat
org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:624)\n\t... 1 more\nCaused by:
java.lang.IllegalArgumentException: must be at-least one valid metadata file
slice\n\tat
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)\n\tat
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:197)\n\tat
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeededOrThrow(HoodieBackedTableMetadata.java:177)\n\tat
org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKeyFromMetadata(HoodieBackedTableMetadata.java:129)\n\tat
org.apache.hudi.metadata.BaseTableMetadata.getMergedRecordByKey(BaseTableMetadata.java:280)\n\tat
org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:217)\n\tat
org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:126)\n\t...
39 more\n\n\n\tat py4j.Protocol.getReturnValue(Protocol.java:476)\n\tat
py4j.reflection.PythonProxyHandler.inv
oke(PythonProxyHandler.java:108)\n\tat com.sun.proxy.$Proxy33.call(Unknown
Source)\n\tat
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)\n\tat
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)\n\tat
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)\n\tat
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)\n\tat
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat
org.apache.spark.
sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)\n\tat
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)\n\tat
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)\n\tat
org.apache.spa
rk.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)\n\tat
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)\n\tat
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)\n\tat
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)\n\tat
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)\n\tat
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)\n\tat
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)\n\tat
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)\n\tat
org.apache.spark.sql.execution.streaming
.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)\n\tat
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)\n\tat
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)\n\tat
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)\n"}```