[ https://issues.apache.org/jira/browse/HUDI-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan resolved HUDI-2641.
---------------------------------------
> One inflight commit rolling back other concurrent inflight commits causing
> them to fail
> ---------------------------------------------------------------------------------------
>
> Key: HUDI-2641
> URL: https://issues.apache.org/jira/browse/HUDI-2641
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: Writer Core
> Reporter: Udit Mehrotra
> Assignee: Udit Mehrotra
> Priority: Blocker
> Labels: pull-request-available, release-blocker
> Fix For: 0.10.0
>
>
> The bug is easily reproducible when running 4-5 concurrent writers at once.
> At least a few of the writers end up failing with the following stack trace
> in Hudi 0.8.0:
> {noformat}
> py4j.protocol.Py4JJavaError: An error occurred while calling o122.save.
> : java.lang.IllegalArgumentException
> at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
> at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396)
> at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377)
> at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154)
> at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212)
> at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185)
> at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
> at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:479)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
> at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
> at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
> at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
> at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
> at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
> at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
> at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
>
> Based on my debugging, this is what is going on:
> * In our start commit/clean logic, we have a step where Hoodie rolls back
> any failed writes
> [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L669].
> As part of that logic, it needs to identify the instants to roll back. In
> the multi-writer case, it relies on heartbeats (whether expired or not) to
> determine whether an inflight commit is a failed write that can be rolled
> back
> [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L828].
> * As part of the same rollback logic, it makes this call to delete the
> heartbeats of rolled-back/expired commits
> [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L813].
> *This call is made regardless of whether there is any instant to roll back,
> and the logic in that method is glaringly wrong: it deletes the heartbeats
> of all inflight commits.*
> * So one commit, while starting its commit process, deletes the heartbeats
> of all other inflight commits. Meanwhile, another commit has reached the
> _cleaner_ stage, where it again checks whether there are any commits to
> roll back. Because the heartbeats of some of the other commits are now
> missing/deleted, it assumes those commits are expired/failed writes, rolls
> them back, and deletes their INFLIGHT commit files from the timeline.
> * Ultimately, the inflight commits that were incorrectly rolled back fail
> with the above exception, because the INFLIGHT state of the commit no
> longer exists in the timeline and the commit cannot be transitioned to
> completed.