This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 655081b6 test: enabling Spark tests with offHeap requirement (#1177)
655081b6 is described below

commit 655081b66808d8f70abebd4d85af7c401c579a3a
Author: KAZUYUKI TANIMURA <[email protected]>
AuthorDate: Wed Dec 18 09:10:34 2024 -0800

    test: enabling Spark tests with offHeap requirement (#1177)
    
    ## Which issue does this PR close?
    
    ## Rationale for this change
    
    After https://github.com/apache/datafusion-comet/pull/1062, we have not been
    running Spark tests for native execution
    
    ## What changes are included in this PR?
    
    Removed the off-heap requirement for testing
    
    ## How are these changes tested?
    
    Bringing back Spark tests for native execution
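
The gist of the change, sketched below in minimal self-contained Scala (the
type and helper names are illustrative only, not the actual Comet API): the
native plan now runs against either Spark's unified off-heap memory pool or
DataFusion's built-in pool, chosen by spark.memory.offHeap.enabled.

    // Illustrative model of the memory-pool selection this patch introduces.
    sealed trait MemoryPoolConfig
    // Off-heap enabled: native allocations are accounted against Spark's
    // unified memory manager (via CometMemoryPool on the native side).
    case object UnifiedSparkPool extends MemoryPoolConfig
    // Off-heap disabled (e.g. while testing): DataFusion manages its own
    // pool, sized by an absolute limit and a fraction of that limit.
    final case class DataFusionPool(memoryLimit: Long, memoryFraction: Double)
        extends MemoryPoolConfig

    def choosePool(
        offHeapEnabled: Boolean,    // spark.memory.offHeap.enabled
        memoryOverhead: Long,       // cf. getCometMemoryOverhead(conf)
        execMemoryFraction: Double  // cf. COMET_EXEC_MEMORY_FRACTION
    ): MemoryPoolConfig =
      if (offHeapEnabled) UnifiedSparkPool
      else DataFusionPool(memoryOverhead, execMemoryFraction)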
---
 dev/diffs/4.0.0-preview1.diff                      | 103 ++++++++++++++++++++-
 native/core/src/execution/jni_api.rs               |  26 +++++-
 .../scala/org/apache/comet/CometExecIterator.scala |   9 +-
 .../apache/comet/CometSparkSessionExtensions.scala |  10 +-
 spark/src/main/scala/org/apache/comet/Native.scala |   5 +
 5 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index ba68d2a7..db62ed60 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -146,6 +146,77 @@ index 698ca009b4f..57d774a3617 100644
  
  -- Test tables
  CREATE table  explain_temp1 (key int, val int) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+ 
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+ 
+ -- PostgreSQL implicitly casts string literals to data with integral types, but
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+ 
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+ 
+ -- In COMPENSATION views get invalidated if the type can't cast
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 index d023fb82185..0f4f03bda6c 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -917,7 +988,7 @@ index 34c6c49bc49..f5dea07a213 100644
    protected val baseResourcePath = {
      // use the same way as `SQLQueryTestSuite` to get the resource path
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 56c364e2084..a00a50e020a 100644
+index 56c364e2084..fc3abd7cdc4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1510,7 +1510,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -930,6 +1001,36 @@ index 56c364e2084..a00a50e020a 100644
     AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
        sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
      }
+@@ -4454,7 +4455,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+   }
+ 
+   test("SPARK-39166: Query context of binary arithmetic should be serialized 
to executors" +
+-    " when WSCG is off") {
++    " when WSCG is off",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/551";)) {
+     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+       SQLConf.ANSI_ENABLED.key -> "true") {
+       withTable("t") {
+@@ -4475,7 +4477,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+   }
+ 
+   test("SPARK-39175: Query context of Cast should be serialized to executors" 
+
+-    " when WSCG is off") {
++    " when WSCG is off",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/551";)) {
+     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+       SQLConf.ANSI_ENABLED.key -> "true") {
+       withTable("t") {
+@@ -4502,7 +4505,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+   }
+ 
+   test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal 
overflow error should " +
+-    "be serialized to executors when WSCG is off") {
++    "be serialized to executors when WSCG is off",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/551";)) {
+     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+       SQLConf.ANSI_ENABLED.key -> "true") {
+       withTable("t") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 index 68f14f13bbd..174636cefb5 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
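
The IgnoreComet(...) tags used in the SQLQuerySuite hunks above rely on a
ScalaTest tag that the Spark diff defines. A minimal sketch of how such a tag
can be declared, where the tag name string is an assumption rather than the
verified definition from the diff:

    import org.scalatest.Tag

    // Sketch of a tag like the IgnoreComet used above; "DisableComet" is an
    // assumed placeholder name.
    class IgnoreComet(reason: String) extends Tag("DisableComet")

    // Usage mirrors the test(...) changes above:
    //   test("SPARK-39166: ...", IgnoreComet("TODO: .../issues/551")) { ... }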
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 491b389c..eb73675b 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -106,6 +106,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     metrics_node: JObject,
     comet_task_memory_manager_obj: JObject,
     batch_size: jint,
+    use_unified_memory_manager: jboolean,
+    memory_limit: jlong,
+    memory_fraction: jdouble,
     debug_native: jboolean,
     explain_native: jboolean,
     worker_threads: jint,
@@ -147,7 +150,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         // We need to keep the session context alive. Some session state like temporary
         // dictionaries are stored in session context. If it is dropped, the temporary
         // dictionaries will be dropped as well.
-        let session = prepare_datafusion_session_context(batch_size as usize, task_memory_manager)?;
+        let session = prepare_datafusion_session_context(
+            batch_size as usize,
+            use_unified_memory_manager == 1,
+            memory_limit as usize,
+            memory_fraction,
+            task_memory_manager,
+        )?;
 
         let plan_creation_time = start.elapsed();
 
@@ -174,13 +183,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
 /// Configure DataFusion session context.
 fn prepare_datafusion_session_context(
     batch_size: usize,
+    use_unified_memory_manager: bool,
+    memory_limit: usize,
+    memory_fraction: f64,
     comet_task_memory_manager: Arc<GlobalRef>,
 ) -> CometResult<SessionContext> {
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Set Comet memory pool for native
-    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    // Check if we are using unified memory manager integrated with Spark.
+    if use_unified_memory_manager {
+        // Set Comet memory pool for native
+        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    } else {
+        // Use the memory pool from DF
+        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
+    }
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index d57e9e2b..04d93069 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -60,6 +60,10 @@ class CometExecIterator(
     new CometBatchIterator(iterator, nativeUtil)
   }.toArray
   private val plan = {
+    val conf = SparkEnv.get.conf
+    // Only enable the unified memory manager when off-heap mode is enabled. Otherwise,
+    // we'll use the built-in memory pool from DF, initialized with `memory_limit`
+    // and `memory_fraction` below.
     nativeLib.createPlan(
       id,
       cometBatchIterators,
@@ -67,6 +71,9 @@ class CometExecIterator(
       nativeMetrics,
       new CometTaskMemoryManager(id),
       batchSize = COMET_BATCH_SIZE.get(),
+      use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
+      memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+      memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
       workerThreads = COMET_WORKER_THREADS.get(),
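
On the JVM side, the three new createPlan arguments are derived from Spark and
Comet configuration. A simplified, self-contained sketch, assuming the config
keys and defaults shown here (the real code uses COMET_EXEC_MEMORY_FRACTION and
CometSparkSessionExtensions.getCometMemoryOverhead, as in the hunk above):

    import org.apache.spark.SparkConf

    // Stand-in for the argument derivation above; keys and defaults are
    // assumptions for illustration only.
    def nativeMemoryArgs(conf: SparkConf): (Boolean, Long, Double) = {
      val useUnified = conf.getBoolean("spark.memory.offHeap.enabled", false)
      val memoryLimit = conf.getSizeAsBytes("spark.comet.memoryOverhead", "1g")
      val memoryFraction = conf.getDouble("spark.comet.exec.memoryFraction", 0.7)
      (useUnified, memoryLimit, memoryFraction)
    }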
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 61c45daf..8bff6b5f 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.CometConf._
 import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.rules.RewriteJoin
 import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -921,8 +921,9 @@ class CometSparkSessionExtensions
     override def apply(plan: SparkPlan): SparkPlan = {
 
       // Comet required off-heap memory to be enabled
-      if ("true" != conf.getConfString("spark.memory.offHeap.enabled", 
"false")) {
-        logInfo("Comet extension disabled because 
spark.memory.offHeap.enabled=false")
+      if (!isOffHeapEnabled(conf) && !isTesting) {
+        logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
+        withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
         return plan
       }
 
@@ -1174,8 +1175,7 @@ object CometSparkSessionExtensions extends Logging {
   }
 
   private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
-    conf.contains("spark.memory.offHeap.enabled") &&
-      conf.getConfString("spark.memory.offHeap.enabled").toBoolean
+    conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean
 
   // Copied from org.apache.spark.util.Utils which is private to Spark.
   private[comet] def isTesting: Boolean = {
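
The net effect of the two hunks above: Comet native exec is no longer
hard-gated on off-heap memory when running under Spark's test harness. As a
one-line sketch of the new rule:

    // Simplified gate: proceed with native exec when off-heap is enabled,
    // or when running under Spark's test harness (isTesting).
    def cometNativeExecAllowed(offHeapEnabled: Boolean, isTesting: Boolean): Boolean =
      offHeapEnabled || isTesting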
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index 64ada91a..083c0f2b 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -43,6 +43,7 @@ class Native extends NativeBase {
    * @return
    *   the address to native query plan.
    */
+  // scalastyle:off
   @native def createPlan(
       id: Long,
       iterators: Array[CometBatchIterator],
@@ -50,10 +51,14 @@ class Native extends NativeBase {
       metrics: CometMetricNode,
       taskMemoryManager: CometTaskMemoryManager,
       batchSize: Int,
+      use_unified_memory_manager: Boolean,
+      memory_limit: Long,
+      memory_fraction: Double,
       debug: Boolean,
       explain: Boolean,
       workerThreads: Int,
       blockingThreads: Int): Long
+  // scalastyle:on
 
   /**
    * Execute a native query plan based on given input Arrow arrays.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
