viirya commented on code in PR #1063:
URL: https://github.com/apache/datafusion-comet/pull/1063#discussion_r1841365156


##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -20,179 +20,86 @@
 package org.apache.spark.shuffle.comet;
 
 import java.io.IOException;
-import java.util.BitSet;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.memory.UnsafeMemoryAllocator;
+import org.apache.spark.util.Utils;
 
-import org.apache.comet.CometSparkSessionExtensions$;
+import org.apache.comet.CometConf$;
 
 /**
  * A simple memory allocator used by `CometShuffleExternalSorter` to allocate 
memory blocks which
- * store serialized rows. We don't rely on Spark memory allocator because we 
need to allocate
- * off-heap memory no matter memory mode is on-heap or off-heap. This 
allocator is configured with
- * fixed size of memory, and it will throw `SparkOutOfMemoryError` if the 
memory is not enough.
- *
- * <p>Some methods are copied from 
`org.apache.spark.unsafe.memory.TaskMemoryManager` with
- * modifications. Most modifications are to remove the dependency on the 
configured memory mode.
+ * store serialized rows. This class is simply an implementation of 
`MemoryConsumer` that delegates
+ * memory allocation to the `TaskMemoryManager`. This requires that the 
`TaskMemoryManager` is
+ * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
  */
-public final class CometShuffleMemoryAllocator extends MemoryConsumer {
-  private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();
-
-  private final long pageSize;
-  private final long totalMemory;
-  private long allocatedMemory = 0L;
-
-  /** The number of bits used to address the page table. */
-  private static final int PAGE_NUMBER_BITS = 13;
-
-  /** The number of entries in the page table. */
-  private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
-
-  private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
-  private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
+public final class CometShuffleMemoryAllocator extends 
CometShuffleMemoryAllocatorTrait {
+  private static CometShuffleMemoryAllocatorTrait INSTANCE;
 
-  private static final int OFFSET_BITS = 51;
-  private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
-
-  private static CometShuffleMemoryAllocator INSTANCE;
-
-  public static synchronized CometShuffleMemoryAllocator getInstance(
+  /**
+   * Returns the singleton instance of `CometShuffleMemoryAllocator`. This 
method should be used
+   * instead of the constructor to ensure that only one instance of 
`CometShuffleMemoryAllocator` is
+   * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` 
which is a test-only
+   * allocator that should not be used in production.
+   */
+  public static synchronized CometShuffleMemoryAllocatorTrait getInstance(

Review Comment:
   Okay, sounds good. I will update this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to