viirya commented on code in PR #1063: URL: https://github.com/apache/datafusion-comet/pull/1063#discussion_r1841365156
##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -20,179 +20,86 @@
 package org.apache.spark.shuffle.comet;
 import java.io.IOException;
-import java.util.BitSet;
 import org.apache.spark.SparkConf;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.memory.UnsafeMemoryAllocator;
+import org.apache.spark.util.Utils;
-import org.apache.comet.CometSparkSessionExtensions$;
+import org.apache.comet.CometConf$;
 /**
  * A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
- * store serialized rows. We don't rely on Spark memory allocator because we need to allocate
- * off-heap memory no matter memory mode is on-heap or off-heap. This allocator is configured with
- * fixed size of memory, and it will throw `SparkOutOfMemoryError` if the memory is not enough.
- *
- * <p>Some methods are copied from `org.apache.spark.unsafe.memory.TaskMemoryManager` with
- * modifications. Most modifications are to remove the dependency on the configured memory mode.
+ * store serialized rows. This class is simply an implementation of `MemoryConsumer` that delegates
+ * memory allocation to the `TaskMemoryManager`. This requires that the `TaskMemoryManager` is
+ * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
  */
-public final class CometShuffleMemoryAllocator extends MemoryConsumer {
-  private final UnsafeMemoryAllocator allocator = new UnsafeMemoryAllocator();
-
-  private final long pageSize;
-  private final long totalMemory;
-  private long allocatedMemory = 0L;
-
-  /** The number of bits used to address the page table. */
-  private static final int PAGE_NUMBER_BITS = 13;
-
-  /** The number of entries in the page table. */
-  private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
-
-  private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
-  private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
+public final class CometShuffleMemoryAllocator extends CometShuffleMemoryAllocatorTrait {
+  private static CometShuffleMemoryAllocatorTrait INSTANCE;
-  private static final int OFFSET_BITS = 51;
-  private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
-
-  private static CometShuffleMemoryAllocator INSTANCE;
-
-  public static synchronized CometShuffleMemoryAllocator getInstance(
+  /**
+   * Returns the singleton instance of `CometShuffleMemoryAllocator`. This method should be used
+   * instead of the constructor to ensure that only one instance of `CometShuffleMemoryAllocator` is
+   * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` which is a test-only
+   * allocator that should not be used in production.
+   */
+  public static synchronized CometShuffleMemoryAllocatorTrait getInstance(

Review Comment:
Okay, sounds good. I will update this.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org