This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b939acd9072 [SPARK-45002][SS] Avoid uncaught exception from state
store maintenance task thread on error to prevent executor being killed
b939acd9072 is described below
commit b939acd9072ed3a1ba60940942df6fc72b97e5e8
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Aug 30 10:22:45 2023 +0900
[SPARK-45002][SS] Avoid uncaught exception from state store maintenance
task thread on error to prevent executor being killed
### What changes were proposed in this pull request?
Avoid uncaught exception from state store maintenance task thread on error
to prevent executor being killed
### Why are the changes needed?
Without this change, the uncaught exception handler was being triggered
on the executor, leading to the executor being killed.
```
/**
* The default uncaught exception handler for Spark daemons. It terminates
the whole process for
* any Errors, and also terminates the process for Exceptions when the
exitOnException flag is true.
*
* @param exitOnUncaughtException Whether to exit the process on
UncaughtException.
*/
private[spark] class SparkUncaughtExceptionHandler(val
exitOnUncaughtException: Boolean = true)
```
Ideally we just want to record the exception, which will eventually be picked
up by the maintenance task thread to force a reset, instead of causing the
executor JVM process to be killed. This change fixes that issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests
Before:
```
[info] Run completed in 2 seconds, 398 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error]
org.apache.spark.sql.execution.streaming.state.StateStoreSuite
```
After:
```
[info] Run completed in 2 seconds, 339 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42716 from anishshri-db/task/SPARK-45002.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/StateStore.scala | 1 -
.../execution/streaming/state/StateStoreSuite.scala | 20 ++++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index a1d4f7f40a7..6a3a30c7efb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -713,7 +713,6 @@ object StateStore extends Logging {
case NonFatal(e) =>
logWarning(s"Error managing $provider, stopping management
thread", e)
threadPoolException.set(e)
- throw e
} finally {
val duration = System.currentTimeMillis() - startTime
val logMsg = s"Finished maintenance task for provider=$id" +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6c4e259bac5..093f4262011 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
import java.net.URI
import java.util
import java.util.UUID
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -48,8 +49,15 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider {
+ import FakeStateStoreProviderWithMaintenanceError._
private var id: StateStoreId = null
+ private val exceptionHandler = new Thread.UncaughtExceptionHandler() {
+ override def uncaughtException(t: Thread, e: Throwable): Unit = {
+ errorOnMaintenance.set(true)
+ }
+ }
+
override def init(
stateStoreId: StateStoreId,
keySchema: StructType,
@@ -67,10 +75,15 @@ class FakeStateStoreProviderWithMaintenanceError extends
StateStoreProvider {
override def getStore(version: Long): StateStore = null
override def doMaintenance(): Unit = {
+ Thread.currentThread.setUncaughtExceptionHandler(exceptionHandler)
throw new RuntimeException("Intentional maintenance failure")
}
}
+private object FakeStateStoreProviderWithMaintenanceError {
+ val errorOnMaintenance = new AtomicBoolean(false)
+}
+
@ExtendedSQLTest
class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
with BeforeAndAfter {
@@ -1364,6 +1377,7 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
quietly {
withSpark(new SparkContext(conf)) { sc =>
withCoordinatorRef(sc) { _ =>
+
FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.set(false)
val storeId = StateStoreProviderId(StateStoreId("firstDir", 0, 1),
UUID.randomUUID)
val storeConf = StateStoreConf(sqlConf)
@@ -1373,6 +1387,12 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
eventually(timeout(30.seconds)) {
assert(!StateStore.isMaintenanceRunning)
}
+
+ // SPARK-45002: The maintenance task thread failure should not
invoke the
+ // SparkUncaughtExceptionHandler which could lead to the executor
process
+ // getting killed.
+
assert(!FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.get)
+
StateStore.stop()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]