sunchao commented on code in PR #83:
URL: https://github.com/apache/arrow-datafusion-comet/pull/83#discussion_r1503800708


##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.IOException;
+
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TaskMemoryManager;
+
+/**
+ * An adapter class that is used by Comet native to acquire & release memory through Spark's unified
+ * memory manager. This assumes Spark's off-heap memory mode is enabled.
+ */
+public class CometTaskMemoryManager {
+  private final TaskMemoryManager internal;
+  private final NativeMemoryConsumer nativeMemoryConsumer;
+
+  public CometTaskMemoryManager() {
+    this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
+    this.nativeMemoryConsumer = new NativeMemoryConsumer();
+  }
+
+  // Called by Comet native through JNI.
+  // Returns the actual amount of memory (in bytes) granted.
+  public long acquireMemory(long size) {
+    return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
+  }
+
+  // Called by Comet native through JNI
+  public void releaseMemory(long size) {
+    internal.releaseExecutionMemory(size, nativeMemoryConsumer);
+  }
+
+  /**
+   * A dummy memory consumer that does nothing when spilling. At the moment, Comet native doesn't
+   * share the same API as Spark and cannot trigger spill when acquiring memory. Therefore, when
+   * acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
+   */
+  private class NativeMemoryConsumer extends MemoryConsumer {

Review Comment:
   Sure, I can add a `toString` for now, and we can figure out how to use it for debugging purposes later.
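`acquireMemory` returns the number of bytes actually granted, and Spark's `TaskMemoryManager` may grant less than was requested. Below is a minimal Rust sketch, not Comet's actual native code, of how a native-side caller might handle a partial grant: hand back whatever was granted so the JVM-side accounting stays balanced, then report failure. The `TaskMemory` trait, the `FixedBudget` mock, and `try_reserve` are hypothetical stand-ins for the JNI calls into `CometTaskMemoryManager`.

```rust
/// Hypothetical stand-in for the JVM-side `CometTaskMemoryManager`,
/// which the real native code would reach through JNI.
trait TaskMemory {
    /// Returns the number of bytes actually granted (may be less than `size`).
    fn acquire_memory(&mut self, size: i64) -> i64;
    /// Returns `size` bytes to the manager.
    fn release_memory(&mut self, size: i64);
}

/// Hypothetical mock manager with a fixed budget, for illustration only.
struct FixedBudget {
    available: i64,
}

impl TaskMemory for FixedBudget {
    fn acquire_memory(&mut self, size: i64) -> i64 {
        // Grant at most what is left in the budget.
        let granted = size.min(self.available);
        self.available -= granted;
        granted
    }

    fn release_memory(&mut self, size: i64) {
        self.available += size;
    }
}

/// Try to reserve exactly `size` bytes. On a partial grant, release the
/// granted bytes so they are not leaked, and return an error.
fn try_reserve<M: TaskMemory>(mgr: &mut M, size: i64) -> Result<(), String> {
    let granted = mgr.acquire_memory(size);
    if granted < size {
        if granted > 0 {
            mgr.release_memory(granted);
        }
        return Err(format!(
            "failed to acquire {} bytes, only {} granted",
            size, granted
        ));
    }
    Ok(())
}
```

With a 100-byte budget, reserving 60 bytes succeeds; a following 80-byte request is only partially granted (40 bytes), so the sketch rolls it back and the budget returns to 40. Keeping the partial grant and spilling instead is another option, but this sketch does not model spilling.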



##########
core/src/execution/jni_api.rs:
##########
@@ -103,6 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     iterators: jobjectArray,
     serialized_query: jbyteArray,
     metrics_node: JObject,
+    task_memory_manager_obj: JObject,

Review Comment:
   Sure



##########
spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala:
##########
@@ -65,28 +64,18 @@ abstract class CometTestBase
     val conf = new SparkConf()
     conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
     conf.set(SQLConf.SHUFFLE_PARTITIONS, 10) // reduce parallelism in tests
-    conf.set("spark.shuffle.manager", shuffleManager)
+    conf.set(SQLConf.ANSI_ENABLED.key, "false")
+    conf.set(SHUFFLE_MANAGER, shuffleManager)
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")

Review Comment:
   In the long term, I'm thinking of only using the memory pool defined in Comet. This currently requires users to turn on Spark's off-heap mode and set the off-heap memory size accordingly, so configuration changes are necessary when they want to use Comet. Ideally we should be able to use `DriverPlugin` to override the memory settings so that Comet just works out of the box (this needs changes to Spark in a few places).
   
   The default memory manager path is kept only until we are able to do the override through `DriverPlugin`. Internally we still run all the Spark SQL tests using the default memory manager, and we can probably do the same here too.



##########
core/src/execution/jni_api.rs:
##########
@@ -186,18 +189,29 @@ fn prepare_datafusion_session_context(
 
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Set up memory limit if specified
-    if conf.contains_key("memory_limit") {
-        let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-
-        let memory_fraction = conf
-            .get("memory_fraction")
-            .ok_or(CometError::Internal(
-            "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-            ))?
-            .parse::<f64>()?;
-
-        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction);
+    let use_unified_memory_manager = conf
+        .get("use_unified_memory_manager")
+        .ok_or(CometError::Internal(
+            "Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(),
+        ))?

Review Comment:
   Yeah, sure, although if it's not set, that's an internal error on the developer side.
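The new code fails fast with `CometError::Internal` when `use_unified_memory_manager` is missing, which can only happen through a developer error on the JVM side. Below is a minimal, self-contained Rust sketch of the same lookup-then-parse pattern, using the removed `memory_limit`/`memory_fraction` handling as the non-unified fallback. `CometError` here is a simplified stand-in for Comet's error type, and `MemorySetup`, `required`, `parse_value`, and `memory_setup` are hypothetical names. Whether the real code keeps the old path as its fallback is an assumption, since the hunk above is truncated.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Simplified stand-in for Comet's error type (assumption).
#[derive(Debug, PartialEq)]
enum CometError {
    Internal(String),
}

/// Hypothetical description of which memory setup the native side picks.
#[derive(Debug, PartialEq)]
enum MemorySetup {
    /// Delegate to Spark's unified memory manager through JNI.
    Unified,
    /// DataFusion-managed limit (the path removed in this diff).
    Limited { limit: usize, fraction: f64 },
    /// No limit configured.
    Unbounded,
}

/// Look up a key that the JVM side must always send; a missing key is a developer error.
fn required<'a>(conf: &'a HashMap<String, String>, key: &str) -> Result<&'a str, CometError> {
    conf.get(key).map(String::as_str).ok_or_else(|| {
        CometError::Internal(format!("Config '{}' is not specified from Comet JVM side", key))
    })
}

/// Parse a config value, naming the offending key on failure.
fn parse_value<T: FromStr>(key: &str, value: &str) -> Result<T, CometError> {
    value.parse::<T>().map_err(|_| {
        CometError::Internal(format!("Config '{}' has invalid value '{}'", key, value))
    })
}

fn memory_setup(conf: &HashMap<String, String>) -> Result<MemorySetup, CometError> {
    let key = "use_unified_memory_manager";
    let use_unified: bool = parse_value(key, required(conf, key)?)?;
    if use_unified {
        return Ok(MemorySetup::Unified);
    }
    // Fallback mirroring the removed code: `memory_limit` is optional, but
    // once it is present, `memory_fraction` becomes required.
    match conf.get("memory_limit") {
        Some(raw) => {
            let limit: usize = parse_value("memory_limit", raw)?;
            let fraction: f64 =
                parse_value("memory_fraction", required(conf, "memory_fraction")?)?;
            Ok(MemorySetup::Limited { limit, fraction })
        }
        None => Ok(MemorySetup::Unbounded),
    }
}
```

Keeping the lookup and the parse in small helpers means every error names the key involved, which makes a missing or malformed JVM-side setting easy to diagnose.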



-- 
This is an automated message from the Apache Git Service.