hgudladona opened a new issue, #12298:
URL: https://github.com/apache/hudi/issues/12298

   
   
   **Describe the problem you faced**
   
   Intermittent `java.util.NoSuchElementException` when writing to partitions
that are out of order and not covered by the active timeline.
   
   **To Reproduce**
   
   We have a Hudi job reading from Kafka and writing to S3, with partitions
dynamically derived from certain columns of the records in the format
`tenant=xxxxx/date=YYYYMMDD`. When the partition that new data is written into
is not in the active timeline (late-arriving data), there appears to be a
mismatch between the file group chosen in the stage "Getting small files from
partitions" and the one used in "Doing partition and writing data".
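
To make the late-arrival scenario concrete, here is a minimal sketch (plain Python, not our actual key generator) of how a hive-style partition path is derived from record columns; the field names `tenant` and `event_time_usec` are illustrative:

```python
from datetime import datetime, timezone

def partition_path(record: dict) -> str:
    """Derive the hive-style partition path tenant=.../date=YYYYMMDD
    from a record's columns (illustrative, not Hudi's KeyGenerator API)."""
    ts = datetime.fromtimestamp(record["event_time_usec"] / 1_000_000,
                                tz=timezone.utc)
    return f"tenant={record['tenant']}/date={ts:%Y%m%d}"

# Late-arriving data: an event timestamped 2024-11-18 that is ingested on
# 2024-11-19 lands in a partition the active timeline may no longer cover.
print(partition_path({"tenant": 12345, "event_time_usec": 1731900000000000}))
# -> tenant=12345/date=20241118
```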
   
   Say the file group ID `9836da68-a598-4072-a342-53dc511609f6-0` is identified
as a small file in the "Getting small files from partitions" stage and passed on
to the "Doing partition and writing data" stage to INSERT new data and create a
new base file for it. That stage then fails, and with it the streamer job, with
the exception below.
   
   However, the same operation succeeds in two situations:
   1. Upon restart of the streamer job, when the same records are retried for
INSERT.
   2. If the embedded timeline server is turned off.
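
For reference, the second workaround corresponds to disabling the embedded timeline server in the writer config (the key name is `hoodie.embed.timeline.server` in the Hudi releases we checked; confirm against your version):

```
hoodie.embed.timeline.server: false
```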
   
   **Expected behavior**
   
   We expect no mismatch between the views of the "Getting small files from
partitions" and "Doing partition and writing data" stages when writing to a
partition that is not actively tracked in the active timeline.
   
   **Environment Description**
   
   * Hudi version : 0.14.1
   
   * Spark version : 3.4.x
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes
   
   
   **Additional context**
   
   auto.offset.reset: latest
   bootstrap.servers: kafka-brokers
   group.id: hudi-ingest-some-group
   hoodie.archive.async: true
   hoodie.archive.automatic: true
   hoodie.auto.adjust.lock.configs: true
   hoodie.base.path: s3a://some-base-path
   hoodie.clean.async: true
   hoodie.cleaner.hours.retained: 36
   hoodie.cleaner.parallelism: 600
   hoodie.cleaner.policy: KEEP_LATEST_BY_HOURS
   hoodie.cleaner.policy.failed.writes: LAZY
   hoodie.clustering.async.enabled: false
   hoodie.combine.before.insert: false
   hoodie.copyonwrite.insert.auto.split: false
   hoodie.datasource.fetch.table.enable: true
   hoodie.datasource.hive_sync.database: hudi_events_v1
   hoodie.datasource.hive_sync.mode: hms
   hoodie.datasource.hive_sync.partition_extractor_class: 
org.apache.hudi.hive.MultiPartKeysValueExtractor
   hoodie.datasource.hive_sync.partition_fields: tenant,date
   hoodie.datasource.hive_sync.table: some-table
   hoodie.datasource.hive_sync.table_properties: 
projection.date.type=date|projection.date.format=yyyyMMdd|projection.date.range=19700101,99990101|projection.tenant.type=integer|projection.tenant.range=-1,8675309|projection.enabled=true
   hoodie.datasource.meta_sync.condition.sync: true
   hoodie.datasource.sync_tool.single_instance: true
   hoodie.datasource.write.hive_style_partitioning: true
   hoodie.datasource.write.keygenerator.class: 
com.some-class-prefix.KeyGenerator
   hoodie.datasource.write.operation: insert
   hoodie.datasource.write.partitionpath.field: tenant:SIMPLE,date:SIMPLE
   hoodie.datasource.write.precombine.field: event_time_usec
   hoodie.datasource.write.reconcile.schema: false
   hoodie.datasource.write.recordkey.field: resource_id
   hoodie.deltastreamer.kafka.source.maxEvents: 75000000
   hoodie.deltastreamer.schemaprovider.registry.url: 
http://schema-registry.some-suffix:8085
   hoodie.deltastreamer.source.kafka.enable.commit.offset: true
   hoodie.deltastreamer.source.kafka.topic: some-topic
   hoodie.deltastreamer.source.schema.subject: some-topic-value
   hoodie.fail.on.timeline.archiving: false
   hoodie.filesystem.view.incr.timeline.sync.enable: true
   hoodie.filesystem.view.remote.timeout.secs: 2
   hoodie.insert.shuffle.parallelism: 1600
   hoodie.memory.merge.max.size: 2147483648
   hoodie.metadata.enable: false
   hoodie.metrics.on: true
   hoodie.metrics.reporter.metricsname.prefix:
   hoodie.metrics.reporter.prefix.tablename: false
   hoodie.metrics.reporter.type: DATADOG
   hoodie.parquet.compression.codec: zstd
   hoodie.streamer.source.kafka.minPartitions: 450
   hoodie.table.name: <>
   hoodie.table.partition.fields: tenant,date
   hoodie.table.type: MERGE_ON_READ
   hoodie.write.concurrency.mode: OPTIMISTIC_CONCURRENCY_CONTROL
   hoodie.write.lock.dynamodb.billing_mode: PROVISIONED
   hoodie.write.lock.dynamodb.endpoint_url: 
https://dynamodb.us-east-2.amazonaws.com/
   hoodie.write.lock.dynamodb.partition_key: some-key
   hoodie.write.lock.dynamodb.region: us-east-2
   hoodie.write.lock.dynamodb.table: HudiLocker
   hoodie.write.lock.provider: 
org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
   hoodie.write.markers.type: DIRECT
   
   
   
   **Stacktrace**
   
   ```
   2024-11-19T16:41:53,800 ERROR [pool-34-thread-1] org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService: Shutting down delta-sync due to exception
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 302 
in stage 473.0 failed 4 times, most recent failure: Lost task 302.3 in stage 
473.0 (TID 254209) (10.41.171.188 executor 186): 
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType 
UPDATE for partition :302
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: java.util.NoSuchElementException: FileID 
9836da68-a598-4072-a342-53dc511609f6-0 of partition path 
tenant=XXXXX/date=20241118 does not exist.
        at 
org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:161)
        at 
org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:126)
        at 
org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:68)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
        at 
org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
   ```
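
The suspected mismatch can be modeled as two inconsistent views of the same partition: the driver-side view that plans small-file assignment, and the executor-side view (backed by the embedded timeline server) that the merge handle consults. The sketch below is plain Python with invented names, not Hudi's actual API; it only illustrates how a file group handed over by the planner can be "missing" when resolved against a stale view, producing the same error shape as `HoodieMergeHandle.getLatestBaseFile`:

```python
class FileSystemView:
    """Toy stand-in for a partition -> file-group-IDs view (illustrative)."""
    def __init__(self, file_groups_by_partition):
        self.view = file_groups_by_partition

    def get_small_files(self, partition):
        # Planner side: pick candidate small-file file groups.
        return self.view.get(partition, [])

    def get_latest_base_file(self, partition, file_id):
        # Executor side: resolve the file group, failing loudly if absent,
        # analogous to the NoSuchElementException in the stacktrace above.
        if file_id not in self.view.get(partition, []):
            raise LookupError(f"FileID {file_id} of partition path "
                              f"{partition} does not exist.")
        return file_id

# The planner's view knows about the late-arriving partition...
planner = FileSystemView({"tenant=1/date=20241118": ["9836da68-...-0"]})
# ...but the executor's remote, timeline-server-backed view is stale.
executor = FileSystemView({})

fg = planner.get_small_files("tenant=1/date=20241118")[0]
try:
    executor.get_latest_base_file("tenant=1/date=20241118", fg)
except LookupError as e:
    print(e)
```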
   
   

