This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d476b370e [CORE] Propagate SQLConf to code block of TaskResources.runUnsafe (#6658)
d476b370e is described below
commit d476b370e55df08576ffd3407625a86f94518b6e
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Aug 1 15:03:52 2024 +0800
[CORE] Propagate SQLConf to code block of TaskResources.runUnsafe (#6658)
---
.../apache/gluten/test/VeloxBackendTestBase.java | 2 +-
.../gluten/planner/plan/GlutenPlanModel.scala | 4 +--
.../org/apache/spark/util/SparkTaskUtil.scala | 5 ++++
.../org/apache/spark/util/TaskResources.scala | 24 ++++++++++++---
.../memtarget/spark/TreeMemoryConsumerTest.java | 34 ++++++++--------------
.../apache/gluten/utils/TaskResourceSuite.scala | 29 ++++++++++++++++--
.../gluten/columnarbatch/ColumnarBatches.java | 1 -
.../org/apache/gluten/sql/shims/SparkShims.scala | 4 +--
.../gluten/sql/shims/spark32/Spark32Shims.scala | 6 ++--
.../scala/org/apache/spark/TaskContextUtils.scala | 13 ++++-----
.../gluten/sql/shims/spark33/Spark33Shims.scala | 6 ++--
.../scala/org/apache/spark/TaskContextUtils.scala | 13 ++++-----
.../gluten/sql/shims/spark34/Spark34Shims.scala | 6 ++--
.../scala/org/apache/spark/TaskContextUtils.scala | 13 ++++-----
.../gluten/sql/shims/spark35/Spark35Shims.scala | 6 ++--
.../scala/org/apache/spark/TaskContextUtils.scala | 13 ++++-----
tools/gluten-it/sbin/gluten-it.sh | 1 +
17 files changed, 106 insertions(+), 74 deletions(-)
diff --git
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 1d7df2356..2f0087608 100644
---
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -53,7 +53,7 @@ public abstract class VeloxBackendTestBase {
@Override
public SparkConf conf() {
final SparkConf conf = new SparkConf();
- conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
+ conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
return conf;
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
index 7417d9a5d..727613f56 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{ColumnarToRowExec, LeafExecNode,
SparkPlan}
import org.apache.spark.util.{SparkTaskUtil, TaskResources}
-import java.util.Objects
+import java.util.{Objects, Properties}
object GlutenPlanModel {
def apply(): PlanModel[SparkPlan] = {
@@ -71,7 +71,7 @@ object GlutenPlanModel {
}
private object PlanModelImpl extends PlanModel[SparkPlan] {
- private val fakeTc = SparkShimLoader.getSparkShims.createTestTaskContext()
+ private val fakeTc = SparkShimLoader.getSparkShims.createTestTaskContext(new Properties())
private def fakeTc[T](body: => T): T = {
assert(!TaskResources.inSparkTask())
SparkTaskUtil.setTaskContext(fakeTc)
diff --git
a/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
index 92a12b3c6..21ef7bb0a 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util
import org.apache.spark.TaskContext
+import org.apache.spark.memory.TaskMemoryManager
object SparkTaskUtil {
def setTaskContext(taskContext: TaskContext): Unit = {
@@ -26,4 +27,8 @@ object SparkTaskUtil {
def unsetTaskContext(): Unit = {
TaskContext.unset()
}
+
+ def getTaskMemoryManager(taskContext: TaskContext): TaskMemoryManager = {
+ taskContext.taskMemoryManager()
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
b/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
index d8079abce..2ab2a41a4 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/TaskResources.scala
@@ -25,7 +25,7 @@ import _root_.org.apache.gluten.sql.shims.SparkShimLoader
import _root_.org.apache.gluten.utils.TaskListener
import java.util
-import java.util.{Collections, UUID}
+import java.util.{Collections, Properties, UUID}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
@@ -40,8 +40,16 @@ object TaskResources extends TaskListener with Logging {
}
val ACCUMULATED_LEAK_BYTES = new AtomicLong(0L)
- private def newUnsafeTaskContext(): TaskContext = {
- SparkShimLoader.getSparkShims.createTestTaskContext()
+ private def newUnsafeTaskContext(properties: Properties): TaskContext = {
+ SparkShimLoader.getSparkShims.createTestTaskContext(properties)
+ }
+
+ implicit private class PropertiesOps(properties: Properties) {
+ def setIfMissing(key: String, value: String): Unit = {
+ if (!properties.containsKey(key)) {
+ properties.setProperty(key, value)
+ }
+ }
}
private def setUnsafeTaskContext(): Unit = {
@@ -49,7 +57,15 @@ object TaskResources extends TaskListener with Logging {
throw new UnsupportedOperationException(
"TaskResources#runUnsafe should only be used outside Spark task")
}
- TaskContext.setTaskContext(newUnsafeTaskContext())
+ val properties = new Properties()
+ SQLConf.get.getAllConfs.foreach {
+ case (key, value) if key.startsWith("spark") =>
+ properties.put(key, value)
+ case _ =>
+ }
+ properties.setIfMissing("spark.memory.offHeap.enabled", "true")
+ properties.setIfMissing("spark.memory.offHeap.size", "1TB")
+ TaskContext.setTaskContext(newUnsafeTaskContext(properties))
}
private def unsetUnsafeTaskContext(): Unit = {
diff --git
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index db018ffe4..1632e5ef4 100644
---
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -24,6 +24,7 @@ import org.apache.spark.TaskContext;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.util.TaskResources$;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
@@ -31,13 +32,18 @@ import java.util.Collections;
import scala.Function0;
public class TreeMemoryConsumerTest {
- @Test
- public void testIsolated() {
- final SQLConf conf = new SQLConf();
+ @Before
+ public void setUp() throws Exception {
+ final SQLConf conf = SQLConf.get();
+ conf.setConfString("spark.memory.offHeap.enabled", "true");
+ conf.setConfString("spark.memory.offHeap.size", "400");
conf.setConfString(
GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
"100");
+ }
+
+ @Test
+ public void testIsolated() {
test(
- conf,
() -> {
final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.isolated();
final TreeMemoryTarget consumer =
@@ -55,11 +61,7 @@ public class TreeMemoryConsumerTest {
@Test
public void testShared() {
- final SQLConf conf = new SQLConf();
- conf.setConfString(
- GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
"100");
test(
- conf,
() -> {
final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.shared();
final TreeMemoryTarget consumer =
@@ -77,11 +79,7 @@ public class TreeMemoryConsumerTest {
@Test
public void testIsolatedAndShared() {
- final SQLConf conf = new SQLConf();
- conf.setConfString(
- GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
"100");
test(
- conf,
() -> {
final TreeMemoryTarget shared =
TreeMemoryConsumers.shared()
@@ -102,20 +100,12 @@ public class TreeMemoryConsumerTest {
});
}
- private void test(SQLConf conf, Runnable r) {
+ private void test(Runnable r) {
TaskResources$.MODULE$.runUnsafe(
new Function0<Object>() {
@Override
public Object apply() {
- SQLConf.withExistingConf(
- conf,
- new Function0<Object>() {
- @Override
- public Object apply() {
- r.run();
- return null;
- }
- });
+ r.run();
return null;
}
});
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
b/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
index 898cf873c..0c91b4faf 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala
@@ -16,13 +16,16 @@
*/
package org.apache.gluten.utils
-import org.apache.spark.util.{TaskResource, TaskResources}
+import org.apache.spark.memory.{MemoryConsumer, MemoryMode}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{SparkTaskUtil, TaskResource, TaskResources}
import org.scalatest.funsuite.AnyFunSuite
import java.util.UUID
-class TaskResourceSuite extends AnyFunSuite {
+class TaskResourceSuite extends AnyFunSuite with SQLHelper {
test("Run unsafe") {
val out = TaskResources.runUnsafe {
1
@@ -37,6 +40,28 @@ class TaskResourceSuite extends AnyFunSuite {
}
}
+ test("Run unsafe - propagate Spark config") {
+ val total = 128 * 1024 * 1024
+ withSQLConf(
+ "spark.memory.offHeap.enabled" -> "true",
+ "spark.memory.offHeap.size" -> s"$total",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ TaskResources.runUnsafe {
+ assert(TaskResources.inSparkTask())
+ assert(TaskResources.getLocalTaskContext() != null)
+
+ val tmm = SparkTaskUtil.getTaskMemoryManager(TaskResources.getLocalTaskContext())
+ val consumer = new MemoryConsumer(tmm, MemoryMode.OFF_HEAP) {
+ override def spill(size: Long, trigger: MemoryConsumer): Long = 0L
+ }
+ assert(consumer.acquireMemory(total) == total)
+ assert(consumer.acquireMemory(1) == 0)
+
+ assert(!SQLConf.get.adaptiveExecutionEnabled)
+ }
+ }
+ }
+
test("Run unsafe - register resource") {
var unregisteredCount = 0
TaskResources.runUnsafe {
diff --git
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index cb68e032d..a72eaafae 100644
---
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -100,7 +100,6 @@ public class ColumnarBatches {
newVectors[i] = from.column(i);
}
FIELD_COLUMNS.set(target, newVectors);
- System.out.println();
} catch (IllegalAccessException e) {
throw new GlutenException(e);
}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index fed08b789..5a0946988 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -50,7 +50,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.schema.MessageType
-import java.util.{ArrayList => JArrayList, Map => JMap}
+import java.util.{Map => JMap, Properties}
import scala.reflect.ClassTag
@@ -156,7 +156,7 @@ trait SparkShims {
def enableNativeWriteFilesByDefault(): Boolean = false
- def createTestTaskContext(): TaskContext
+ def createTestTaskContext(properties: Properties): TaskContext
def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
// Since Spark 3.4, the `sc.broadcast` has been optimized to use
`sc.broadcastInternal`.
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 5ad63884a..995d5b087 100644
---
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -53,7 +53,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.schema.MessageType
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
class Spark32Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -156,8 +156,8 @@ class Spark32Shims extends SparkShims {
List(session =>
GlutenParquetWriterInjects.getInstance().getExtendedColumnarPostRule(session))
}
- override def createTestTaskContext(): TaskContext = {
- TaskContextUtils.createTestTaskContext()
+ override def createTestTaskContext(properties: Properties): TaskContext = {
+ TaskContextUtils.createTestTaskContext(properties)
}
def setJobDescriptionOrTagForBroadcastExchange(
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
index c4abeb411..ac7f926f1 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
import java.util.Properties
+import scala.collection.JavaConverters._
+
object TaskContextUtils {
- def createTestTaskContext(): TaskContext = {
+ def createTestTaskContext(properties: Properties): TaskContext = {
val conf = new SparkConf()
- conf.set("spark.memory.offHeap.enabled", "true")
- conf.set("spark.memory.offHeap.size", "1TB")
- val memoryManager =
- new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2),
ByteUnit.TiB.toBytes(1), 1)
+ conf.setAll(properties.asScala)
+ val memoryManager = UnifiedMemoryManager(conf, 1)
new TaskContextImpl(
-1,
-1,
@@ -37,7 +36,7 @@ object TaskContextUtils {
-1L,
-1,
new TaskMemoryManager(memoryManager, -1L),
- new Properties,
+ properties,
MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
TaskMetrics.empty,
Map.empty
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 8b12c2642..7b606ea97 100644
---
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -56,7 +56,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
class Spark33Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -248,8 +248,8 @@ class Spark33Shims extends SparkShims {
List(session =>
GlutenParquetWriterInjects.getInstance().getExtendedColumnarPostRule(session))
}
- override def createTestTaskContext(): TaskContext = {
- TaskContextUtils.createTestTaskContext()
+ override def createTestTaskContext(properties: Properties): TaskContext = {
+ TaskContextUtils.createTestTaskContext(properties)
}
def setJobDescriptionOrTagForBroadcastExchange(
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
index 6d7e686de..c4fea992d 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
import java.util.Properties
+import scala.collection.JavaConverters._
+
object TaskContextUtils {
- def createTestTaskContext(): TaskContext = {
+ def createTestTaskContext(properties: Properties): TaskContext = {
val conf = new SparkConf()
- conf.set("spark.memory.offHeap.enabled", "true")
- conf.set("spark.memory.offHeap.size", "1TB")
- val memoryManager =
- new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2),
ByteUnit.TiB.toBytes(1), 1)
+ conf.setAll(properties.asScala)
+ val memoryManager = UnifiedMemoryManager(conf, 1)
new TaskContextImpl(
-1,
-1,
@@ -37,7 +36,7 @@ object TaskContextUtils {
-1L,
-1,
new TaskMemoryManager(memoryManager, -1L),
- new Properties,
+ properties,
MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
TaskMetrics.empty,
1,
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index cd7e4347d..8b1b7649a 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -58,7 +58,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
import scala.reflect.ClassTag
@@ -299,8 +299,8 @@ class Spark34Shims extends SparkShims {
override def enableNativeWriteFilesByDefault(): Boolean = true
- override def createTestTaskContext(): TaskContext = {
- TaskContextUtils.createTestTaskContext()
+ override def createTestTaskContext(properties: Properties): TaskContext = {
+ TaskContextUtils.createTestTaskContext(properties)
}
override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T):
Broadcast[T] = {
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
index 976851eb9..7a81b6121 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
import java.util.Properties
+import scala.collection.JavaConverters._
+
object TaskContextUtils {
- def createTestTaskContext(): TaskContext = {
+ def createTestTaskContext(properties: Properties): TaskContext = {
val conf = new SparkConf()
- conf.set("spark.memory.offHeap.enabled", "true")
- conf.set("spark.memory.offHeap.size", "1TB")
- val memoryManager =
- new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2),
ByteUnit.TiB.toBytes(1), 1)
+ conf.setAll(properties.asScala)
+ val memoryManager = UnifiedMemoryManager(conf, 1)
new TaskContextImpl(
-1,
-1,
@@ -38,7 +37,7 @@ object TaskContextUtils {
-1,
-1,
new TaskMemoryManager(memoryManager, -1L),
- new Properties,
+ properties,
MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
TaskMetrics.empty,
1,
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index bb41b7e73..93785d7a2 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -58,7 +58,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, Properties}
import scala.reflect.ClassTag
@@ -326,8 +326,8 @@ class Spark35Shims extends SparkShims {
override def enableNativeWriteFilesByDefault(): Boolean = true
- override def createTestTaskContext(): TaskContext = {
- TaskContextUtils.createTestTaskContext()
+ override def createTestTaskContext(properties: Properties): TaskContext = {
+ TaskContextUtils.createTestTaskContext(properties)
}
override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T):
Broadcast[T] = {
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
index 976851eb9..7a81b6121 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -19,17 +19,16 @@ package org.apache.spark
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.util.ByteUnit
import java.util.Properties
+import scala.collection.JavaConverters._
+
object TaskContextUtils {
- def createTestTaskContext(): TaskContext = {
+ def createTestTaskContext(properties: Properties): TaskContext = {
val conf = new SparkConf()
- conf.set("spark.memory.offHeap.enabled", "true")
- conf.set("spark.memory.offHeap.size", "1TB")
- val memoryManager =
- new UnifiedMemoryManager(conf, ByteUnit.TiB.toBytes(2),
ByteUnit.TiB.toBytes(1), 1)
+ conf.setAll(properties.asScala)
+ val memoryManager = UnifiedMemoryManager(conf, 1)
new TaskContextImpl(
-1,
-1,
@@ -38,7 +37,7 @@ object TaskContextUtils {
-1,
-1,
new TaskMemoryManager(memoryManager, -1L),
- new Properties,
+ properties,
MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
TaskMetrics.empty,
1,
diff --git a/tools/gluten-it/sbin/gluten-it.sh
b/tools/gluten-it/sbin/gluten-it.sh
index b21038ccd..00ff78e34 100755
--- a/tools/gluten-it/sbin/gluten-it.sh
+++ b/tools/gluten-it/sbin/gluten-it.sh
@@ -48,6 +48,7 @@ $JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
+ --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]