pushpavanthar opened a new issue, #7657:
URL: https://github.com/apache/hudi/issues/7657
**Problem:**
When a DeltaStreamer running in continuous mode is killed and resumed, the error below is thrown:
```
23/01/12 13:28:06 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications commits 20230112132517252
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.async.AsyncCleanerService.waitForCompletion(AsyncCleanerService.java:75)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:605)
    at org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:529)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:624)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:333)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications commits 20230112132517252
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:779)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1189)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1172)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$clean$33796fd2$1(BaseHoodieWriteClient.java:852)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:142)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:851)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:821)
    at org.apache.hudi.async.AsyncCleanerService.lambda$startService$0(AsyncCleanerService.java:55)
    ... 4 more
Caused by: java.lang.IllegalArgumentException: Invalid number of file groups for partition:column_stats, found=0, required=1
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepRecords(HoodieBackedTableMetadataWriter.java:968)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:132)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:924)
    at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$2(BaseActionExecutor.java:77)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:77)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:255)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:124)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:145)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:762)
    ... 11 more
```
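The trace shows the failure path: on restart, the async cleaner rolls back the failed commit 20230112132517252, the rollback tries to update the metadata table, and `HoodieBackedTableMetadataWriter.prepRecords` finds zero file groups for the `column_stats` partition where it requires one. As a quick, hypothetical check of what actually survives on storage, one can list that partition directly (path copied from the error message above; Hudi 0.11 keeps the metadata table under `.hoodie/metadata`):
```
# Hypothetical check, path taken from the error above: list the metadata
# table's column_stats partition to see whether any file-group base/log
# files exist after the kill and restart.
aws s3 ls --recursive \
  s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications/.hoodie/metadata/column_stats/
```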
**Steps to reproduce the behavior:**
1. Run HoodieDeltaStreamer with the config below:
```
spark-submit --master yarn \
  --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://<some_domain>/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --deploy-mode cluster s3://<some_domain>/jars/deltastreamer-addons-1.3.jar \
  --enable-sync \
  --hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
  --hoodie-conf hoodie.parquet.compression.codec=snappy \
  --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
  --hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
  --hoodie-conf auto.offset.reset=earliest \
  --table-type COPY_ON_WRITE \
  --source-class com.<some_domain>.sources.ConfluentAvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
  --props s3://<artifacts>/config/hudi/clusteringjob.properties \
  --source-limit 10000000 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://<some_domain>-tech.in/subjects/db_name.public.table_name-value/versions/latest \
  --hoodie-conf hoodie.datasource.hive_sync.database=test_clustering \
  --hoodie-conf hoodie.datasource.hive_sync.table=cow_table_name \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=db_name.public.table_name \
  --hoodie-conf group.id=hudi-cow-continuous-loan-applications \
  --source-ordering-field __lsn \
  --target-base-path s3://<some_domain>/test/hudi_poc/continuous_cow/table_name \
  --target-table cow_table_name \
  --payload-class com.<some_domain>.payload.PostgresSoftDeleteDebeziumAvroPayload \
  --hoodie-conf hoodie.bloom.index.update.partition.path=false \
  --hoodie-conf hoodie.metrics.on=true \
  --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
  --hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.prod.<some_domain>-tech.in \
  --hoodie-conf hoodie.metrics.pushgateway.port=443 \
  --hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
  --hoodie-conf hoodie.metrics.pushgateway.job.name=hudi_cow_continuous_table_name \
  --hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
  --hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
  --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --transformer-class com.<some_domain>.transform.DebeziumTransformer \
  --hoodie-conf hoodie.clean.automatic=true \
  --hoodie-conf hoodie.clean.async=true \
  --hoodie-conf hoodie.clean.max.commits=10 \
  --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS \
  --hoodie-conf hoodie.keep.max.commits=800 \
  --hoodie-conf hoodie.keep.min.commits=600 \
  --hoodie-conf hoodie.cleaner.hours.retained=1 \
  --hoodie-conf hoodie.cleaner.parallelism=500 \
  --hoodie-conf hoodie.clean.allow.multiple=false \
  --hoodie-conf hoodie.cleaner.incremental.mode=true \
  --hoodie-conf hoodie.archive.async=true \
  --hoodie-conf hoodie.archive.automatic=true \
  --hoodie-conf hoodie.archive.merge.files.batch.size=20 \
  --hoodie-conf hoodie.commits.archival.batch=20 \
  --hoodie-conf hoodie.archive.delete.parallelism=500 \
  --hoodie-conf hoodie.archive.merge.enable=true \
  --hoodie-conf hoodie.clustering.inline=false \
  --hoodie-conf hoodie.bloom.index.use.metadata=false \
  --hoodie-conf hoodie.index.type=BLOOM \
  --hoodie-conf hoodie.metadata.enable=true \
  --hoodie-conf hoodie.metadata.index.column.stats.parallelism=50 \
  --hoodie-conf hoodie.metadata.compact.max.delta.commits=10 \
  --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
  --hoodie-conf hoodie.metadata.metrics.enable=true \
  --hoodie-conf hoodie.metadata.index.bloom.filter.file.group.count=20 \
  --hoodie-conf hoodie.metadata.cleaner.commits.retained=60 \
  --hoodie-conf hoodie.metadata.index.check.timeout.seconds=900 \
  --hoodie-conf hoodie.metadata.populate.meta.fields=true \
  --hoodie-conf hoodie.metadata.index.async=true \
  --hoodie-conf hoodie.file.listing.parallelism=800 \
  --hoodie-conf hoodie.metadata.index.bloom.filter.enable=true \
  --hoodie-conf hoodie.metadata.index.bloom.filter.parallelism=500 \
  --hoodie-conf hoodie.metadata.clean.async=true \
  --hoodie-conf hoodie.metadata.keep.max.commits=80 \
  --hoodie-conf hoodie.metadata.insert.parallelism=20 \
  --hoodie-conf hoodie.metadata.keep.min.commits=70 \
  --hoodie-conf hoodie.write.markers.type=DIRECT \
  --hoodie-conf hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL \
  --hoodie-conf hoodie.cleaner.policy.failed.writes=LAZY \
  --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
  --hoodie-conf hoodie.deltastreamer.source.kafka.enable.commit.offset=true \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --continuous
```
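For readability, the flags from the full command above that drive the failing path are excerpted here; nothing new is introduced, this is just the async-clean plus metadata-index subset:
```
# Excerpt of the command above: async cleaning combined with an async
# column_stats index on the metadata table, with lazy failed-write cleanup
# under optimistic concurrency control.
--hoodie-conf hoodie.clean.async=true \
--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.enable=true \
--hoodie-conf hoodie.metadata.index.async=true \
--hoodie-conf hoodie.cleaner.policy.failed.writes=LAZY \
--hoodie-conf hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL \
```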
2. Kill the application.
3. Restart the application; the job fails with the exception above. When the metadata table is disabled, the job runs fine (see the sketch below).
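Based on that observation, a minimal workaround sketch: rerun the same spark-submit command but flip the two metadata flags below from the original config. This only sidesteps the failing column_stats update on restart; it is not a fix for the rollback bug itself, and disabling the column_stats index alone is an untested assumption.
```
# Flip these two flags from the config above; all other options stay as in
# the original command. Workaround sketch only, not a fix.
--hoodie-conf hoodie.metadata.enable=false \
--hoodie-conf hoodie.metadata.index.column.stats.enable=false \
```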
**Expected behavior**
The job should resume without errors when restarted.
**Environment Description**
* Hudi version : 0.11.1
* Spark version : 3.1.1
* Hive version : 3.1.2
* Hadoop version :
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
Let me know if you need more info.
Thanks in advance.