pushpavanthar opened a new issue, #7657:
URL: https://github.com/apache/hudi/issues/7657

   Problem:
   When a DeltaStreamer job running in continuous mode is killed and resumed, the error below is thrown.
   ```
   23/01/12 13:28:06 ERROR HoodieAsyncService: Service shutdown with error
   java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications commits 20230112132517252
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
        at org.apache.hudi.async.AsyncCleanerService.waitForCompletion(AsyncCleanerService.java:75)
        at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:605)
        at org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:529)
        at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:624)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:333)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications commits 20230112132517252
        at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:779)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1189)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1172)
        at org.apache.hudi.client.BaseHoodieWriteClient.lambda$clean$33796fd2$1(BaseHoodieWriteClient.java:852)
        at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:142)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:851)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:821)
        at org.apache.hudi.async.AsyncCleanerService.lambda$startService$0(AsyncCleanerService.java:55)
        ... 4 more
   Caused by: java.lang.IllegalArgumentException: Invalid number of file groups for partition:column_stats, found=0, required=1
        at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
        at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepRecords(HoodieBackedTableMetadataWriter.java:968)
        at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:132)
        at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:924)
        at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$2(BaseActionExecutor.java:77)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:77)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:255)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:124)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:145)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:762)
        ... 11 more
   ```
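   The root `IllegalArgumentException` says the `column_stats` partition of the metadata table has zero file groups when the rollback tries to write its metadata update. As a quick sanity check (not a fix), that partition can be listed directly on S3. This is a minimal sketch that assumes the base path from the stack trace and a configured AWS CLI:
   ```
   # The metadata table lives under <base_path>/.hoodie/metadata, and the
   # column_stats index is one of its partitions. An empty listing here would
   # line up with "found=0, required=1" in the exception.
   aws s3 ls --recursive \
     s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications/.hoodie/metadata/column_stats/
   ```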
   
   Steps to reproduce the behavior:
   
   1. Run HoodieDeltaStreamer with the config below:
   ```
   spark-submit --master yarn --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://<some_domain>/jars/hudi-utilities-bundle_2.12-0.11.1.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
   --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://<some_domain>/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
   --deploy-mode cluster s3://<some_domain>/jars/deltastreamer-addons-1.3.jar \
   --enable-sync \
   --hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
   --hoodie-conf hoodie.parquet.compression.codec=snappy \
   --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
   --hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
   --hoodie-conf auto.offset.reset=earliest \
   --table-type COPY_ON_WRITE \
   --source-class com.<some_domain>.sources.ConfluentAvroKafkaSource \
   --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
   --props s3://<artifacts>/config/hudi/clusteringjob.properties \
   --source-limit 10000000 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://<some_domain>-tech.in/subjects/db_name.public.table_name-value/versions/latest \
   --hoodie-conf hoodie.datasource.hive_sync.database=test_clustering \
   --hoodie-conf hoodie.datasource.hive_sync.table=cow_table_name \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=db_name.public.table_name \
   --hoodie-conf group.id=hudi-cow-continuous-loan-applications \
   --source-ordering-field __lsn \
   --target-base-path s3://<some_domain>/test/hudi_poc/continuous_cow/table_name \
   --target-table cow_table_name \
   --payload-class com.<some_domain>.payload.PostgresSoftDeleteDebeziumAvroPayload \
   --hoodie-conf hoodie.bloom.index.update.partition.path=false \
   --hoodie-conf hoodie.metrics.on=true \
   --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
   --hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.prod.<some_domain>-tech.in \
   --hoodie-conf hoodie.metrics.pushgateway.port=443 \
   --hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
   --hoodie-conf hoodie.metrics.pushgateway.job.name=hudi_cow_continuous_table_name \
   --hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
   --hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
   --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
   --transformer-class com.<some_domain>.transform.DebeziumTransformer \
   --hoodie-conf hoodie.clean.automatic=true \
   --hoodie-conf hoodie.clean.async=true \
   --hoodie-conf hoodie.clean.max.commits=10 \
   --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS \
   --hoodie-conf hoodie.keep.max.commits=800 \
   --hoodie-conf hoodie.keep.min.commits=600 \
   --hoodie-conf hoodie.cleaner.hours.retained=1 \
   --hoodie-conf hoodie.cleaner.parallelism=500 \
   --hoodie-conf hoodie.clean.allow.multiple=false \
   --hoodie-conf hoodie.cleaner.incremental.mode=true \
   --hoodie-conf hoodie.archive.async=true \
   --hoodie-conf hoodie.archive.automatic=true \
   --hoodie-conf hoodie.archive.merge.files.batch.size=20 \
   --hoodie-conf hoodie.commits.archival.batch=20 \
   --hoodie-conf hoodie.archive.delete.parallelism=500 \
   --hoodie-conf hoodie.archive.merge.enable=true \
   --hoodie-conf hoodie.clustering.inline=false \
   --hoodie-conf hoodie.bloom.index.use.metadata=false \
   --hoodie-conf hoodie.index.type=BLOOM \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.parallelism=50 \
   --hoodie-conf hoodie.metadata.compact.max.delta.commits=10 \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.metadata.metrics.enable=true \
   --hoodie-conf hoodie.metadata.index.bloom.filter.file.group.count=20 \
   --hoodie-conf hoodie.metadata.cleaner.commits.retained=60 \
   --hoodie-conf hoodie.metadata.index.check.timeout.seconds=900 \
   --hoodie-conf hoodie.metadata.populate.meta.fields=true \
   --hoodie-conf hoodie.metadata.index.async=true \
   --hoodie-conf hoodie.file.listing.parallelism=800 \
   --hoodie-conf hoodie.metadata.index.bloom.filter.enable=true \
   --hoodie-conf hoodie.metadata.index.bloom.filter.parallelism=500 \
   --hoodie-conf hoodie.metadata.clean.async=true \
   --hoodie-conf hoodie.metadata.keep.max.commits=80 \
   --hoodie-conf hoodie.metadata.insert.parallelism=20 \
   --hoodie-conf hoodie.metadata.keep.min.commits=70 \
   --hoodie-conf hoodie.write.markers.type=DIRECT \
   --hoodie-conf hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL \
   --hoodie-conf hoodie.cleaner.policy.failed.writes=LAZY \
   --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
   --hoodie-conf hoodie.deltastreamer.source.kafka.enable.commit.offset=true \
   --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
   --continuous
   ```
   2. Kill the application.
   3. Restart the application; the job fails with the exception above. When the metadata table is disabled, the job runs fine (see the sketch after these steps).
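   As noted in step 3, the only workaround found so far is turning the metadata table off. A minimal sketch of that override, assuming it is done by flipping the metadata-related flags from the command above and restarting the otherwise unchanged job:
   ```
   # Workaround only, not a fix: disable the metadata table and the indexes
   # that depend on it, replacing the corresponding flags in the command above.
   --hoodie-conf hoodie.metadata.enable=false \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=false \
   --hoodie-conf hoodie.metadata.index.bloom.filter.enable=false \
   --hoodie-conf hoodie.metadata.index.async=false \
   ```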
   
   **Expected behavior**
   
   The job should resume without any problem when restarted.
   
   **Environment Description**
   
   * Hudi version : 0.11.1
   
   * Spark version : 3.1.1
   
   * Hive version : 3.1.2
   
   * Hadoop version : 
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   Let me know if you need more info. 
   Thanks in advance.
   
   
   
   

