dyang108 opened a new issue, #12975:
URL: https://github.com/apache/hudi/issues/12975
**Describe the problem you faced**
Running Deltastreamer with async compaction on a schedule takes progressively
longer as the dataset grows, and compactions have started failing. Even though
the dataset is small, we can no longer keep up with the write rate of our input
Kafka topic.
**To Reproduce**
Steps to reproduce the behavior:
I have a Kafka Avro topic that Deltastreamer reads from and writes to an S3
table. The data volume is not extreme, but the files created appear to be too
small: after running the job for 3 months, we have accumulated 112,889 files
totaling 1.4 TB in our S3 prefix.
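To put the small-file problem in numbers, a back-of-the-envelope calculation from the figures above gives an average file size of roughly 12 MB (assuming decimal TB; slightly larger if 1.4 TiB was meant), far below the typical 100-120 MB parquet target:

```python
# Back-of-the-envelope average file size from the figures above.
total_bytes = 1.4e12   # 1.4 TB (decimal units assumed; ~1.54e12 if TiB)
num_files = 112_889

avg_bytes = total_bytes / num_files
print(f"average file size: {avg_bytes / 1e6:.1f} MB")  # ~12.4 MB
```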
Deltastreamer:
```
/opt/spark/bin/spark-submit --class
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
--master k8s://https://172.20.0.1:443
--deploy-mode cluster
--conf spark.kubernetes.namespace=deltastreamer
--conf spark.app.name=photo-inferences-deltastreamer-1741891516380541751
--conf
spark.kubernetes.driver.pod.name=photo-inferences-deltastreamer-1741891516380541751-driver
--conf spark.kubernetes.container.image.pullPolicy=Always
--conf spark.kubernetes.submission.waitAppCompletion=false
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
--conf spark.executor.memory=40g
--conf spark.kubernetes.container.image.pullPolicy=Always
--conf spark.ui.prometheus.enabled=true
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
--conf spark.driver.cores=2
--conf spark.eventLog.rolling.maxFileSize=128m
--conf spark.kubernetes.submission.waitAppCompletion=false
--conf spark.cores.max=10
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3-fastest
--conf spark.ui.port=4040
--conf spark.sql.shuffle.partitions=4000
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/tmp/dir1
--conf spark.cassandra.output.consistency.level=QUORUM
--conf
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
--conf spark.eventLog.enabled=true
--conf spark.cassandra.input.consistency.level=QUORUM
--conf spark.master=k8s://https://foo.us-east-1.eks.amazonaws.com:443
--conf spark.eventLog.rolling.enabled=true
--conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED
--conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.executor.instances=4
--conf spark.executor.cores=10
--conf spark.kubernetes.executor.request.cores=6
--conf spark.snowflake.sfUser=spark
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf spark.jars.ivy=/tmp/.ivy2
--conf
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
--conf spark.driver.memory=4g
--conf spark.hadoop.fs.s3a.path.style.access=true
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi
--conf spark.kubernetes.executor.label.version=3.2.1
--conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED
--conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED
--conf spark.hudi.timeline.server.port=26754
--conf spark.metrics.namespace=unknown
--conf spark.ui.showConsoleProgress=true
--conf spark.default.parallelism=4000
--conf spark.kubernetes.resource.type=java
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false
--conf
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=photo-inferences-deltastreamer-1741891516380541751
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=7c1bc0fd-d164-4bc0-9ec7-9e013a2ca300
--conf spark.kubernetes.driver.container.image=<image>
--conf spark.driver.cores=2
--conf spark.kubernetes.driver.request.cores=2
--conf
spark.kubernetes.authenticate.driver.serviceAccountName=photo-inferences-deltastreamer
--conf
spark.driver.extraJavaOptions=-Dconfig.override_with_env_vars=true
-Dconfig.file=/etc/spark/work-dir/conf/photo-inferences-deltastreamer.json
--conf spark.kubernetes.driver.label.version=3.3.0
--conf
spark.kubernetes.driver.label.app.kubernetes.io/name=photo-inferences-deltastreamer
--conf spark.kubernetes.driver.label.app.kubernetes.io/managed-by=Helm
--conf spark.kubernetes.driver.label.export-metrics-to-prometheus=true
--conf spark.kubernetes.driver.label.namespace=deltastreamer
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-deltastreamer
--conf spark.kubernetes.driver.annotation.fluentbit.io/exclude=true
--conf
spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt=true
--conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=photo-inferences-deltastreamer-1741891516380541751
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=7c1bc0fd-d164-4bc0-9ec7-9e013a2ca300
--conf spark.kubernetes.executor.container.image=<image>
--conf spark.executor.cores=10
--conf spark.kubernetes.executor.request.cores=6
--conf
spark.kubernetes.authenticate.executor.serviceAccountName=photo-inferences-deltastreamer
--conf spark.kubernetes.executor.label.app.kubernetes.io/managed-by=Helm
--conf spark.kubernetes.executor.label.export-metrics-to-prometheus=true
--conf spark.kubernetes.executor.label.namespace=deltastreamer
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-deltastreamer
--conf spark.kubernetes.executor.label.version=3.3.0
--conf
spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false
--conf spark.kubernetes.executor.annotation.fluentbit.io/exclude=true
--conf
spark.kubernetes.executor.annotation.karpenter.sh/do-not-disrupt=true
--conf
spark.executor.extraJavaOptions=-Dconfig.override_with_env_vars=true
-Dconfig.file=/configmap-data/photo-inferences-deltastreamer.json
-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=500
--conf spark.kubernetes.node.selector.karpenter.sh/nodepool=spark
local:///etc/spark/jars/deltastreamer-server_2.12-0.0.1-SNAPSHOT.jar
--props /kafka-source.properties
--schemaprovider-class
org.apache.hudi.utilities.schema.SchemaRegistryProvider
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource
--target-base-path s3a://my-bucket/photo-data/hudi
--target-table photo_inf
--op UPSERT
--source-ordering-field ts
--table-type MERGE_ON_READ
--source-limit 1000000
--max-pending-compactions 2
--hoodie-conf hoodie.datasource.compaction.async.enable=true
--hoodie-conf auto.offset.reset=earliest
--hoodie-conf hoodie.filesystem.view.remote.port=26754 --hoodie-conf
hoodie.datasource.compaction.async.enable=true
```
Compactor:
```
/opt/spark/bin/spark-submit
--class org.apache.hudi.utilities.HoodieCompactor
--master k8s://https://172.20.0.1:443
--deploy-mode cluster
--conf spark.kubernetes.namespace=deltastreamer
--conf spark.app.name=photo-inferences-compactor-1741645510295959759
--conf
spark.kubernetes.driver.pod.name=photo-inferences-compactor-1741645510295959759-driver
--conf spark.kubernetes.container.image.pullPolicy=Always
--conf spark.kubernetes.submission.waitAppCompletion=false
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false
--conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED
--conf spark.cassandra.input.consistency.level=QUORUM
--conf spark.hadoop.fs.s3a.path.style.access=true
--conf spark.kubernetes.resource.type=java
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
--conf spark.ui.showConsoleProgress=true
--conf spark.executor.instances=4
--conf spark.jars.ivy=/tmp/.ivy2
--conf spark.driver.memory=24g
--conf spark.executor.cores=4
--conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED
--conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED
--conf spark.kubernetes.container.image.pullPolicy=Always
--conf spark.kubernetes.submission.waitAppCompletion=false
--conf
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf spark.kubernetes.executor.label.version=3.2.1
--conf spark.ui.port=4040
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3-fastest
--conf spark.eventLog.rolling.maxFileSize=128m
--conf spark.ui.prometheus.enabled=true
--conf
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
--conf spark.cores.max=16
--conf spark.executor.memory=24g
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
--conf spark.driver.cores=2
--conf spark.hudi.timeline.server.port=26754
--conf spark.sql.shuffle.partitions=4000
--conf spark.master=k8s://https://foo.us-east-1.eks.amazonaws.com:443
--conf spark.snowflake.sfUser=spark
--conf spark.default.parallelism=4000
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi
--conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.cassandra.output.consistency.level=QUORUM
--conf spark.eventLog.rolling.enabled=true
--conf spark.eventLog.enabled=true
--conf spark.metrics.namespace=unknown
--conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/tmp/dir1
--conf
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=photo-inferences-compactor-1741645510295959759
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=477b19e0-cd38-4711-bb08-7b28db176c23
--conf spark.kubernetes.driver.container.image=<image>
--conf spark.driver.cores=2
--conf spark.kubernetes.driver.request.cores=2
--conf
spark.kubernetes.authenticate.driver.serviceAccountName=photo-inferences-compactor
--conf
spark.driver.extraJavaOptions=-Dconfig.override_with_env_vars=true
-Dconfig.file=/etc/spark/work-dir/conf/photo-inferences-compactor.json
--conf spark.kubernetes.driver.label.version=3.3.0
--conf
spark.kubernetes.driver.label.app.kubernetes.io/name=photo-inferences-compactor
--conf spark.kubernetes.driver.label.app.kubernetes.io/managed-by=Helm
--conf spark.kubernetes.driver.label.export-metrics-to-prometheus=true
--conf spark.kubernetes.driver.label.namespace=deltastreamer
--conf
spark.kubernetes.driver.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-compactor
--conf spark.kubernetes.driver.annotation.fluentbit.io/exclude=true
--conf
spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt=true
--conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=photo-inferences-compactor-1741645510295959759
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=477b19e0-cd38-4711-bb08-7b28db176c23
--conf spark.kubernetes.executor.container.image=<image>
--conf spark.executor.cores=4
--conf spark.kubernetes.executor.request.cores=4
--conf
spark.kubernetes.authenticate.executor.serviceAccountName=photo-inferences-compactor
--conf spark.kubernetes.executor.label.namespace=deltastreamer
--conf
spark.kubernetes.executor.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-compactor
--conf spark.kubernetes.executor.label.version=3.3.0
--conf spark.kubernetes.executor.label.app.kubernetes.io/managed-by=Helm
--conf spark.kubernetes.executor.label.export-metrics-to-prometheus=true
--conf spark.kubernetes.executor.annotation.fluentbit.io/exclude=true
--conf
spark.kubernetes.executor.annotation.karpenter.sh/do-not-disrupt=true
--conf
spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false
--conf
spark.executor.extraJavaOptions=-Dconfig.override_with_env_vars=true
-Dconfig.file=/configmap-data/photo-inferences-compactor.json
-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=500
--conf spark.kubernetes.node.selector.karpenter.sh/nodepool=spark
local:///etc/spark/jars/deltastreamer-server_2.12-0.0.1-SNAPSHOT.jar
--base-path s3a://my-bucket/photo-data/hudi
--table-name photo_inf
```
kafka-source.properties:
```
hoodie.upsert.shuffle.parallelism=4
hoodie.insert.shuffle.parallelism=4
hoodie.delete.shuffle.parallelism=4
hoodie.bulkinsert.shuffle.parallelism=4
hoodie.schema.cache.enable=true
hoodie.datasource.write.recordkey.field=mediaId
hoodie.datasource.write.partitionpath.field=partition
hoodie.deltastreamer.schemaprovider.registry.url=<my-schema-registry>
hoodie.deltastreamer.source.kafka.topic=<input-topic>
hoodie.deltastreamer.source.kafka.group.id=<input-consumer-group>
bootstrap.servers=<my-kafka-bootstrap-servers>
auto.offset.reset=latest
schema.registry.url=<my-schema-registry>
```
The `partition` column is an auto-incrementing userId modulo 10,000, chosen to
create an even distribution.
The `ts` field is the processing time on the producer of the input topic.
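The partitioning scheme described above can be sketched as follows (function and constant names are illustrative, not the actual producer code):

```python
NUM_PARTITIONS = 10_000

def partition_for(user_id: int) -> int:
    # 'partition' column: auto-incrementing userId modulo 10,000,
    # spreading records evenly across 10,000 partition paths.
    return user_id % NUM_PARTITIONS

# With ~112,889 files spread across 10,000 partitions, each partition
# holds only ~11 files on average, so per-partition files stay tiny.
print(112_889 / NUM_PARTITIONS)
```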
I'm running compaction every 2 hours, and running Deltastreamer (without
continuous mode) every 6 hours; if a Deltastreamer instance is already running,
we do not launch another one over it.
I've tried a healthy number of combinations of inline compaction settings (e.g.
[hoodie.compact.schedule.inline](https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline),
[hoodie.compact.inline.max.delta.commits](https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits),
hoodie.compact.inline, hoodie.compaction.strategy, etc.), but they all seem to
eventually run into trouble a few weeks into the pipeline.
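For reference, one such combination looked roughly like this (values are illustrative of what was tried, not a recommended setting):

```
hoodie.compact.inline=false
hoodie.compact.schedule.inline=true
hoodie.compact.inline.max.delta.commits=5
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
```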
The last compaction requested was `20241207093706000.compaction.requested`, and
it remains inflight as `20241207093706000.compaction.inflight`.
The last successful Deltastreamer run, a few days ago, took 56 hours. The
runtime of the Deltastreamer job has been steadily increasing since this was
deployed.
I'm also seeing Deltastreamer runs fail after that last success, with:
```
Caused by: java.io.FileNotFoundException: No such file or directory:
s3a://my-bucket/photo-data/hudi/9868/ab48667d-ac44-460a-850a-bc413f42e540-0_50-96-94348_20250312191529445.parquet
```
**Expected behavior**
Deltastreamer runs should not take this long, especially given the
mostly-append nature of the incoming data, and compactions should be
completing successfully.
**Environment Description**
* Hudi version : 0.15.0
* Spark version : 3.3.0
* Hive version : 2.3.9
* Hadoop version : 3.3.6
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : yes
**Additional context**
Chatted with @vinothchandar at Onehouse about this issue.
There's almost certainly something faulty in my configuration, but I would
appreciate help pinpointing it.