[ 
https://issues.apache.org/jira/browse/HUDI-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan resolved HUDI-2641.
---------------------------------------

> One inflight commit rolling back other concurrent inflight commits causing 
> them to fail
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-2641
>                 URL: https://issues.apache.org/jira/browse/HUDI-2641
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Writer Core
>            Reporter: Udit Mehrotra
>            Assignee: Udit Mehrotra
>            Priority: Blocker
>              Labels: pull-request-available, release-blocker
>             Fix For: 0.10.0
>
>
> The bug is easily reproducible when running 4-5 concurrent writers at once. 
> At least a few writers end up failing with the following stack trace in 
> Hudi 0.8.0:
> {noformat}
> py4j.protocol.Py4JJavaError: An error occurred while calling o122.save.
> : java.lang.IllegalArgumentException
>       at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
>       at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396)
>       at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377)
>       at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154)
>       at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212)
>       at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185)
>       at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
>       at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:479)
>       at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223)
>       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
>       at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
>       at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
>       at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
>       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
>       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
>       at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>       at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>       at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>       at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
>       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
>       at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>       at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
>       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
>       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
>       at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>       at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
>       at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at py4j.Gateway.invoke(Gateway.java:282)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
>  
> Based on my debugging, this is what is going on:
>  * In our start commit/clean logic, there is a step where Hoodie rolls back 
> any failed writes 
> [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L669].
>  As part of that logic, it needs to identify the instants to roll back. In 
> the multi-writer case, it relies on heartbeats (whether expired or not) to 
> decide whether an inflight commit is a failed write that can be rolled back 
> [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L828].
>  * As part of the same rollback logic, it makes this call to delete the 
> heartbeats of rolled back/expired commits 
> [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L813].
>  *This call is made irrespective of whether there is any instant to roll 
> back, and the logic in that method is glaringly wrong: it deletes the 
> heartbeats of all the inflight commits.*
>  * So one commit, while starting the commit process, deletes the heartbeats 
> of all other inflight commits. Meanwhile, another commit has reached the 
> _cleaner_ stage, where it again checks whether there are any commits to 
> roll back. Because the heartbeats of some of the other commits are now 
> missing/deleted, it assumes those commits are expired/failed writes, rolls 
> them back, and deletes their INFLIGHT commit files on the timeline.
>  * Ultimately, the inflight commits that were incorrectly rolled back fail 
> with the above exception, because the INFLIGHT state of the commit no 
> longer exists in the timeline and the commit cannot be completed.
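
The sequence described in the report can be sketched with a small in-memory model. This is only an illustration, not Hudi code: the Timeline class, its method names, and the buggy_cleanup flag are all hypothetical, and the non-buggy branch shows one plausible intended behaviour (delete only the heartbeats of instants that were actually rolled back), not necessarily the fix that was merged.

```python
# Toy model of the race in the report. Every name here is hypothetical;
# none of this is Hudi's API.

class Timeline:
    def __init__(self):
        self.inflight = set()    # instants that still have an INFLIGHT file
        self.completed = set()   # instants that committed successfully
        self.heartbeats = set()  # instants with a live heartbeat

    def start_commit(self, instant):
        self.inflight.add(instant)
        self.heartbeats.add(instant)

    def rollback_failed_writes(self, buggy_cleanup):
        all_inflight = set(self.inflight)
        # An inflight instant without a heartbeat is treated as a failed write.
        to_rollback = {i for i in all_inflight if i not in self.heartbeats}
        self.inflight -= to_rollback
        if buggy_cleanup:
            # Reported behaviour: heartbeats of all inflight instants are
            # deleted, even when nothing was rolled back.
            self.heartbeats -= all_inflight
        else:
            # Assumed intent: only clean up after the rolled-back instants.
            self.heartbeats -= to_rollback
        return to_rollback

    def complete(self, instant):
        # Stands in for the argument check that fails in the stack trace.
        if instant not in self.inflight:
            raise ValueError(f"instant {instant} is no longer INFLIGHT")
        self.inflight.remove(instant)
        self.heartbeats.discard(instant)
        self.completed.add(instant)


def writer_a_commits(buggy_cleanup):
    tl = Timeline()
    tl.start_commit("001")                    # writer A is in progress
    tl.rollback_failed_writes(buggy_cleanup)  # another writer starts a commit
    tl.start_commit("002")
    tl.rollback_failed_writes(buggy_cleanup)  # a writer reaches the clean stage
    try:
        tl.complete("001")                    # writer A tries to finish
        return True
    except ValueError:
        return False
```

With buggy_cleanup=True, the first rollback call removes writer A's heartbeat without rolling anything back, the second call then treats "001" as expired and rolls it back, and writer A's commit fails. With buggy_cleanup=False, writer A's heartbeat survives both calls and the commit completes.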



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
