This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d476b370e [CORE] Propagate SQLConf to code block of 
TaskResources.runUnsafe (#6658)
d476b370e is described below

commit d476b370e55df08576ffd3407625a86f94518b6e
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Aug 1 15:03:52 2024 +0800

    [CORE] Propagate SQLConf to code block of TaskResources.runUnsafe (#6658)
---
 .../apache/gluten/test/VeloxBackendTestBase.java   |  2 +-
 .../gluten/planner/plan/GlutenPlanModel.scala      |  4 +--
 .../org/apache/spark/util/SparkTaskUtil.scala      |  5 ++++
 .../org/apache/spark/util/TaskResources.scala      | 24 ++++++++++++---
 .../memtarget/spark/TreeMemoryConsumerTest.java    | 34 ++++++++--------------
 .../apache/gluten/utils/TaskResourceSuite.scala    | 29 ++++++++++++++++--
 .../gluten/columnarbatch/ColumnarBatches.java      |  1 -
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  4 +--
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |  6 ++--
 .../scala/org/apache/spark/TaskContextUtils.scala  | 13 ++++-----
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |  6 ++--
 .../scala/org/apache/spark/TaskContextUtils.scala  | 13 ++++-----
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  6 ++--
 .../scala/org/apache/spark/TaskContextUtils.scala  | 13 ++++-----
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  6 ++--
 .../scala/org/apache/spark/TaskContextUtils.scala  | 13 ++++-----
 tools/gluten-it/sbin/gluten-it.sh                  |  1 +
 17 files changed, 106 insertions(+), 74 deletions(-)

diff --git 
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java 
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 1d7df2356..2f0087608 100644
--- 
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++ 
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -53,7 +53,7 @@ public abstract class VeloxBackendTestBase {
       @Override
       public SparkConf conf() {
         final SparkConf conf = new SparkConf();
-        conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
+        conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), 
"0");
         return conf;
       }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
index 7417d9a5d..727613f56 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{ColumnarToRowExec, LeafExecNode, 
SparkPlan}
 import org.apache.spark.util.{SparkTaskUtil, TaskResources}
 
-import java.util.Objects
+import java.util.{Objects, Properties}
 
 object GlutenPlanModel {
   def apply(): PlanModel[SparkPlan] = {
@@ -71,7 +71,7 @@ object GlutenPlanModel {
   }
 
   private object PlanModelImpl extends PlanModel[SparkPlan] {
-    private val fakeTc = SparkShimLoader.getSparkShims.createTestTaskContext()
+    private val fakeTc = 
SparkShimLoader.getSparkShims.createTestTaskContext(new Properties())
     private def fakeTc[T](body: => T): T = {
       assert(!TaskResources.inSparkTask())
       SparkTaskUtil.setTaskContext(fakeTc)
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
index 92a12b3c6..21ef7bb0a 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.TaskContext
+import org.apache.spark.memory.TaskMemoryManager
 
 object SparkTaskUtil {
   def setTaskContext(taskContext: TaskContext): Unit = {
@@ -26,4 +27,8 @@ object SparkTaskUtil {
   def unsetTaskContext(): Unit = {
     TaskContext.unset()
   }
+
+  def getTaskMemoryManager(taskContext: TaskContext): TaskMemoryManager = {
+    taskContext.taskMemoryManager()
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
index d8079abce..2ab2a41a4 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
@@ -25,7 +25,7 @@ import _root_.org.apache.gluten.sql.shims.SparkShimLoader
 import _root_.org.apache.gluten.utils.TaskListener
 
 import java.util
-import java.util.{Collections, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
@@ -40,8 +40,16 @@ object TaskResources extends TaskListener with Logging {
   }
   val ACCUMULATED_LEAK_BYTES = new AtomicLong(0L)
 
-  private def newUnsafeTaskContext(): TaskContext = {
-    SparkShimLoader.getSparkShims.createTestTaskContext()
+  private def newUnsafeTaskContext(properties: Properties): TaskContext = {
+    SparkShimLoader.getSparkShims.createTestTaskContext(properties)
+  }
+
+  implicit private class PropertiesOps(properties: Properties) {
+    def setIfMissing(key: String, value: String): Unit = {
+      if (!properties.containsKey(key)) {
+        properties.setProperty(key, value)
+      }
+    }
   }
 
   private def setUnsafeTaskContext(): Unit = {
@@ -49,7 +57,15 @@ object TaskResources extends TaskListener with Logging {
       throw new UnsupportedOperationException(
         "TaskResources#runUnsafe should only be used outside Spark task")
     }
-    TaskContext.setTaskContext(newUnsafeTaskContext())
+    val properties = new Properties()
+    SQLConf.get.getAllConfs.foreach {
+      case (key, value) if key.startsWith("spark") =>
+        properties.put(key, value)
+      case _ =>
+    }
+    properties.setIfMissing("spark.memory.offHeap.enabled", "true")
+    properties.setIfMissing("spark.memory.offHeap.size", "1TB")
+    TaskContext.setTaskContext(newUnsafeTaskContext(properties))
   }
 
   private def unsetUnsafeTaskContext(): Unit = {
diff --git 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index db018ffe4..1632e5ef4 100644
--- 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++ 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -24,6 +24,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.util.TaskResources$;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -31,13 +32,18 @@ import java.util.Collections;
 import scala.Function0;
 
 public class TreeMemoryConsumerTest {
-  @Test
-  public void testIsolated() {
-    final SQLConf conf = new SQLConf();
+  @Before
+  public void setUp() throws Exception {
+    final SQLConf conf = SQLConf.get();
+    conf.setConfString("spark.memory.offHeap.enabled", "true");
+    conf.setConfString("spark.memory.offHeap.size", "400");
     conf.setConfString(
         GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(), 
"100");
+  }
+
+  @Test
+  public void testIsolated() {
     test(
-        conf,
         () -> {
           final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.isolated();
           final TreeMemoryTarget consumer =
@@ -55,11 +61,7 @@ public class TreeMemoryConsumerTest {
 
   @Test
   public void testShared() {
-    final SQLConf conf = new SQLConf();
-    conf.setConfString(
-        GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(), 
"100");
     test(
-        conf,
         () -> {
           final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.shared();
           final TreeMemoryTarget consumer =
@@ -77,11 +79,7 @@ public class TreeMemoryConsumerTest {
 
   @Test
   public void testIsolatedAndShared() {
-    final SQLConf conf = new SQLConf();
-    conf.setConfString(
-        GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(), 
"100");
     test(
-        conf,
         () -> {
           final TreeMemoryTarget shared =
               TreeMemoryConsumers.shared()
@@ -102,20 +100,12 @@ public class TreeMemoryConsumerTest {
         });
   }
 
-  private void test(SQLConf conf, Runnable r) {
+  private void test(Runnable r) {
     TaskResources$.MODULE$.runUnsafe(
         new Function0<Object>() {
           @Override
           public Object apply() {
-            SQLConf.withExistingConf(
-                conf,
-                new Function0<Object>() {
-                  @Override
-                  public Object apply() {
-                    r.run();
-                    return null;
-                  }
-                });
+            r.run();
             return null;
           }
         });
diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala 
b/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
index 898cf873c..0c91b4faf 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
@@ -16,13 +16,16 @@
  */
 package org.apache.gluten.utils
 
-import org.apache.spark.util.{TaskResource, TaskResources}
+import org.apache.spark.memory.{MemoryConsumer, MemoryMode}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{SparkTaskUtil, TaskResource, TaskResources}
 
 import org.scalatest.funsuite.AnyFunSuite
 
 import java.util.UUID
 
-class TaskResourceSuite extends AnyFunSuite {
+class TaskResourceSuite extends AnyFunSuite with SQLHelper {
   test("Run unsafe") {
     val out = TaskResources.runUnsafe {
       1
@@ -37,6 +40,28 @@ class TaskResourceSuite extends AnyFunSuite {
     }
   }
 
+  test("Run unsafe - propagate Spark config") {
+    val total = 128 * 1024 * 1024
+    withSQLConf(
+      "spark.memory.offHeap.enabled" -> "true",
+      "spark.memory.offHeap.size" -> s"$total",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      TaskResources.runUnsafe {
+        assert(TaskResources.inSparkTask())
+        assert(TaskResources.getLocalTaskContext() != null)
+
+        val tmm = 
SparkTaskUtil.getTaskMemoryManager(TaskResources.getLocalTaskContext())
+        val consumer = new MemoryConsumer(tmm, MemoryMode.OFF_HEAP) {
+          override def spill(size: Long, trigger: MemoryConsumer): Long = 0L
+        }
+        assert(consumer.acquireMemory(total) == total)
+        assert(consumer.acquireMemory(1) == 0)
+
+        assert(!SQLConf.get.adaptiveExecutionEnabled)
+      }
+    }
+  }
+
   test("Run unsafe - register resource") {
     var unregisteredCount = 0
     TaskResources.runUnsafe {
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index cb68e032d..a72eaafae 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -100,7 +100,6 @@ public class ColumnarBatches {
         newVectors[i] = from.column(i);
       }
       FIELD_COLUMNS.set(target, newVectors);
-      System.out.println();
     } catch (IllegalAccessException e) {
       throw new GlutenException(e);
     }
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index fed08b789..5a0946988 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -50,7 +50,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
-import java.util.{ArrayList => JArrayList, Map => JMap}
+import java.util.{Map => JMap, Properties}
 
 import scala.reflect.ClassTag
 
@@ -156,7 +156,7 @@ trait SparkShims {
 
   def enableNativeWriteFilesByDefault(): Boolean = false
 
-  def createTestTaskContext(): TaskContext
+  def createTestTaskContext(properties: Properties): TaskContext
 
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] 
= {
     // Since Spark 3.4, the `sc.broadcast` has been optimized to use 
`sc.broadcastInternal`.
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 5ad63884a..995d5b087 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -53,7 +53,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
 
 class Spark32Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -156,8 +156,8 @@ class Spark32Shims extends SparkShims {
     List(session => 
GlutenParquetWriterInjects.getInstance().getExtendedColumnarPostRule(session))
   }
 
-  override def createTestTaskContext(): TaskContext = {
-    TaskContextUtils.createTestTaskContext()
+  override def createTestTaskContext(properties: Properties): TaskContext = {
+    TaskContextUtils.createTestTaskContext(properties)
   }
 
   def setJobDescriptionOrTagForBroadcastExchange(
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala 
b/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
index c4abeb411..ac7f926f1 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
 
 import java.util.Properties
 
+import scala.collection.JavaConverters._
+
 object TaskContextUtils {
-  def createTestTaskContext(): TaskContext = {
+  def createTestTaskContext(properties: Properties): TaskContext = {
     val conf = new SparkConf()
-    conf.set("spark.memory.offHeap.enabled", "true")
-    conf.set("spark.memory.offHeap.size", "1TB")
-    val memoryManager =
-      new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2), 
ByteUnit.TiB.toBytes(1), 1)
+    conf.setAll(properties.asScala)
+    val memoryManager = UnifiedMemoryManager(conf, 1)
     new TaskContextImpl(
       -1,
       -1,
@@ -37,7 +36,7 @@ object TaskContextUtils {
       -1L,
       -1,
       new TaskMemoryManager(memoryManager, -1L),
-      new Properties,
+      properties,
       MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
       TaskMetrics.empty,
       Map.empty
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 8b12c2642..7b606ea97 100644
--- 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -56,7 +56,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
 
 class Spark33Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -248,8 +248,8 @@ class Spark33Shims extends SparkShims {
     List(session => 
GlutenParquetWriterInjects.getInstance().getExtendedColumnarPostRule(session))
   }
 
-  override def createTestTaskContext(): TaskContext = {
-    TaskContextUtils.createTestTaskContext()
+  override def createTestTaskContext(properties: Properties): TaskContext = {
+    TaskContextUtils.createTestTaskContext(properties)
   }
 
   def setJobDescriptionOrTagForBroadcastExchange(
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala 
b/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
index 6d7e686de..c4fea992d 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
 
 import java.util.Properties
 
+import scala.collection.JavaConverters._
+
 object TaskContextUtils {
-  def createTestTaskContext(): TaskContext = {
+  def createTestTaskContext(properties: Properties): TaskContext = {
     val conf = new SparkConf()
-    conf.set("spark.memory.offHeap.enabled", "true")
-    conf.set("spark.memory.offHeap.size", "1TB")
-    val memoryManager =
-      new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2), 
ByteUnit.TiB.toBytes(1), 1)
+    conf.setAll(properties.asScala)
+    val memoryManager = UnifiedMemoryManager(conf, 1)
     new TaskContextImpl(
       -1,
       -1,
@@ -37,7 +36,7 @@ object TaskContextUtils {
       -1L,
       -1,
       new TaskMemoryManager(memoryManager, -1L),
-      new Properties,
+      properties,
       MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
       TaskMetrics.empty,
       1,
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index cd7e4347d..8b1b7649a 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -58,7 +58,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
 
 import scala.reflect.ClassTag
 
@@ -299,8 +299,8 @@ class Spark34Shims extends SparkShims {
 
   override def enableNativeWriteFilesByDefault(): Boolean = true
 
-  override def createTestTaskContext(): TaskContext = {
-    TaskContextUtils.createTestTaskContext()
+  override def createTestTaskContext(properties: Properties): TaskContext = {
+    TaskContextUtils.createTestTaskContext(properties)
   }
 
   override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): 
Broadcast[T] = {
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala 
b/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
index 976851eb9..7a81b6121 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
 
 import java.util.Properties
 
+import scala.collection.JavaConverters._
+
 object TaskContextUtils {
-  def createTestTaskContext(): TaskContext = {
+  def createTestTaskContext(properties: Properties): TaskContext = {
     val conf = new SparkConf()
-    conf.set("spark.memory.offHeap.enabled", "true")
-    conf.set("spark.memory.offHeap.size", "1TB")
-    val memoryManager =
-      new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2), 
ByteUnit.TiB.toBytes(1), 1)
+    conf.setAll(properties.asScala)
+    val memoryManager = UnifiedMemoryManager(conf, 1)
     new TaskContextImpl(
       -1,
       -1,
@@ -38,7 +37,7 @@ object TaskContextUtils {
       -1,
       -1,
       new TaskMemoryManager(memoryManager, -1L),
-      new Properties,
+      properties,
       MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
       TaskMetrics.empty,
       1,
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index bb41b7e73..93785d7a2 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -58,7 +58,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
 
 import scala.reflect.ClassTag
 
@@ -326,8 +326,8 @@ class Spark35Shims extends SparkShims {
 
   override def enableNativeWriteFilesByDefault(): Boolean = true
 
-  override def createTestTaskContext(): TaskContext = {
-    TaskContextUtils.createTestTaskContext()
+  override def createTestTaskContext(properties: Properties): TaskContext = {
+    TaskContextUtils.createTestTaskContext(properties)
   }
 
   override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): 
Broadcast[T] = {
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala 
b/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
index 976851eb9..7a81b6121 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
 
 import java.util.Properties
 
+import scala.collection.JavaConverters._
+
 object TaskContextUtils {
-  def createTestTaskContext(): TaskContext = {
+  def createTestTaskContext(properties: Properties): TaskContext = {
     val conf = new SparkConf()
-    conf.set("spark.memory.offHeap.enabled", "true")
-    conf.set("spark.memory.offHeap.size", "1TB")
-    val memoryManager =
-      new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2), 
ByteUnit.TiB.toBytes(1), 1)
+    conf.setAll(properties.asScala)
+    val memoryManager = UnifiedMemoryManager(conf, 1)
     new TaskContextImpl(
       -1,
       -1,
@@ -38,7 +37,7 @@ object TaskContextUtils {
       -1,
       -1,
       new TaskMemoryManager(memoryManager, -1L),
-      new Properties,
+      properties,
       MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
       TaskMetrics.empty,
       1,
diff --git a/tools/gluten-it/sbin/gluten-it.sh 
b/tools/gluten-it/sbin/gluten-it.sh
index b21038ccd..00ff78e34 100755
--- a/tools/gluten-it/sbin/gluten-it.sh
+++ b/tools/gluten-it/sbin/gluten-it.sh
@@ -48,6 +48,7 @@ $JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \
     --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
     --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
     --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
+    --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
     --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
     --add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
     --add-opens=java.base/sun.security.action=ALL-UNNAMED \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to