hgudladona opened a new issue, #12298: URL: https://github.com/apache/hudi/issues/12298
**Describe the problem you faced**

Intermittent `java.util.NoSuchElementException` when writing to partitions that arrive out of order and are not covered by the active timeline.

**To Reproduce**

We have a Hudi job reading from Kafka and writing to S3, with partitions derived dynamically from certain columns in the records, in the format `tenant=xxxxx/date=YYYYMMDD`. When the partition the new data is written into is not in the active timeline (late-arriving data), there appears to be a mismatch between the file group chosen in the stage "Getting small files from partitions" and the stage "Doing partition and writing data". Say a file group ID `9836da68-a598-4072-a342-53dc511609f6-0` is identified as a small file in "Getting small files from partitions" and passed on to "Doing partition and writing data" to INSERT new data and create a new base file for it; that stage then fails the streamer job with the exception below.

However, the same operation succeeds in two situations:
1. Upon restart of the streamer job, when the same records are retried.
2. If the embedded timeline server is turned off.

**Expected behavior**

There should be no mismatch between the views of the stages "Getting small files from partitions" and "Doing partition and writing data" when writing to a partition that is not actively tracked in the active timeline.

**Environment Description**

* Hudi version : 0.14.1
* Spark version : 3.4.x
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : yes

**Additional context**

```
auto.offset.reset: latest
bootstrap.servers: kafka-brokers
group.id: hudi-ingest-some-group
hoodie.archive.async: true
hoodie.archive.automatic: true
hoodie.auto.adjust.lock.configs: true
hoodie.base.path: s3a://some-base-path
hoodie.clean.async: true
hoodie.cleaner.hours.retained: 36
hoodie.cleaner.parallelism: 600
hoodie.cleaner.policy: KEEP_LATEST_BY_HOURS
hoodie.cleaner.policy.failed.writes: LAZY
hoodie.clustering.async.enabled: false
hoodie.combine.before.insert: false
hoodie.copyonwrite.insert.auto.split: false
hoodie.datasource.fetch.table.enable: true
hoodie.datasource.hive_sync.database: hudi_events_v1
hoodie.datasource.hive_sync.mode: hms
hoodie.datasource.hive_sync.partition_extractor_class: org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields: tenant,date
hoodie.datasource.hive_sync.table: some-table
hoodie.datasource.hive_sync.table_properties: projection.date.type=date|projection.date.format=yyyyMMdd|projection.date.range=19700101,99990101|projection.tenant.type=integer|projection.tenant.range=-1,8675309|projection.enabled=true
hoodie.datasource.meta_sync.condition.sync: true
hoodie.datasource.sync_tool.single_instance: true
hoodie.datasource.write.hive_style_partitioning: true
hoodie.datasource.write.keygenerator.class: com.some-class-prefix.KeyGenerator
hoodie.datasource.write.operation: insert
hoodie.datasource.write.partitionpath.field: tenant:SIMPLE,date:SIMPLE
hoodie.datasource.write.precombine.field: event_time_usec
hoodie.datasource.write.reconcile.schema: false
hoodie.datasource.write.recordkey.field: resource_id
hoodie.deltastreamer.kafka.source.maxEvents: 75000000
hoodie.deltastreamer.schemaprovider.registry.url: http://schema-registry.some-suffix:8085
hoodie.deltastreamer.source.kafka.enable.commit.offset: true
hoodie.deltastreamer.source.kafka.topic: some-topic
hoodie.deltastreamer.source.schema.subject: some-topic-value
hoodie.fail.on.timeline.archiving: false
```
```
hoodie.filesystem.view.incr.timeline.sync.enable: true
hoodie.filesystem.view.remote.timeout.secs: 2
hoodie.insert.shuffle.parallelism: 1600
hoodie.memory.merge.max.size: 2147483648
hoodie.metadata.enable: false
hoodie.metrics.on: true
hoodie.metrics.reporter.metricsname.prefix:
hoodie.metrics.reporter.prefix.tablename: false
hoodie.metrics.reporter.type: DATADOG
hoodie.parquet.compression.codec: zstd
hoodie.streamer.source.kafka.minPartitions: 450
hoodie.table.name: <>
hoodie.table.partition.fields: tenant,date
hoodie.table.type: MERGE_ON_READ
hoodie.write.concurrency.mode: OPTIMISTIC_CONCURRENCY_CONTROL
hoodie.write.lock.dynamodb.billing_mode: PROVISIONED
hoodie.write.lock.dynamodb.endpoint_url: https://dynamodb.us-east-2.amazonaws.com/
hoodie.write.lock.dynamodb.partition_key: some-key
hoodie.write.lock.dynamodb.region: us-east-2
hoodie.write.lock.dynamodb.table: HudiLocker
hoodie.write.lock.provider: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.write.markers.type: DIRECT
```

**Stacktrace**

```
2024-11-19T16:41:53,800 ERROR [pool-34-thread-1] org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService:Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Task 302 in stage 473.0 failed 4 times, most recent failure: Lost task 302.3 in stage 473.0 (TID 254209) (10.41.171.188 executor 186): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :302
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.NoSuchElementException: FileID 9836da68-a598-4072-a342-53dc511609f6-0 of partition path tenant=XXXXX/date=20241118 does not exist.
	at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:161)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:126)
	at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:68)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
```
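One way to picture the reported mismatch (a toy model in plain Python, not Hudi code; all class and method names here are illustrative) is two stages consulting different views of the same table: the planning stage reads a view that learned about the late-arriving partition's file group from a timeline event, while the write stage resolves the merge handle against a view that never saw that event, so its lookup fails exactly like `HoodieMergeHandle.getLatestBaseFile` does in the stacktrace:

```python
# Toy model of two stages consulting divergent views of one table.
# Illustrative only -- this is NOT Hudi's actual implementation.

class IncrementallySyncedView:
    """A cached view that only learns about file groups via timeline events."""
    def __init__(self):
        self.file_groups = {}  # partition path -> set of file group ids

    def apply_timeline_event(self, partition, file_group_id):
        self.file_groups.setdefault(partition, set()).add(file_group_id)

    def small_files(self, partition):
        # Analogue of "Getting small files from partitions": reads the cache.
        return sorted(self.file_groups.get(partition, set()))

    def latest_base_file(self, partition, file_group_id):
        # Analogue of resolving the merge handle in
        # "Doing partition and writing data".
        if file_group_id not in self.file_groups.get(partition, set()):
            raise LookupError(
                f"FileID {file_group_id} of partition path "
                f"{partition} does not exist."
            )
        return f"{partition}/{file_group_id}.parquet"


planner_view = IncrementallySyncedView()   # view used to pick small files
executor_view = IncrementallySyncedView()  # view used at write time

# Only the planner's view synced the event that created the file group
# in the late-arriving partition.
planner_view.apply_timeline_event("tenant=1/date=20241118", "fg-0")

chosen = planner_view.small_files("tenant=1/date=20241118")  # ['fg-0']
try:
    executor_view.latest_base_file("tenant=1/date=20241118", chosen[0])
except LookupError as e:
    print(e)  # mirrors the NoSuchElementException in the stacktrace
```

In this model, restarting the job (or bypassing the shared view entirely) rebuilds both views from the same storage listing, which matches the observation that the retry and the no-timeline-server runs succeed.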
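Since the write succeeds when the embedded timeline server is off, one untested stop-gap (at the cost of each executor building its file-system view locally, which adds storage-listing overhead) is to disable it explicitly in the same flat property style as the configs above; disabling incremental timeline sync instead is a second, also untested, knob to try, since it is enabled in this job:

```
# Workaround sketch, not a fix -- trades the shared timeline view
# for per-executor listing:
hoodie.embed.timeline.server: false

# Alternative to try while keeping the timeline server:
# hoodie.filesystem.view.incr.timeline.sync.enable: false
```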
