zhouyuan commented on code in PR #9336:
URL: https://github.com/apache/incubator-gluten/pull/9336#discussion_r2049075847
##########
shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala:
##########
@@ -1634,7 +1637,7 @@ object GlutenConfig {
.createWithDefault(true)
val DYNAMIC_OFFHEAP_SIZING_ENABLED =
- buildConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
+ buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
Review Comment:
Is this a behavior change — switching this from a runtime config to a static config?
##########
gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala:
##########
@@ -188,74 +188,25 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// check memory off-heap enabled and size.
checkOffHeapSettings(conf)
- // Task slots.
- val taskSlots = SparkResourceUtil.getTaskSlots(conf)
- conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
-
- val onHeapSize: Long = conf.getSizeAsBytes(SPARK_ONHEAP_SIZE_KEY, 1024 *
1024 * 1024)
-
- // If dynamic off-heap sizing is enabled, the off-heap size is calculated
based on the on-heap
- // size. Otherwise, the off-heap size is set to the value specified by the
user (if any).
- // Note that this means that we will IGNORE the off-heap size specified by
the user if the
- // dynamic off-heap feature is enabled.
- val offHeapSize: Long =
- if (
- conf.getBoolean(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
- ) {
- // Since when dynamic off-heap sizing is enabled, we commingle on-heap
- // and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
- // size it with a memory fraction, which can be aggressively set, but
the default
- // is using the same way that Spark sizes on-heap memory:
- //
- // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
- // (spark.executor.memory - 300MB).
- //
- // We will be careful to use the same configuration settings as Spark
to ensure
- // that we are sizing the off-heap memory in the same way as Spark
sizes on-heap memory.
- // The 300MB value, unfortunately, is hard-coded in Spark code.
- ((onHeapSize - (300 * 1024 * 1024)) *
- conf.getDouble(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION.key,
0.6d)).toLong
- } else {
- // Optimistic off-heap sizes, assuming all storage memory can be
borrowed into execution
- // memory pool, regardless of Spark option
spark.memory.storageFraction.
- conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY, 0L)
- }
+ // Get the off-heap size set by user.
+ val offHeapSize = conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY)
+ // Set off-heap size in bytes.
conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, offHeapSize.toString)
- conf.set(SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+ // Set off-heap size in bytes per task.
+ val taskSlots = SparkResourceUtil.getTaskSlots(conf)
+ conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
val offHeapPerTask = offHeapSize / taskSlots
conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, offHeapPerTask.toString)
- // If we are using dynamic off-heap sizing, we should also enable off-heap
memory
- // officially.
- if (
- conf.getBoolean(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
- ) {
- conf.set(SPARK_OFFHEAP_ENABLED, "true")
-
- // We already sized the off-heap per task in a conservative manner, so
we can just
- // use it.
- conf.set(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
offHeapPerTask.toString)
- } else {
- // Let's make sure this is set to false explicitly if it is not on as it
- // is looked up when throwing OOF exceptions.
- conf.set(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValueString)
-
- // Pessimistic off-heap sizes, with the assumption that all
non-borrowable storage memory
- // determined by spark.memory.storageFraction was used.
- val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction",
0.5d)
- val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
- conf.set(
- COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
- conservativeOffHeapPerTask.toString)
- }
+ // Pessimistic off-heap sizes, with the assumption that all non-borrowable
storage memory
+ // determined by spark.memory.storageFraction was used.
+ val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
+ val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
Review Comment:
It looks like this changes the scope: in the previous code,
`conservativeOffHeapPerTask` was computed this way only when
`DYNAMIC_OFFHEAP_SIZING_ENABLED=false`; now it applies unconditionally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]