[ https://issues.apache.org/jira/browse/HUDI-739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17148123#comment-17148123 ]

t oo commented on HUDI-739:
---------------------------

I am facing a similar error on Hudi 0.5.3 & Spark 2.4.6 with S3. In this case 
the existing data was written by Hudi 0.4.6, but I get the error when trying 
to delete records with Hudi 0.5.3 & Spark 2.4.6. Comparing the S3 listing 
before and after running spark-submit, I can see it is actually deleting the 
file that the error complains about. Each time I run spark-submit it deletes 
another 'inflight' file, but still fails with the error:

```

2020-06-29 20:29:41,948 [Executor task launch worker for task 7] INFO 
org.apache.spark.executor.Executor - Running task 0.0 in stage 7.0 (TID 7)
2020-06-29 20:29:41,968 [Executor task launch worker for task 7] INFO 
org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 1 non-empty 
blocks including 1 local blocks and 0 remote blocks
2020-06-29 20:29:41,970 [Executor task launch worker for task 7] INFO 
org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote fetches 
in 5 ms
2020-06-29 20:29:42,000 [Executor task launch worker for task 7] INFO 
org.apache.spark.executor.Executor - Finished task 0.0 in stage 7.0 (TID 7). 
1177 bytes result sent to driver
2020-06-29 20:29:42,019 [task-result-getter-3] INFO 
org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 7.0 (TID 
7) in 74 ms on localhost (executor driver) (1/1)
2020-06-29 20:29:42,019 [task-result-getter-3] INFO 
org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 7.0, whose tasks 
have all completed, from pool
2020-06-29 20:29:42,020 [dag-scheduler-event-loop] INFO 
org.apache.spark.scheduler.DAGScheduler - ResultStage 7 (collect at 
RollbackHelper.java:139) finished in 0.095 s
2020-06-29 20:29:42,021 [main] INFO org.apache.spark.scheduler.DAGScheduler - 
Job 6 finished: collect at RollbackHelper.java:139, took 0.311751 s
2020-06-29 20:29:42,100 [main] INFO 
org.apache.hudi.table.HoodieCopyOnWriteTable - Deleting 
instant=[==>20190926055336__commit__INFLIGHT]
2020-06-29 20:29:42,100 [main] INFO 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Deleting instant 
[==>20190926055336__commit__INFLIGHT]
2020-06-29 20:29:42,207 [main] INFO 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Removed instant 
[==>20190926055336__commit__INFLIGHT]
2020-06-29 20:29:42,207 [main] INFO 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Deleting instant 
[==>20190926055336__commit__REQUESTED]
Exception in thread "main" org.apache.hudi.exception.HoodieIOException: Could 
not delete instant [==>20190926055336__commit__REQUESTED]
 at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:181)
 at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:164)
 at 
org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightAndRequestedInstant(HoodieCopyOnWriteTable.java:412)
 at 
org.apache.hudi.table.HoodieCopyOnWriteTable.rollback(HoodieCopyOnWriteTable.java:379)
 at 
org.apache.hudi.client.AbstractHoodieWriteClient.doRollbackAndGetStats(AbstractHoodieWriteClient.java:370)
 at 
org.apache.hudi.client.AbstractHoodieWriteClient.rollbackInternal(AbstractHoodieWriteClient.java:322)
 at 
org.apache.hudi.client.HoodieWriteClient.rollback(HoodieWriteClient.java:716)
 at 
org.apache.hudi.client.HoodieWriteClient.rollbackPendingCommits(HoodieWriteClient.java:1017)
 at 
org.apache.hudi.client.HoodieWriteClient.startCommit(HoodieWriteClient.java:846)

```
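The listing comparison described above (each rerun removes one more inflight/requested file from the timeline) can be sketched as a simple set difference between two snapshots of the `.hoodie` directory. The helper and file names below are illustrative assumptions, not Hudi tooling or the reporter's actual bucket contents:

```python
# Hypothetical sketch: diff two snapshots of the .hoodie timeline directory
# to see which instant files a spark-submit run actually deleted.

def diff_timeline(before, after):
    """Return (deleted, created) timeline files between two listings."""
    before_set, after_set = set(before), set(after)
    return sorted(before_set - after_set), sorted(after_set - before_set)

# Illustrative listings taken "before" and "after" one spark-submit run.
before = [
    "20190905164612.commit",
    "20190926055336.commit.requested",
    "20190926055336.inflight",
]
after = [
    "20190905164612.commit",
    "20190926055336.commit.requested",
]

deleted, created = diff_timeline(before, after)
print(deleted)  # the inflight file the error complained about is gone
```

This matches the observed behavior: the delete reported in the exception did happen on S3, so each rerun finds one fewer pending instant file.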

 

Then, after 3 reruns (no inflight files remaining), the next error I get is:

```

Exception in thread "main" org.apache.hudi.exception.HoodieRollbackException: 
Found commits after time :20190905164612, please rollback greater commits first
 at 
org.apache.hudi.client.AbstractHoodieWriteClient.doRollbackAndGetStats(AbstractHoodieWriteClient.java:358)
 at 
org.apache.hudi.client.AbstractHoodieWriteClient.rollbackInternal(AbstractHoodieWriteClient.java:322)
 at 
org.apache.hudi.client.HoodieWriteClient.rollback(HoodieWriteClient.java:716)
 at 
org.apache.hudi.client.HoodieWriteClient.rollbackPendingCommits(HoodieWriteClient.java:1017)
 at 
org.apache.hudi.client.HoodieWriteClient.startCommit(HoodieWriteClient.java:846)

```
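This second error is Hudi's guard that rollbacks must proceed newest-first: an instant cannot be rolled back while a completed commit with a greater timestamp exists on the timeline. A minimal sketch of that check, assuming (as Hudi does) that fixed-width instant timestamps compare lexicographically in chronological order; the names and structure are illustrative, not Hudi's actual code:

```python
# Hedged sketch of the ordering guard behind the
# "Found commits after time" error. Not actual Hudi code.

class RollbackError(Exception):
    pass

def check_rollback_order(completed_instants, rollback_time):
    """Reject rolling back an instant while newer completed commits exist.

    Instant times are fixed-width timestamp strings, so string comparison
    matches chronological order.
    """
    later = [t for t in completed_instants if t > rollback_time]
    if later:
        raise RollbackError(
            "Found commits after time :%s, please rollback greater "
            "commits first" % rollback_time)

# Rolling back 20190905164612 is rejected while the completed commit
# 20190926055336 is still on the timeline:
try:
    check_rollback_order(["20190926055336"], "20190905164612")
except RollbackError as e:
    print(e)
```

Under this guard, the pending commit at 20190905164612 cannot be cleaned up until the commits after it are rolled back first (e.g. via the Hudi CLI), which is consistent with the error message above.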

 

> HoodieIOException: Could not delete in-flight instant
> -----------------------------------------------------
>
>                 Key: HUDI-739
>                 URL: https://issues.apache.org/jira/browse/HUDI-739
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Common Core
>    Affects Versions: 0.5.0
>            Reporter: Catalin Alexandru Zamfir
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>              Labels: AWS, S3, bug-bash-0.6.0
>
> We are evaluating Hudi for our near real-time ingestion needs, compared to 
> other solutions (Delta/Iceberg). We picked Hudi because it comes 
> pre-installed with Amazon EMR. However, adoption is blocked on this issue 
> with concurrent small-batch write jobs (256 files each) to the same S3 path.
> Using Livy, we trigger Spark jobs that write Hudi tables to S3 on EMR with 
> EMRFS active. Paths use the "s3://" prefix. We write Spark SQL datasets 
> promoted up from RDDs. "hoodie.consistency.check.enabled" is set to true, 
> the Spark serializer is Kryo, and the Hudi version is 0.5.0-incubating.
> On both COW and MOR tables, some of the submitted jobs fail with the 
> exception below:
> {code:java}
> org.apache.hudi.exception.HoodieIOException: Could not delete in-flight 
> instant [==>20200326175252__deltacommit__INFLIGHT]
>       at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
>       at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
>       at 
> org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
>       at 
> org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
>       at 
> org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
>       at 
> org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
>       at 
> org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
>       at 
> org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
>       at 
> org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
>       at 
> org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
>       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
>       at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
>       at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>       at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>       at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
>       at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> {code}
> The jobs are sent in concurrent batches of 256 files over the same S3 path, 
> some 8k files in total for 6 hours of our data.
> Writing happens with the following code (basePath is an S3 bucket):
> {code:java}
> // Configs (edited)
> String databaseName = "nrt";
> String assumeYmdPartitions = "false";
> String extractorClass = MultiPartKeysValueExtractor.class.getName ();
> String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL ();
> String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL ();
> String hiveJdbcUri = 
> "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
> String basePath = "s3://some_path_to_hudi"; // or "s3a://" does not seem to 
> matter, same exception
> String avroSchemaAsString = avroSchema.toString ();
> String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", "");
> eventsDataset.write ()
>     .format ("org.apache.hudi")
>     .option (HoodieWriteConfig.TABLE_NAME, tableName)
>     .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType)
>     .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id")
>     .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), 
> "partition_path")
>     .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp")
>     .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true")
>     .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName)
>     .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName)
>     .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), 
> "tenant,year,month,day")
>     .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri)
>     .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), 
> assumeYmdPartitions)
>     .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY 
> (), extractorClass)
>     .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation)
>     .mode (SaveMode.Append)
>     .save (String.format ("%s/%s", basePath, tableName));
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
