This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 143f9db2f3 [VL] Allow calling TaskResources.runUnsafe in Spark task
code (#11405)
143f9db2f3 is described below
commit 143f9db2f336cea8c31635846bce959f7b33b0b9
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jan 16 16:08:47 2026 +0000
[VL] Allow calling TaskResources.runUnsafe in Spark task code (#11405)
The purpose of `TaskResources.runUnsafe` is to prevent driver-side code
from leaking resources. It is safe to allow its use inside a Spark task as well,
so that driver-only code (e.g., rule application) can be reused in tasks.
---
.../src/main/scala/org/apache/spark/task/TaskResources.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git
a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
index dd5eddf750..bf7c8cd422 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
@@ -55,7 +55,7 @@ object TaskResources extends TaskListener with Logging {
private def setUnsafeTaskContext(): Unit = {
if (inSparkTask()) {
throw new UnsupportedOperationException(
- "TaskResources#runUnsafe should only be used outside Spark task")
+ "TaskResources#setUnsafeTaskContext should only be called outside
Spark task")
}
val properties = new Properties()
SQLConf.get.getAllConfs.foreach {
@@ -83,11 +83,14 @@ object TaskResources extends TaskListener with Logging {
// be created and used. Since unsafe task context is not managed by Spark's
task memory manager,
// Spark may not be aware of the allocations happened inside the user code.
//
- // The API should only be used in the following cases:
+ // The API should typically be used in the following cases:
//
// 1. Run code on driver
// 2. Run test code
def runUnsafe[T](body: => T): T = {
+ if (inSparkTask()) {
+ return body
+ }
TaskResources.setUnsafeTaskContext()
onTaskStart()
val context = getLocalTaskContext()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]