This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4ea60cef9 fix: Simplify CometShuffleMemoryAllocator logic, rename
classes, remove config (#1485)
4ea60cef9 is described below
commit 4ea60cef9099b6b03186d45b502c1a516d9ad0e7
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Mar 13 13:01:59 2025 -0400
fix: Simplify CometShuffleMemoryAllocator logic, rename classes, remove
config (#1485)
---
.../main/scala/org/apache/comet/CometConf.scala | 11 ---
...ava => CometBoundedShuffleMemoryAllocator.java} | 6 +-
.../shuffle/comet/CometShuffleMemoryAllocator.java | 80 ++++------------------
...ava => CometUnifiedShuffleMemoryAllocator.java} | 54 ++++-----------
.../apache/comet/CometSparkSessionExtensions.scala | 6 --
.../src/main/scala/org/apache/spark/Plugins.scala | 20 +++---
.../comet/exec/CometColumnarShuffleSuite.scala | 10 ---
.../scala/org/apache/spark/CometPluginsSuite.scala | 18 +++--
8 files changed, 46 insertions(+), 159 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 319b0848c..532e43f13 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -404,17 +404,6 @@ object CometConf extends ShimCometConf {
"Ensure that Comet shuffle memory overhead factor is a double greater
than 0")
.createWithDefault(1.0)
- val COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST:
ConfigEntry[Boolean] =
- conf("spark.comet.columnar.shuffle.unifiedMemoryAllocatorTest")
- .doc("Whether to use Spark unified memory allocator for Comet columnar
shuffle in tests." +
- "If not configured, Comet will use a test-only memory allocator for
Comet columnar " +
- "shuffle when Spark test env detected. The test-ony allocator is
proposed to run with " +
- "Spark tests as these tests require on-heap memory configuration. " +
- "By default, this config is false.")
- .internal()
- .booleanConf
- .createWithDefault(false)
-
val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.batch.size")
.internal()
diff --git
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
similarity index 97%
rename from
spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
rename to
spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
index 084e82b2b..bd6782a5d 100644
---
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
+++
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
@@ -48,7 +48,7 @@ import org.apache.comet.CometSparkSessionExtensions$;
* Thus, this allocator is used to allocate separate off-heap memory
allocation for Comet JVM
* shuffle and execution apart from Spark's on-heap memory configuration.
*/
-public final class CometTestShuffleMemoryAllocator extends
CometShuffleMemoryAllocatorTrait {
+public final class CometBoundedShuffleMemoryAllocator extends
CometShuffleMemoryAllocatorTrait {
private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();
private final long pageSize;
@@ -67,9 +67,7 @@ public final class CometTestShuffleMemoryAllocator extends
CometShuffleMemoryAll
private static final int OFFSET_BITS = 51;
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
- private static CometTestShuffleMemoryAllocator INSTANCE;
-
- CometTestShuffleMemoryAllocator(
+ CometBoundedShuffleMemoryAllocator(
SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
this.pageSize = pageSize;
diff --git
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
index f6e6ca96a..e8f9525b6 100644
---
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
+++
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
@@ -19,91 +19,37 @@
package org.apache.spark.shuffle.comet;
-import java.io.IOException;
-
import org.apache.spark.SparkConf;
-import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.Utils;
-
-import org.apache.comet.CometConf$;
/**
- * A simple memory allocator used by `CometShuffleExternalSorter` to allocate
memory blocks which
- * store serialized rows. This class is simply an implementation of
`MemoryConsumer` that delegates
- * memory allocation to the `TaskMemoryManager`. This requires that the
`TaskMemoryManager` is
- * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ * A factory that instantiates either CometBoundedShuffleMemoryAllocator (on-heap mode) or
+ * CometUnifiedShuffleMemoryAllocator (off-heap mode).
*/
-public final class CometShuffleMemoryAllocator extends
CometShuffleMemoryAllocatorTrait {
+public final class CometShuffleMemoryAllocator {
private static CometShuffleMemoryAllocatorTrait INSTANCE;
/**
* Returns the singleton instance of `CometShuffleMemoryAllocator`. This
method should be used
* instead of the constructor to ensure that only one instance of
`CometShuffleMemoryAllocator` is
- * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator`
which is a test-only
- * allocator that should not be used in production.
+ * created. For on-heap mode (Spark tests), this returns
`CometBoundedShuffleMemoryAllocator`.
*/
public static CometShuffleMemoryAllocatorTrait getInstance(
SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
- boolean isSparkTesting = Utils.isTesting();
- boolean useUnifiedMemAllocator =
- (boolean)
-
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
-
- if (!useUnifiedMemAllocator) {
- synchronized (CometShuffleMemoryAllocator.class) {
- if (INSTANCE == null) {
- // CometTestShuffleMemoryAllocator handles pages by itself so it can
be a singleton.
- INSTANCE = new CometTestShuffleMemoryAllocator(conf,
taskMemoryManager, pageSize);
- }
- }
- return INSTANCE;
- } else {
- if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
- throw new IllegalArgumentException(
- "CometShuffleMemoryAllocator should be used with off-heap "
- + "memory mode, but got "
- + taskMemoryManager.getTungstenMemoryMode());
- }
+ if (taskMemoryManager.getTungstenMemoryMode() == MemoryMode.OFF_HEAP) {
// CometShuffleMemoryAllocator stores pages in TaskMemoryManager which
is not singleton,
// but one instance per task. So we need to create a new instance for
each task.
- return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+ return new CometUnifiedShuffleMemoryAllocator(taskMemoryManager,
pageSize);
}
- }
-
- CometShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long
pageSize) {
- super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
- }
-
- public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
- // JVM shuffle writer does not support spilling for other memory consumers
- return 0;
- }
-
- public synchronized MemoryBlock allocate(long required) {
- return this.allocatePage(required);
- }
-
- public synchronized void free(MemoryBlock block) {
- this.freePage(block);
- }
- /**
- * Returns the offset in the page for the given page plus base offset
address. Note that this
- * method assumes that the page number is valid.
- */
- public long getOffsetInPage(long pagePlusOffsetAddress) {
- return taskMemoryManager.getOffsetInPage(pagePlusOffsetAddress);
- }
-
- public long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
- return TaskMemoryManager.encodePageNumberAndOffset(pageNumber,
offsetInPage);
- }
-
- public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
- return encodePageNumberAndOffset(page.pageNumber, offsetInPage -
page.getBaseOffset());
+ synchronized (CometShuffleMemoryAllocator.class) {
+ if (INSTANCE == null) {
+ // CometBoundedShuffleMemoryAllocator handles pages by itself so it
can be a singleton.
+ INSTANCE = new CometBoundedShuffleMemoryAllocator(conf,
taskMemoryManager, pageSize);
+ }
+ }
+ return INSTANCE;
}
}
diff --git
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java
similarity index 56%
copy from
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
copy to
spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java
index f6e6ca96a..917d96f0f 100644
---
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
+++
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java
@@ -21,61 +21,31 @@ package org.apache.spark.shuffle.comet;
import java.io.IOException;
-import org.apache.spark.SparkConf;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.Utils;
-
-import org.apache.comet.CometConf$;
/**
* A simple memory allocator used by `CometShuffleExternalSorter` to allocate
memory blocks which
* store serialized rows. This class is simply an implementation of
`MemoryConsumer` that delegates
* memory allocation to the `TaskMemoryManager`. This requires that the
`TaskMemoryManager` is
* configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ *
+ * <p>If the user does not enable off-heap memory then we want to use
+ * CometBoundedShuffleMemoryAllocator. The tests also need to default to using
this because off-heap
+ * is not enabled when running the Spark SQL tests.
*/
-public final class CometShuffleMemoryAllocator extends
CometShuffleMemoryAllocatorTrait {
- private static CometShuffleMemoryAllocatorTrait INSTANCE;
-
- /**
- * Returns the singleton instance of `CometShuffleMemoryAllocator`. This
method should be used
- * instead of the constructor to ensure that only one instance of
`CometShuffleMemoryAllocator` is
- * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator`
which is a test-only
- * allocator that should not be used in production.
- */
- public static CometShuffleMemoryAllocatorTrait getInstance(
- SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
- boolean isSparkTesting = Utils.isTesting();
- boolean useUnifiedMemAllocator =
- (boolean)
-
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
+public final class CometUnifiedShuffleMemoryAllocator extends
CometShuffleMemoryAllocatorTrait {
- if (!useUnifiedMemAllocator) {
- synchronized (CometShuffleMemoryAllocator.class) {
- if (INSTANCE == null) {
- // CometTestShuffleMemoryAllocator handles pages by itself so it can
be a singleton.
- INSTANCE = new CometTestShuffleMemoryAllocator(conf,
taskMemoryManager, pageSize);
- }
- }
- return INSTANCE;
- } else {
- if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
- throw new IllegalArgumentException(
- "CometShuffleMemoryAllocator should be used with off-heap "
- + "memory mode, but got "
- + taskMemoryManager.getTungstenMemoryMode());
- }
-
- // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which
is not singleton,
- // but one instance per task. So we need to create a new instance for
each task.
- return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
- }
- }
-
- CometShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long
pageSize) {
+ CometUnifiedShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long
pageSize) {
super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
+ if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
+ throw new IllegalArgumentException(
+ "CometUnifiedShuffleMemoryAllocator should be used with off-heap "
+ + "memory mode, but got "
+ + taskMemoryManager.getTungstenMemoryMode());
+ }
}
public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index c82d10da2..14af8ded9 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1418,12 +1418,6 @@ object CometSparkSessionExtensions extends Logging {
sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
}
- def cometShuffleUnifiedMemoryManagerInTestEnabled(sparkConf: SparkConf):
Boolean = {
- sparkConf.getBoolean(
- CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.key,
-
CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.defaultValue.get)
- }
-
/**
* Attaches explain information to a TreeNode, rolling up the corresponding
information tags
* from any child nodes. For now, we are using this to attach the reasons
why certain Spark
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala
b/spark/src/main/scala/org/apache/spark/Plugins.scala
index ac3ec8c3f..c4adf48e9 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -101,15 +101,17 @@ class CometDriverPlugin extends DriverPlugin with Logging
with ShimCometDriverPl
* unified memory manager.
*/
private def shouldOverrideMemoryConf(conf: SparkConf): Boolean = {
- conf.getBoolean(CometConf.COMET_ENABLED.key,
CometConf.COMET_ENABLED.defaultValue.get) && (
- conf.getBoolean(
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key,
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.defaultValue.get) ||
- conf.getBoolean(
- CometConf.COMET_EXEC_ENABLED.key,
- CometConf.COMET_EXEC_ENABLED.defaultValue.get)
- ) && (!CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)
||
-
!CometSparkSessionExtensions.cometShuffleUnifiedMemoryManagerInTestEnabled(conf))
+ val cometEnabled =
+ conf.getBoolean(CometConf.COMET_ENABLED.key,
CometConf.COMET_ENABLED.defaultValue.get)
+ val cometExecShuffle = conf.getBoolean(
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key,
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.defaultValue.get)
+ val cometExec = conf.getBoolean(
+ CometConf.COMET_EXEC_ENABLED.key,
+ CometConf.COMET_EXEC_ENABLED.defaultValue.get)
+ val unifiedMemory =
CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)
+
+ cometEnabled && (cometExecShuffle || cometExec) && !unifiedMemory
}
}
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index b789cf994..1df865290 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -40,7 +40,6 @@ import org.apache.comet.{CometConf,
CometSparkSessionExtensions}
abstract class CometColumnarShuffleSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
protected val adaptiveExecutionEnabled: Boolean
protected val numElementsForceSpillThreshold: Int = 10
- protected val useUnifiedMemoryAllocator: Boolean = true
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
@@ -58,8 +57,6 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key ->
numElementsForceSpillThreshold.toString,
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
- CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.key
->
- useUnifiedMemoryAllocator.toString,
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
testFun
@@ -997,13 +994,6 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
}
-class CometTestMemoryAllocatorShuffleSuite extends CometColumnarShuffleSuite {
- override protected val asyncShuffleEnable: Boolean = false
- override protected val adaptiveExecutionEnabled: Boolean = true
- // Explicitly test with `CometTestShuffleMemoryAllocator`
- override protected val useUnifiedMemoryAllocator: Boolean = false
-}
-
class CometAsyncShuffleSuite extends CometColumnarShuffleSuite {
override protected val asyncShuffleEnable: Boolean = true
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index 67a06b28b..93dc10550 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -161,21 +161,19 @@ class CometPluginsUnifiedModeOverrideSuite extends
CometTestBase {
}
/*
- * Since using unified memory, but not shuffle unified memory
- * executor memory should be overridden by adding comet shuffle memory size
+ * Since unified memory is in use, executor memory should not be overridden
*/
- test("executor memory overhead is correctly overridden") {
+ test("executor memory overhead is not overridden") {
val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
val execMemOverhead2 =
spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
val execMemOverhead3 =
spark.sparkContext.getConf.get("spark.executor.memoryOverhead")
val execMemOverhead4 =
spark.sparkContext.conf.get("spark.executor.memoryOverhead")
- // in unified memory mode, comet memory overhead is
spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G
- // so the total executor memory overhead is executor memory overhead (1G)
+ comet memory overhead (1G) = 2G
- // and the overhead is overridden in MiB
- assert(execMemOverhead1 == "2048M")
- assert(execMemOverhead2 == "2048M")
- assert(execMemOverhead3 == "2048M")
- assert(execMemOverhead4 == "2048M")
+ // in unified memory mode, comet memory overhead is
+ // spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G,
+ // and the executor memory overhead (1G) is not overridden
+ assert(execMemOverhead1 == "1G")
+ assert(execMemOverhead2 == "1G")
+ assert(execMemOverhead3 == "1G")
+ assert(execMemOverhead4 == "1G")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]