yhyyz opened a new issue, #6900:
URL: https://github.com/apache/hudi/issues/6900

   #### Environment
   Hudi 0.11.0
   Spark 3.2.1
   
   #### Job Info
   Hudi table type: MOR
   Spark Structured Streaming uses foreachBatch to write to multiple Hudi tables concurrently, one task per table. The error occurs after the job has been running for a few hours.
   
   #### Code
   ```scala
   // Imports added for completeness. Config, SparkHelper, JsonUtil, TableInfo,
   // DebeziumParser, HudiOP, HudiWriteTask and KafkaOffsetCommitterListener are
   // project-local classes defined elsewhere in the job.
   import java.time.format.DateTimeFormatter
   import java.util.concurrent.Executors

   import org.apache.log4j.{Level, Logger}
   import org.apache.spark.sql.Dataset
   import org.apache.spark.sql.functions.{col, from_json, udf}
   import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
   import org.slf4j.LoggerFactory

   import scala.concurrent.duration._
   import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}

   object Debezium2Hudi {
   
     case class TableInfoList(tableInfo: List[TableInfo])
     private val log = LoggerFactory.getLogger("debezium2hudi")
     def main(args: Array[String]): Unit = {
       log.info(args.mkString)
       // Set log4j level to warn
       Logger.getLogger("org").setLevel(Level.WARN)
       //    System.setProperty("HADOOP_USER_NAME", "hadoop")
       val params = Config.parseConfig(Debezium2Hudi, args)
       val tableInfoList = JsonUtil.mapper.readValue(params.tableInfoJson, classOf[TableInfoList])
       // init spark session
       val ss = SparkHelper.getSparkSession(params.env)
       import ss.implicits._
       val df = ss
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", params.brokerList)
         .option("subscribe", params.sourceTopic)
         .option("startingOffsets", params.startPos)
         .option("failOnDataLoss", false)
         .option("maxOffsetsPerTrigger", params.maxOffset.toLong)
         .option("kafka.consumer.commit.groupid", params.consumerGroup)
         .load()
         .repartition(Integer.valueOf(params.partitionNum))
   
       ss.streams.addListener(new StreamingQueryListener {
         override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit =
           log.debug(s"QueryStarted [id = ${event.id}, name = ${event.name}, runId = ${event.runId}]")

         override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit =
           log.warn(s"QueryProgress ${event.progress}")

         override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit =
           log.debug(s"QueryTerminated [id = ${event.id}, runId = ${event.runId}, error = ${event.exception}]")
       })
   
       val listener = new KafkaOffsetCommitterListener()
       ss.streams.addListener(listener)
   
       val pool = Executors.newFixedThreadPool(50)
       implicit val xc: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)
   
       // derives the yyyyMM Hudi partition value from an ISO-8601 timestamp string
       val partitionFormat: String => String = (arg: String) => {
         val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
         val parFormatter = DateTimeFormatter.ofPattern("yyyyMM")
         parFormatter.format(formatter.parse(arg))
       }
       val sqlPartitionFunc = udf(partitionFormat)
   
       val ds = df.selectExpr("CAST(value AS STRING)").as[String]
       val query = ds
         .writeStream
         .queryName("debezium2hudi")
         .option("checkpointLocation", params.checkpointDir)
         // a value of 0 fires the trigger as fast as possible
         .trigger(Trigger.ProcessingTime(params.trigger + " seconds"))
         .foreachBatch { (batchDF: Dataset[String], batchId: Long) =>
           log.warn("current batch: " + batchId.toString)

           val newsDF = batchDF.map(cdc => DebeziumParser.apply().debezium2Hudi(cdc))
             .filter(_ != null)
           if (!newsDF.isEmpty) {
             // collect one write Future per target table so the batch can wait on all of them
             var tasks = Seq.empty[Future[Unit]]
             for (tableInfo <- tableInfoList.tableInfo) {
               val insertORUpsertDF = newsDF
                 .filter($"database" === tableInfo.database && $"table" === tableInfo.table)
                 .filter($"operationType" === HudiOP.UPSERT || $"operationType" === HudiOP.INSERT)
                 .select($"data".as("jsonData"))
               if (!insertORUpsertDF.isEmpty) {
                 // infer the payload schema from the JSON strings, then expand it into columns
                 val json_schema = ss.read.json(insertORUpsertDF.select("jsonData").as[String]).schema
                 val cdcDF = insertORUpsertDF.select(from_json($"jsonData", json_schema).as("cdc_data"))
                 val cdcPartitionDF = cdcDF.select($"cdc_data.*")
                   .withColumn(tableInfo.hudiPartitionField, sqlPartitionFunc(col(tableInfo.partitionTimeColumn)))
                 params.concurrent match {
                   case "true" =>
                     val runTask = HudiWriteTask.run(cdcPartitionDF, params, tableInfo)(xc)
                     tasks = tasks :+ runTask
                   case _ => // ... non-concurrent write path elided
                 }
               }
             }
             if (params.concurrent == "true" && tasks.nonEmpty) {
               // block the micro-batch until every per-table Hudi write finishes
               Await.result(Future.sequence(tasks), Duration(60, MINUTES))
               ()
             }
           }
         }.start()
       query.awaitTermination()
     }
   
   }
   
   ```
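
   `HudiWriteTask` is referenced above but not included in the report. As context for how the concurrent writes are driven, here is a minimal sketch of what such a task plausibly looks like, assuming it wraps a standard Hudi datasource write in a `Future` on the supplied execution context. The option keys are standard Hudi 0.11 write configs; `recordKeyField`, `precombineField` and `hudiBasePath` are placeholder names, not fields confirmed by the report.

   ```scala
   import org.apache.spark.sql.{DataFrame, SaveMode}
   import scala.concurrent.{ExecutionContext, Future}

   // Hypothetical reconstruction of the project-local HudiWriteTask.
   object HudiWriteTask {
     def run(df: DataFrame, params: Config, tableInfo: TableInfo)
            (implicit xc: ExecutionContext): Future[Unit] = Future {
       df.write
         .format("hudi")
         // MOR table, per the issue description
         .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
         .option("hoodie.table.name", tableInfo.table)
         // placeholder field names (the real TableInfo schema is not shown)
         .option("hoodie.datasource.write.recordkey.field", tableInfo.recordKeyField)
         .option("hoodie.datasource.write.precombine.field", tableInfo.precombineField)
         .option("hoodie.datasource.write.partitionpath.field", tableInfo.hudiPartitionField)
         .option("hoodie.datasource.write.operation", "upsert")
         .mode(SaveMode.Append)
         .save(s"${params.hudiBasePath}/${tableInfo.database}/${tableInfo.table}")
     }
   }
   ```

   Each call returns a `Future` immediately, and the batch then blocks on `Future.sequence(tasks)`, so all per-table writes for one micro-batch run in parallel on the 50-thread pool.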
   
   #### ERROR
   ```
   22/10/09 08:03:12 INFO ShuffleBlockFetcherIterator: Started 8 remote fetches in 0 ms
   22/10/09 08:03:12 INFO Executor: Finished task 0.0 in stage 25180.0 (TID 172844). 2512 bytes result sent to driver
   22/10/09 08:03:12 ERROR Executor: Exception in task 0.0 in stage 25178.0 (TID 172843)
   org.apache.hudi.exception.HoodieIOException: Failed to read MARKERS file s3://app-util/hudi-bloom/opti-tmp-102/cdc_test_db/dhdata_02/.hoodie/.temp/20221009080301290/MARKERS0
        at org.apache.hudi.common.util.MarkerUtils.readMarkersFromFile(MarkerUtils.java:210)
        at org.apache.hudi.common.util.MarkerUtils.lambda$readTimelineServerBasedMarkersFromFileSystem$141c8e72$1(MarkerUtils.java:185)
        at org.apache.hudi.common.fs.FSUtils.lambda$parallelizeFilesProcess$1f9929d5$1(FSUtils.java:736)
        at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$mapToPair$786cea6a$1(HoodieSparkEngineContext.java:149)
        at org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at scala.collection.AbstractIterator.to(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2255)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.io.FileNotFoundException: No such file or directory 's3://app-util/hudi-bloom/opti-tmp-102/cdc_test_db/dhdata_02/.hoodie/.temp/20221009080301290/MARKERS0'
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:932)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:924)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:194)
        at org.apache.hudi.common.util.MarkerUtils.readMarkersFromFile(MarkerUtils.java:207)
        ... 31 more
   22/10/09 08:03:13 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 172858
   22/10/09 08:03:13 INFO Executor: Running task 6.0 in stage 25185.0 (TID 172858)
   ```
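
   The stack trace shows the failure inside `MarkerUtils.readTimelineServerBasedMarkersFromFileSystem`, i.e. the writer is using timeline-server-based markers, and a `MARKERS0` file under `.hoodie/.temp/<instant>/` has disappeared between listing and reading, presumably removed by a concurrent commit or rollback. A commonly suggested mitigation, offered here as an assumption rather than a confirmed fix for this issue, is to switch the writes to direct marker files:

   ```scala
   import org.apache.spark.sql.{DataFrame, SaveMode}

   // Sketch of the mitigation: the write itself is unchanged, only the marker
   // mechanism is switched from TIMELINE_SERVER_BASED (the code path in the
   // stack trace above) to DIRECT marker files on S3. Assumed, not verified.
   def writeWithDirectMarkers(df: DataFrame, basePath: String, tableName: String): Unit =
     df.write
       .format("hudi")
       .option("hoodie.table.name", tableName)
       .option("hoodie.write.markers.type", "DIRECT") // skip the timeline-server marker lookup
       .mode(SaveMode.Append)
       .save(basePath)
   ```

   If the same table could ever be written by more than one of the concurrent futures, Hudi 0.11 additionally expects optimistic concurrency control (`hoodie.write.concurrency.mode=optimistic_concurrency_control`) with a lock provider configured.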
   