yhyyz opened a new issue, #6900:
URL: https://github.com/apache/hudi/issues/6900
#### Environment
Hudi 0.11.0
Spark 3.2.1
#### Job Info
Hudi table type: MOR
A Spark Structured Streaming job uses foreachBatch, with multiple tasks writing to different Hudi
tables. An error occurs after the job has been running for a few hours.
#### code
```scala
// Entry point: consumes Debezium CDC records from Kafka via Structured Streaming
// and, per micro-batch, routes them to one Hudi table per entry in the configured
// table list, optionally writing tables concurrently on a thread pool.
object Debezium2Hudi {
// Wrapper matching the JSON shape of the --tableInfoJson argument.
case class TableInfoList(tableInfo: List[TableInfo])
private val log = LoggerFactory.getLogger("debezium2hudi")
def main(args: Array[String]): Unit = {
log.info(args.mkString)
// Set log4j level to warn
Logger.getLogger("org").setLevel(Level.WARN)
// System.setProperty("HADOOP_USER_NAME", "hadoop")
// Parse CLI args and deserialize the per-table routing configuration.
val params = Config.parseConfig(Debezium2Hudi, args)
val tableInfoList = JsonUtil.mapper.readValue(params.tableInfoJson,
classOf[TableInfoList])
// init spark session
val ss = SparkHelper.getSparkSession(params.env)
import ss.implicits._
// Kafka source: offsets are tracked by the streaming checkpoint; failOnDataLoss
// is disabled, so expired/deleted offsets are skipped silently.
val df = ss
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", params.brokerList)
.option("subscribe", params.sourceTopic)
.option("startingOffsets", params.startPos)
.option("failOnDataLoss", false)
.option("maxOffsetsPerTrigger",params.maxOffset.toLong)
.option("kafka.consumer.commit.groupid", params.consumerGroup)
.load()
.repartition(Integer.valueOf(params.partitionNum))
// Listener only logs query lifecycle; progress is logged at WARN so it
// survives the org.* = WARN log level set above.
ss.streams.addListener(new StreamingQueryListener {
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = log.debug(s"QueryStarted [id
= ${event.id}, name = ${event.name}, runId = ${event.runId}]")
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = log.warn(s"QueryProgress
${event.progress}")
override def onQueryTerminated(event:
StreamingQueryListener.QueryTerminatedEvent): Unit =
log.debug(s"QueryTerminated [id = ${event.id}, runId = ${event.runId}, error =
${event.exception}]")
})
val listener = new KafkaOffsetCommitterListener()
ss.streams.addListener(listener)
// Fixed pool of 50 threads backing the implicit ExecutionContext used by
// HudiWriteTask.run below; the pool is never shut down here.
val pool = Executors.newFixedThreadPool(50)
implicit val xc: ExecutionContextExecutor =
ExecutionContext.fromExecutor(pool)
// Maps an ISO-8601-like timestamp string (yyyy-MM-dd'T'HH:mm:ss'Z') to a
// yyyyMM partition value. NOTE(review): DateTimeFormatter.parse returns a raw
// TemporalAccessor; this throws at runtime if the input deviates from the
// pattern — presumably upstream guarantees the format, verify against the CDC
// payload.
val partitionFormat: (String => String) = (arg: String) => {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
val parFormatter = DateTimeFormatter.ofPattern("yyyyMM")
parFormatter.format(formatter.parse(arg))
}
val sqlPartitionFunc = udf(partitionFormat)
val ds = df.selectExpr("CAST(value AS STRING)").as[String]
val query = ds
.writeStream
.queryName("debezium2hudi")
.option("checkpointLocation", params.checkpointDir)
// if set 0, as fast as possible
.trigger(Trigger.ProcessingTime(params.trigger + " seconds"))
.foreachBatch { (batchDF: Dataset[String], batchId: Long) =>
log.warn("current batch: "+batchId.toString)
// Parse each raw Kafka value into the internal CDC representation;
// unparseable records come back null and are dropped.
val newsDF = batchDF.map(cdc =>
DebeziumParser.apply().debezium2Hudi(cdc))
.filter(_ != null)
// NOTE(review): each isEmpty call on an un-cached Dataset triggers a
// separate Spark job, so the batch is re-computed several times below.
if (!newsDF.isEmpty) {
// BUG(review): `tasks` is an immutable Seq; the `tasks :+ runTask`
// below returns a NEW Seq that is discarded, so `tasks` stays empty
// forever. Use a mutable buffer (or collect the Futures functionally)
// so Future.sequence actually sees the write tasks.
val tasks = Seq[Future[Unit]]()
for (tableInfo <- tableInfoList.tableInfo) {
// Route rows to this table and keep only INSERT/UPSERT operations.
val insertORUpsertDF = newsDF
.filter($"database" === tableInfo.database && $"table" ===
tableInfo.table)
.filter($"operationType" === HudiOP.UPSERT || $"operationType"
=== HudiOP.INSERT)
.select($"data".as("jsonData"))
if (!insertORUpsertDF.isEmpty) {
// Infer the payload schema per batch from the JSON strings, then
// explode the payload columns and add the Hudi partition column.
val json_schema =
ss.read.json(insertORUpsertDF.select("jsonData").as[String]).schema
val cdcDF = insertORUpsertDF.select(from_json($"jsonData",
json_schema).as("cdc_data"))
val cdcPartitionDF = cdcDF.select($"cdc_data.*")
.withColumn(tableInfo.hudiPartitionField,
sqlPartitionFunc(col(tableInfo.partitionTimeColumn)))
params.concurrent match {
case "true" => {
// Kicks off the async Hudi write immediately (Futures are
// eager) on the 50-thread pool declared above.
val runTask = HudiWriteTask.run(cdcPartitionDF, params,
tableInfo)(xc)
// BUG(review): result of `:+` is discarded (see note on the
// `tasks` declaration) — nothing is ever appended.
tasks :+ runTask
}
case _ => ....
}
}
}
// Because `tasks` is always empty (see above), this Await returns
// immediately and the micro-batch "completes" while async writes are
// still in flight. The next batch can then start a second concurrent
// writer on the same Hudi table — NOTE(review): this looks like the
// cause of the MARKERS FileNotFoundException in the attached log
// (one writer's marker cleanup races another writer's marker read);
// confirm against Hudi's multi-writer/marker documentation.
if (params.concurrent == "true" && tasks.nonEmpty) {
Await.result(Future.sequence(tasks), Duration(60, MINUTES))
()
}
}
}.start()
query.awaitTermination()
}
}
```
#### ERROR
```
22/10/09 08:03:12 INFO ShuffleBlockFetcherIterator: Started 8 remote fetches
in 0 ms
22/10/09 08:03:12 INFO Executor: Finished task 0.0 in stage 25180.0 (TID
172844). 2512 bytes result sent to driver
22/10/09 08:03:12 ERROR Executor: Exception in task 0.0 in stage 25178.0
(TID 172843)
org.apache.hudi.exception.HoodieIOException: Failed to read MARKERS file
s3://app-util/hudi-bloom/opti-tmp-102/cdc_test_db/dhdata_02/.hoodie/.temp/20221009080301290/MARKERS0
at
org.apache.hudi.common.util.MarkerUtils.readMarkersFromFile(MarkerUtils.java:210)
at
org.apache.hudi.common.util.MarkerUtils.lambda$readTimelineServerBasedMarkersFromFileSystem$141c8e72$1(MarkerUtils.java:185)
at
org.apache.hudi.common.fs.FSUtils.lambda$parallelizeFilesProcess$1f9929d5$1(FSUtils.java:736)
at
org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$mapToPair$786cea6a$1(HoodieSparkEngineContext.java:149)
at
org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at scala.collection.AbstractIterator.to(Iterator.scala:1431)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
at
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2255)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.FileNotFoundException: No such file or directory
's3://app-util/hudi-bloom/opti-tmp-102/cdc_test_db/dhdata_02/.hoodie/.temp/20221009080301290/MARKERS0'
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:932)
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:924)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
at
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:194)
at
org.apache.hudi.common.util.MarkerUtils.readMarkersFromFile(MarkerUtils.java:207)
... 31 more
22/10/09 08:03:13 INFO YarnCoarseGrainedExecutorBackend: Got assigned task
172858
22/10/09 08:03:13 INFO Executor: Running task 6.0 in stage 25185.0 (TID
172858)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]