[
https://issues.apache.org/jira/browse/HUDI-739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17148123#comment-17148123
]
t oo commented on HUDI-739:
---------------------------
I am facing a similar error on Hudi 0.5.3 & Spark 2.4.6 with S3. In this case
the existing data was written by Hudi 0.4.6, but I get the error when trying to
delete records with Hudi 0.5.3 & Spark 2.4.6. Comparing the S3 listing before
and after running spark-submit, I can see it is actually deleting the file
that the error complains about. Each time I run spark-submit it deletes
another 'inflight' file but still raises the error:
```
2020-06-29 20:29:41,948 [Executor task launch worker for task 7] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 7.0 (TID 7)
2020-06-29 20:29:41,968 [Executor task launch worker for task 7] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 1 non-empty
blocks including 1 local blocks and 0 remote blocks
2020-06-29 20:29:41,970 [Executor task launch worker for task 7] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote fetches
in 5 ms
2020-06-29 20:29:42,000 [Executor task launch worker for task 7] INFO
org.apache.spark.executor.Executor - Finished task 0.0 in stage 7.0 (TID 7).
1177 bytes result sent to driver
2020-06-29 20:29:42,019 [task-result-getter-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 7.0 (TID
7) in 74 ms on localhost (executor driver) (1/1)
2020-06-29 20:29:42,019 [task-result-getter-3] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 7.0, whose tasks
have all completed, from pool
2020-06-29 20:29:42,020 [dag-scheduler-event-loop] INFO
org.apache.spark.scheduler.DAGScheduler - ResultStage 7 (collect at
RollbackHelper.java:139) finished in 0.095 s
2020-06-29 20:29:42,021 [main] INFO org.apache.spark.scheduler.DAGScheduler -
Job 6 finished: collect at RollbackHelper.java:139, took 0.311751 s
2020-06-29 20:29:42,100 [main] INFO
org.apache.hudi.table.HoodieCopyOnWriteTable - Deleting
instant=[==>20190926055336__commit__INFLIGHT]
2020-06-29 20:29:42,100 [main] INFO
org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Deleting instant
[==>20190926055336__commit__INFLIGHT]
2020-06-29 20:29:42,207 [main] INFO
org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Removed instant
[==>20190926055336__commit__INFLIGHT]
2020-06-29 20:29:42,207 [main] INFO
org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Deleting instant
[==>20190926055336__commit__REQUESTED]
Exception in thread "main" org.apache.hudi.exception.HoodieIOException: Could
not delete instant [==>20190926055336__commit__REQUESTED]
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:181)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:164)
at
org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightAndRequestedInstant(HoodieCopyOnWriteTable.java:412)
at
org.apache.hudi.table.HoodieCopyOnWriteTable.rollback(HoodieCopyOnWriteTable.java:379)
at
org.apache.hudi.client.AbstractHoodieWriteClient.doRollbackAndGetStats(AbstractHoodieWriteClient.java:370)
at
org.apache.hudi.client.AbstractHoodieWriteClient.rollbackInternal(AbstractHoodieWriteClient.java:322)
at
org.apache.hudi.client.HoodieWriteClient.rollback(HoodieWriteClient.java:716)
at
org.apache.hudi.client.HoodieWriteClient.rollbackPendingCommits(HoodieWriteClient.java:1017)
at
org.apache.hudi.client.HoodieWriteClient.startCommit(HoodieWriteClient.java:846)
```
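The behaviour above (the file is gone afterwards, yet the rollback still fails) is consistent with the delete's boolean return value being treated as authoritative. A minimal self-contained sketch of that pattern follows; this is not Hudi's actual code, and the class and exception here are stand-ins, but it shows how a delete that finds the file already removed (e.g. by a previous run, or masked by a stale S3 listing) surfaces as "Could not delete instant" even though the file no longer exists:
```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class DeleteInstantSketch {
    // Hypothetical stand-in for org.apache.hudi.exception.HoodieIOException
    static class HoodieIOException extends RuntimeException {
        HoodieIOException(String msg) { super(msg); }
    }

    // Sketch of the deleteInstantFile pattern: a false return from the
    // filesystem delete is treated as failure, so deleting an
    // already-removed instant file raises the exception even though the
    // desired end state (file absent) has been reached.
    static void deleteInstantFile(File instantFile) {
        boolean deleted = instantFile.delete();
        if (!deleted) {
            throw new HoodieIOException("Could not delete instant " + instantFile.getName());
        }
    }

    public static void main(String[] args) throws IOException {
        File f = Files.createTempFile("20190926055336.commit", ".inflight").toFile();
        deleteInstantFile(f);     // first run: file exists, delete succeeds
        try {
            deleteInstantFile(f); // rerun: file already gone, delete() returns false
        } catch (HoodieIOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```
This would explain why each rerun removes one more inflight file yet still errors: the removal succeeds, but the success signal does not.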
After three reruns (no inflight files remaining), the next error I get is:
```
Exception in thread "main" org.apache.hudi.exception.HoodieRollbackException:
Found commits after time :20190905164612, please rollback greater commits first
at
org.apache.hudi.client.AbstractHoodieWriteClient.doRollbackAndGetStats(AbstractHoodieWriteClient.java:358)
at
org.apache.hudi.client.AbstractHoodieWriteClient.rollbackInternal(AbstractHoodieWriteClient.java:322)
at
org.apache.hudi.client.HoodieWriteClient.rollback(HoodieWriteClient.java:716)
at
org.apache.hudi.client.HoodieWriteClient.rollbackPendingCommits(HoodieWriteClient.java:1017)
at
org.apache.hudi.client.HoodieWriteClient.startCommit(HoodieWriteClient.java:846)
```
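The second exception indicates an ordering constraint: commits later than the one being rolled back must be rolled back first. A small sketch of that constraint (the helper below is hypothetical, not a Hudi API; the instant timestamps are taken from the logs above, and Hudi instant times sort lexicographically as yyyyMMddHHmmss strings):
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RollbackOrderSketch {
    // Given the pending instant times, produce the order in which rollbacks
    // should be issued: newest first, per the "please rollback greater
    // commits first" message.
    static List<String> rollbackOrder(List<String> pendingInstants) {
        pendingInstants.sort(Collections.reverseOrder());
        return pendingInstants;
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>(
            Arrays.asList("20190905164612", "20190926055336", "20190910120000"));
        // Newest instant comes first; 20190905164612 is rolled back last.
        System.out.println(rollbackOrder(pending));
    }
}
```
In other words, attempting to roll back 20190905164612 while later commits still exist on the timeline fails by design; the later instants would have to go first.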
> HoodieIOException: Could not delete in-flight instant
> -----------------------------------------------------
>
> Key: HUDI-739
> URL: https://issues.apache.org/jira/browse/HUDI-739
> Project: Apache Hudi
> Issue Type: Bug
> Components: Common Core
> Affects Versions: 0.5.0
> Reporter: Catalin Alexandru Zamfir
> Assignee: sivabalan narayanan
> Priority: Blocker
> Labels: AWS, S3, bug-bash-0.6.0
>
> We are evaluating Hudi for our near real-time ingestion needs, against
> other solutions (Delta/Iceberg). We picked Hudi because it comes
> pre-installed with Amazon EMR. However, adoption is blocked on this issue
> with concurrent small-batch (256 files each) write jobs to the same S3
> path.
> Using Livy we trigger Spark jobs that write Hudi tables over S3, on EMR
> with EMRFS active; paths use the "s3://" prefix.
> We write Spark SQL datasets promoted up from RDDs.
> "hoodie.consistency.check.enabled" is set to true, the Spark serializer is
> Kryo, and the Hudi version is 0.5.0-incubating.
> On both COW and MOR tables, some of the submitted jobs fail with the
> exception below:
> {code:java}
> org.apache.hudi.exception.HoodieIOException: Could not delete in-flight
> instant [==>20200326175252__deltacommit__INFLIGHT]
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
> at
> org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
> at
> org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
> at
> org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
> at
> org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
> at
> org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
> at
> org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
> at
> org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
> at
> org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> {code}
> The jobs are sent in concurrent batches of 256 files over the same S3
> path, some 8k files in total for 6 hours of our data.
> Writing happens with the following code (basePath is an S3 bucket):
> {code:java}
> // Configs (edited)
> String databaseName = "nrt";
> String assumeYmdPartitions = "false";
> String extractorClass = MultiPartKeysValueExtractor.class.getName();
> String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL();
> String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL();
> String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
> String basePath = "s3://some_path_to_hudi"; // "s3a://" does not seem to matter, same exception
> String avroSchemaAsString = avroSchema.toString();
> String tableName = avroSchema.getName().toLowerCase().replace("avro", "");
> eventsDataset.write()
>     .format("org.apache.hudi")
>     .option(HoodieWriteConfig.TABLE_NAME, tableName)
>     .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType)
>     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
>     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition_path")
>     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
>     .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
>     .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), databaseName)
>     .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), tableName)
>     .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "tenant,year,month,day")
>     .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUri)
>     .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), assumeYmdPartitions)
>     .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), extractorClass)
>     .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), tableOperation)
>     .mode(SaveMode.Append)
>     .save(String.format("%s/%s", basePath, tableName));
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)