advancedxy commented on code in PR #83:
URL: https://github.com/apache/arrow-datafusion-comet/pull/83#discussion_r1502504983


##########
core/src/execution/jni_api.rs:
##########
@@ -103,6 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     iterators: jobjectArray,
     serialized_query: jbyteArray,
     metrics_node: JObject,
+    task_memory_manager_obj: JObject,

Review Comment:
   How about renaming this to `comet_task_memory_manager_obj`?
   
   When I first read the code, I thought it was Spark's `TaskMemoryManager`
   object. However, it is Comet's `CometTaskMemoryManager`, so calling it
   `comet_task_memory_manager_obj` would be clearer.
   
   Other occurrences could be renamed too.



##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.IOException;
+
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TaskMemoryManager;
+
+/**
+ * A adapter class that is used by Comet native to acquire & release memory through Spark's unified
+ * memory manager. This assumes Spark's off-heap memory mode is enabled.
+ */
+public class CometTaskMemoryManager {
+  private final TaskMemoryManager internal;
+  private final NativeMemoryConsumer nativeMemoryConsumer;
+
+  public CometTaskMemoryManager() {
+    this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
+    this.nativeMemoryConsumer = new NativeMemoryConsumer();
+  }
+
+  // Called by Comet native through JNI.
+  // Returns the actual amount of memory (in bytes) granted.
+  public long acquireMemory(long size) {
+    return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
+  }
+
+  // Called by Comet native through JNI
+  public void releaseMemory(long size) {
+    internal.releaseExecutionMemory(size, nativeMemoryConsumer);
+  }
+
+  /**
+   * A dummy memory consumer that does nothing when spilling. At the moment, Comet native doesn't
+   * share the same API as Spark and cannot trigger spill when acquire memory. Therefore, when
+   * acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
+   */
+  private class NativeMemoryConsumer extends MemoryConsumer {

Review Comment:
   The consumer's `toString` might be used when debug logging is turned on.
   
   It would be great if we could override `toString` in this class and also
   add a unique flag/ID that identifies the corresponding consumer for the
   native plan/execution.



##########
core/src/execution/jni_api.rs:
##########
@@ -186,18 +189,29 @@ fn prepare_datafusion_session_context(
 
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Set up memory limit if specified
-    if conf.contains_key("memory_limit") {
-        let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-
-        let memory_fraction = conf
-            .get("memory_fraction")
-            .ok_or(CometError::Internal(
-                "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-            ))?
-            .parse::<f64>()?;
-
-        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction);
+    let use_unified_memory_manager = conf
+        .get("use_unified_memory_manager")
+        .ok_or(CometError::Internal(
+            "Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(),
+        ))?

Review Comment:
   I believe a more permissive approach would be to treat an **unset**
   `use_unified_memory_manager` as `false`?
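
   For illustration, a minimal sketch of that fallback. The helper name is
   hypothetical, and it assumes that `conf` is a `HashMap<String, String>` and
   that the value is parsed as a `bool`; neither is visible in the hunk above.

```rust
use std::collections::HashMap;
use std::str::ParseBoolError;

/// Hypothetical helper (not part of the PR): reads `use_unified_memory_manager`
/// from the config map, treating a missing key as `false` instead of an error.
fn use_unified_memory_manager(
    conf: &HashMap<String, String>,
) -> Result<bool, ParseBoolError> {
    conf.get("use_unified_memory_manager")
        // A value that is present must still parse; "true"/"false" are accepted.
        .map(|v| v.parse::<bool>())
        // Option<Result<bool, E>> -> Result<Option<bool>, E>
        .transpose()
        // Only an absent key falls back to the default of `false`.
        .map(|v| v.unwrap_or(false))
}
```

   With this shape, an empty map yields `Ok(false)`, while a malformed value
   such as `"yes"` is still reported as an error rather than silently ignored.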



##########
spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala:
##########
@@ -65,28 +64,18 @@ abstract class CometTestBase
     val conf = new SparkConf()
     conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
     conf.set(SQLConf.SHUFFLE_PARTITIONS, 10) // reduce parallelism in tests
-    conf.set("spark.shuffle.manager", shuffleManager)
+    conf.set(SQLConf.ANSI_ENABLED.key, "false")
+    conf.set(SHUFFLE_MANAGER, shuffleManager)
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")

Review Comment:
   Are we still going to test the default memory pool implementation in
   DataFusion?
   
   It seems that all test code paths now use the unified memory manager.



##########
spark/src/main/scala/org/apache/comet/CometExecIterator.scala:
##########
@@ -54,7 +54,13 @@ class CometExecIterator(
   }.toArray
   private val plan = {
     val configs = createNativeConf
-    nativeLib.createPlan(id, configs, cometBatchIterators, protobufQueryPlan, nativeMetrics)
+    nativeLib.createPlan(
+      id,
+      configs,
+      cometBatchIterators,
+      protobufQueryPlan,
+      nativeMetrics,
+      new CometTaskMemoryManager)

Review Comment:
   I'm referring to this. I think we can pass `id` to `CometTaskMemoryManager`
   and use it as the identifier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

