This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 56f0233658e [SPARK-39868][CORE][TESTS] StageFailed event should attach
with the root cause
56f0233658e is described below
commit 56f0233658e53425ff915e803284139defb4af42
Author: panbingkun <[email protected]>
AuthorDate: Wed Jul 27 14:21:34 2022 +0900
[SPARK-39868][CORE][TESTS] StageFailed event should attach with the root
cause
### What changes were proposed in this pull request?
The pr follow https://github.com/apache/spark/pull/37245
StageFailed event should attach with the root cause
### Why are the changes needed?
**It may be a good way for users to know the reason of failure.**
By carefully investigating the issue:
https://issues.apache.org/jira/browse/SPARK-39622,
I found the root cause of the test failure: StageFailed doesn't attach the failed
reason from the executor.
When OutputCommitCoordinator executes 'taskCompleted', the 'reason' is
ignored.
Scenario 1: receive TaskSetFailed (Success)
> InsertIntoHadoopFsRelationCommand
> FileFormatWriter.write
> _**handleTaskSetFailed**_ (**attach root cause**)
> abortStage
> failJobAndIndependentStages
> SparkListenerJobEnd
Scenario 2: receive StageFailed (Fail)
> InsertIntoHadoopFsRelationCommand
> FileFormatWriter.write
> _**handleStageFailed**_ (**don't attach root cause**)
> abortStage
> failJobAndIndependentStages
> SparkListenerJobEnd
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually run UT & pass GitHub Actions
Closes #37292 from panbingkun/SPARK-39868.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/scheduler/OutputCommitCoordinator.scala | 3 ++-
.../OutputCommitCoordinatorIntegrationSuite.scala | 3 ++-
.../sql/execution/datasources/parquet/ParquetIOSuite.scala | 14 ++------------
3 files changed, 6 insertions(+), 14 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index a33c2bb93bc..cd5d6b8f9c9 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -160,7 +160,8 @@ private[spark] class OutputCommitCoordinator(
if (stageState.authorizedCommitters(partition) == taskId) {
sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer
" +
s"(attemptNumber=$attemptNumber, stage=$stage,
partition=$partition) failed; " +
- s"but task commit success, data duplication may happen."))
+ s"but task commit success, data duplication may happen. " +
+ s"reason=$reason"))
}
}
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 66b13be4f7a..45da750768f 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -51,7 +51,8 @@ class OutputCommitCoordinatorIntegrationSuite
sc.parallelize(1 to 4,
2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
}
}.getCause.getMessage
- assert(e.endsWith("failed; but task commit success, data duplication may
happen."))
+ assert(e.contains("failed; but task commit success, data duplication may
happen.") &&
+ e.contains("Intentional exception"))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a06ddc1b9e9..5a8f4563756 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1202,15 +1202,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
val m1 = intercept[SparkException] {
spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
}.getCause.getMessage
- // SPARK-39622: The current case must handle the `TaskSetFailed` event
before SPARK-39195
- // due to `maxTaskFailures` is 1 when local mode. After SPARK-39195,
it may handle to one
- // of the `TaskSetFailed` event and `StageFailed` event, and the
execution order of the
- // two events is uncertain, so the assertion of `Authorized committer
(attemptNumber=n,
- // stage=s, partition=p) failed; but task commit success, data
duplication may happen.`
- // is added for workaround.
- assert(m1.contains("Intentional exception for testing purposes") ||
- (m1.contains("Authorized committer") &&
- m1.contains("failed; but task commit success, data duplication may
happen.")))
+ assert(m1.contains("Intentional exception for testing purposes"))
}
withTempPath { dir =>
@@ -1219,9 +1211,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
.coalesce(1)
df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
}.getCause.getMessage
- assert(m2.contains("Intentional exception for testing purposes") ||
- (m2.contains("Authorized committer") &&
- m2.contains("failed; but task commit success, data duplication may
happen.")))
+ assert(m2.contains("Intentional exception for testing purposes"))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]