This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ea60cef9 fix: Simplify CometShuffleMemoryAllocator logic, rename classes, remove config (#1485)
4ea60cef9 is described below

commit 4ea60cef9099b6b03186d45b502c1a516d9ad0e7
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Mar 13 13:01:59 2025 -0400

    fix: Simplify CometShuffleMemoryAllocator logic, rename classes, remove config (#1485)
---
 .../main/scala/org/apache/comet/CometConf.scala    | 11 ---
 ...ava => CometBoundedShuffleMemoryAllocator.java} |  6 +-
 .../shuffle/comet/CometShuffleMemoryAllocator.java | 80 ++++------------------
 ...ava => CometUnifiedShuffleMemoryAllocator.java} | 54 ++++-----------
 .../apache/comet/CometSparkSessionExtensions.scala |  6 --
 .../src/main/scala/org/apache/spark/Plugins.scala  | 20 +++---
 .../comet/exec/CometColumnarShuffleSuite.scala     | 10 ---
 .../scala/org/apache/spark/CometPluginsSuite.scala | 18 +++--
 8 files changed, 46 insertions(+), 159 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 319b0848c..532e43f13 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -404,17 +404,6 @@ object CometConf extends ShimCometConf {
         "Ensure that Comet shuffle memory overhead factor is a double greater 
than 0")
       .createWithDefault(1.0)
 
-  val COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST: 
ConfigEntry[Boolean] =
-    conf("spark.comet.columnar.shuffle.unifiedMemoryAllocatorTest")
-      .doc("Whether to use Spark unified memory allocator for Comet columnar 
shuffle in tests." +
-        "If not configured, Comet will use a test-only memory allocator for 
Comet columnar " +
-        "shuffle when Spark test env detected. The test-ony allocator is 
proposed to run with " +
-        "Spark tests as these tests require on-heap memory configuration. " +
-        "By default, this config is false.")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.batch.size")
       .internal()
diff --git 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
similarity index 97%
rename from 
spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
rename to 
spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
index 084e82b2b..bd6782a5d 100644
--- 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
+++ 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
@@ -48,7 +48,7 @@ import org.apache.comet.CometSparkSessionExtensions$;
  * Thus, this allocator is used to allocate separate off-heap memory 
allocation for Comet JVM
  * shuffle and execution apart from Spark's on-heap memory configuration.
  */
-public final class CometTestShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
+public final class CometBoundedShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
   private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();
 
   private final long pageSize;
@@ -67,9 +67,7 @@ public final class CometTestShuffleMemoryAllocator extends 
CometShuffleMemoryAll
   private static final int OFFSET_BITS = 51;
   private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
 
-  private static CometTestShuffleMemoryAllocator INSTANCE;
-
-  CometTestShuffleMemoryAllocator(
+  CometBoundedShuffleMemoryAllocator(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
     super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
     this.pageSize = pageSize;
diff --git 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
index f6e6ca96a..e8f9525b6 100644
--- 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
+++ 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
@@ -19,91 +19,37 @@
 
 package org.apache.spark.shuffle.comet;
 
-import java.io.IOException;
-
 import org.apache.spark.SparkConf;
-import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.Utils;
-
-import org.apache.comet.CometConf$;
 
 /**
- * A simple memory allocator used by `CometShuffleExternalSorter` to allocate 
memory blocks which
- * store serialized rows. This class is simply an implementation of 
`MemoryConsumer` that delegates
- * memory allocation to the `TaskMemoryManager`. This requires that the 
`TaskMemoryManager` is
- * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ * An interface to instantiate either CometBoundedShuffleMemoryAllocator 
(on-heap mode) or
+ * CometUnifiedShuffleMemoryAllocator (off-heap mode).
  */
-public final class CometShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
+public final class CometShuffleMemoryAllocator {
   private static CometShuffleMemoryAllocatorTrait INSTANCE;
 
   /**
    * Returns the singleton instance of `CometShuffleMemoryAllocator`. This 
method should be used
    * instead of the constructor to ensure that only one instance of 
`CometShuffleMemoryAllocator` is
-   * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` 
which is a test-only
-   * allocator that should not be used in production.
+   * created. For on-heap mode (Spark tests), this returns 
`CometBoundedShuffleMemoryAllocator`.
    */
   public static CometShuffleMemoryAllocatorTrait getInstance(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
-    boolean isSparkTesting = Utils.isTesting();
-    boolean useUnifiedMemAllocator =
-        (boolean)
-            
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
-
-    if (!useUnifiedMemAllocator) {
-      synchronized (CometShuffleMemoryAllocator.class) {
-        if (INSTANCE == null) {
-          // CometTestShuffleMemoryAllocator handles pages by itself so it can 
be a singleton.
-          INSTANCE = new CometTestShuffleMemoryAllocator(conf, 
taskMemoryManager, pageSize);
-        }
-      }
-      return INSTANCE;
-    } else {
-      if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
-        throw new IllegalArgumentException(
-            "CometShuffleMemoryAllocator should be used with off-heap "
-                + "memory mode, but got "
-                + taskMemoryManager.getTungstenMemoryMode());
-      }
 
+    if (taskMemoryManager.getTungstenMemoryMode() == MemoryMode.OFF_HEAP) {
       // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which 
is not singleton,
       // but one instance per task. So we need to create a new instance for 
each task.
-      return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+      return new CometUnifiedShuffleMemoryAllocator(taskMemoryManager, 
pageSize);
     }
-  }
-
-  CometShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long 
pageSize) {
-    super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
-  }
-
-  public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    // JVM shuffle writer does not support spilling for other memory consumers
-    return 0;
-  }
-
-  public synchronized MemoryBlock allocate(long required) {
-    return this.allocatePage(required);
-  }
-
-  public synchronized void free(MemoryBlock block) {
-    this.freePage(block);
-  }
 
-  /**
-   * Returns the offset in the page for the given page plus base offset 
address. Note that this
-   * method assumes that the page number is valid.
-   */
-  public long getOffsetInPage(long pagePlusOffsetAddress) {
-    return taskMemoryManager.getOffsetInPage(pagePlusOffsetAddress);
-  }
-
-  public long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
-    return TaskMemoryManager.encodePageNumberAndOffset(pageNumber, 
offsetInPage);
-  }
-
-  public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
-    return encodePageNumberAndOffset(page.pageNumber, offsetInPage - 
page.getBaseOffset());
+    synchronized (CometShuffleMemoryAllocator.class) {
+      if (INSTANCE == null) {
+        // CometBoundedShuffleMemoryAllocator handles pages by itself so it 
can be a singleton.
+        INSTANCE = new CometBoundedShuffleMemoryAllocator(conf, 
taskMemoryManager, pageSize);
+      }
+    }
+    return INSTANCE;
   }
 }
diff --git 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java
similarity index 56%
copy from 
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
copy to 
spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java
index f6e6ca96a..917d96f0f 100644
--- 
a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
+++ 
b/spark/src/main/java/org/apache/spark/shuffle/comet/CometUnifiedShuffleMemoryAllocator.java
@@ -21,61 +21,31 @@ package org.apache.spark.shuffle.comet;
 
 import java.io.IOException;
 
-import org.apache.spark.SparkConf;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.Utils;
-
-import org.apache.comet.CometConf$;
 
 /**
  * A simple memory allocator used by `CometShuffleExternalSorter` to allocate 
memory blocks which
  * store serialized rows. This class is simply an implementation of 
`MemoryConsumer` that delegates
  * memory allocation to the `TaskMemoryManager`. This requires that the 
`TaskMemoryManager` is
  * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ *
+ * <p>If the user does not enable off-heap memory then we want to use
+ * CometBoundedShuffleMemoryAllocator. The tests also need to default to using 
this because off-heap
+ * is not enabled when running the Spark SQL tests.
  */
-public final class CometShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
-  private static CometShuffleMemoryAllocatorTrait INSTANCE;
-
-  /**
-   * Returns the singleton instance of `CometShuffleMemoryAllocator`. This 
method should be used
-   * instead of the constructor to ensure that only one instance of 
`CometShuffleMemoryAllocator` is
-   * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` 
which is a test-only
-   * allocator that should not be used in production.
-   */
-  public static CometShuffleMemoryAllocatorTrait getInstance(
-      SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
-    boolean isSparkTesting = Utils.isTesting();
-    boolean useUnifiedMemAllocator =
-        (boolean)
-            
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
+public final class CometUnifiedShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
 
-    if (!useUnifiedMemAllocator) {
-      synchronized (CometShuffleMemoryAllocator.class) {
-        if (INSTANCE == null) {
-          // CometTestShuffleMemoryAllocator handles pages by itself so it can 
be a singleton.
-          INSTANCE = new CometTestShuffleMemoryAllocator(conf, 
taskMemoryManager, pageSize);
-        }
-      }
-      return INSTANCE;
-    } else {
-      if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
-        throw new IllegalArgumentException(
-            "CometShuffleMemoryAllocator should be used with off-heap "
-                + "memory mode, but got "
-                + taskMemoryManager.getTungstenMemoryMode());
-      }
-
-      // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which 
is not singleton,
-      // but one instance per task. So we need to create a new instance for 
each task.
-      return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
-    }
-  }
-
-  CometShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long 
pageSize) {
+  CometUnifiedShuffleMemoryAllocator(TaskMemoryManager taskMemoryManager, long 
pageSize) {
     super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
+    if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
+      throw new IllegalArgumentException(
+          "CometUnifiedShuffleMemoryAllocator should be used with off-heap "
+              + "memory mode, but got "
+              + taskMemoryManager.getTungstenMemoryMode());
+    }
   }
 
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index c82d10da2..14af8ded9 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1418,12 +1418,6 @@ object CometSparkSessionExtensions extends Logging {
     sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
   }
 
-  def cometShuffleUnifiedMemoryManagerInTestEnabled(sparkConf: SparkConf): 
Boolean = {
-    sparkConf.getBoolean(
-      CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.key,
-      
CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.defaultValue.get)
-  }
-
   /**
    * Attaches explain information to a TreeNode, rolling up the corresponding 
information tags
    * from any child nodes. For now, we are using this to attach the reasons 
why certain Spark
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala 
b/spark/src/main/scala/org/apache/spark/Plugins.scala
index ac3ec8c3f..c4adf48e9 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -101,15 +101,17 @@ class CometDriverPlugin extends DriverPlugin with Logging 
with ShimCometDriverPl
    * unified memory manager.
    */
   private def shouldOverrideMemoryConf(conf: SparkConf): Boolean = {
-    conf.getBoolean(CometConf.COMET_ENABLED.key, 
CometConf.COMET_ENABLED.defaultValue.get) && (
-      conf.getBoolean(
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.defaultValue.get) ||
-        conf.getBoolean(
-          CometConf.COMET_EXEC_ENABLED.key,
-          CometConf.COMET_EXEC_ENABLED.defaultValue.get)
-    ) && (!CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf) 
||
-      
!CometSparkSessionExtensions.cometShuffleUnifiedMemoryManagerInTestEnabled(conf))
+    val cometEnabled =
+      conf.getBoolean(CometConf.COMET_ENABLED.key, 
CometConf.COMET_ENABLED.defaultValue.get)
+    val cometExecShuffle = conf.getBoolean(
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key,
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.defaultValue.get)
+    val cometExec = conf.getBoolean(
+      CometConf.COMET_EXEC_ENABLED.key,
+      CometConf.COMET_EXEC_ENABLED.defaultValue.get)
+    val unifiedMemory = 
CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)
+
+    cometEnabled && (cometExecShuffle || cometExec) && !unifiedMemory
   }
 }
 
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index b789cf994..1df865290 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -40,7 +40,6 @@ import org.apache.comet.{CometConf, 
CometSparkSessionExtensions}
 abstract class CometColumnarShuffleSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   protected val adaptiveExecutionEnabled: Boolean
   protected val numElementsForceSpillThreshold: Int = 10
-  protected val useUnifiedMemoryAllocator: Boolean = true
 
   override protected def sparkConf: SparkConf = {
     val conf = super.sparkConf
@@ -58,8 +57,6 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
         CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> 
numElementsForceSpillThreshold.toString,
         CometConf.COMET_EXEC_ENABLED.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
-        CometConf.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST.key 
->
-          useUnifiedMemoryAllocator.toString,
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
         CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
         testFun
@@ -997,13 +994,6 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 }
 
-class CometTestMemoryAllocatorShuffleSuite extends CometColumnarShuffleSuite {
-  override protected val asyncShuffleEnable: Boolean = false
-  override protected val adaptiveExecutionEnabled: Boolean = true
-  // Explicitly test with `CometTestShuffleMemoryAllocator`
-  override protected val useUnifiedMemoryAllocator: Boolean = false
-}
-
 class CometAsyncShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = true
 
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala 
b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index 67a06b28b..93dc10550 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -161,21 +161,19 @@ class CometPluginsUnifiedModeOverrideSuite extends 
CometTestBase {
   }
 
   /*
-   * Since using unified memory, but not shuffle unified memory
-   * executor memory should be overridden by adding comet shuffle memory size
+   * Since using unified memory executor memory should not be overridden
    */
-  test("executor memory overhead is correctly overridden") {
+  test("executor memory overhead is not overridden") {
     val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
     val execMemOverhead2 = 
spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
     val execMemOverhead3 = 
spark.sparkContext.getConf.get("spark.executor.memoryOverhead")
     val execMemOverhead4 = 
spark.sparkContext.conf.get("spark.executor.memoryOverhead")
 
-    // in unified memory mode, comet memory overhead is 
spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G
-    // so the total executor memory overhead is executor memory overhead (1G) 
+ comet memory overhead (1G) = 2G
-    // and the overhead is overridden in MiB
-    assert(execMemOverhead1 == "2048M")
-    assert(execMemOverhead2 == "2048M")
-    assert(execMemOverhead3 == "2048M")
-    assert(execMemOverhead4 == "2048M")
+    // in unified memory mode, comet memory overhead is
+    // spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor 
(0.5) = 1G  and the overhead is not overridden
+    assert(execMemOverhead1 == "1G")
+    assert(execMemOverhead2 == "1G")
+    assert(execMemOverhead3 == "1G")
+    assert(execMemOverhead4 == "1G")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to