This is an automated email from the ASF dual-hosted git repository.
kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 655081b6 test: enabling Spark tests with offHeap requirement (#1177)
655081b6 is described below
commit 655081b66808d8f70abebd4d85af7c401c579a3a
Author: KAZUYUKI TANIMURA <[email protected]>
AuthorDate: Wed Dec 18 09:10:34 2024 -0800
test: enabling Spark tests with offHeap requirement (#1177)
## Which issue does this PR close?
## Rationale for this change
After https://github.com/apache/datafusion-comet/pull/1062, we have not been running Spark tests for native execution.
## What changes are included in this PR?
Removed the off-heap requirement when running tests
## How are these changes tested?
Bringing back Spark tests for native execution
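
For context, a minimal sketch (not part of this patch; the keys are standard Spark settings and the size is an illustrative value) of the off-heap configuration that Comet previously hard-required before native execution would activate:

```scala
// Sketch only: a Spark session with off-heap memory enabled, which Comet
// required unconditionally before this change. With this patch, suites
// running under Spark's testing flag no longer need these settings.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.memory.offHeap.enabled", "true") // previously mandatory for Comet exec
  .config("spark.memory.offHeap.size", "4g")      // illustrative value
  .getOrCreate()
```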
---
dev/diffs/4.0.0-preview1.diff | 103 ++++++++++++++++++++-
native/core/src/execution/jni_api.rs | 26 +++++-
.../scala/org/apache/comet/CometExecIterator.scala | 9 +-
.../apache/comet/CometSparkSessionExtensions.scala | 10 +-
spark/src/main/scala/org/apache/comet/Native.scala | 5 +
5 files changed, 142 insertions(+), 11 deletions(-)
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index ba68d2a7..db62ed60 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -146,6 +146,77 @@ index 698ca009b4f..57d774a3617 100644
-- Test tables
CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+
+ -- PostgreSQL implicitly casts string literals to data with integral types, but
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+
+ -- In COMPENSATION views get invalidated if the type can't cast
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index d023fb82185..0f4f03bda6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -917,7 +988,7 @@ index 34c6c49bc49..f5dea07a213 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 56c364e2084..a00a50e020a 100644
+index 56c364e2084..fc3abd7cdc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1510,7 +1510,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -930,6 +1001,36 @@ index 56c364e2084..a00a50e020a 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
+@@ -4454,7 +4455,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ }
+
+ test("SPARK-39166: Query context of binary arithmetic should be serialized
to executors" +
+- " when WSCG is off") {
++ " when WSCG is off",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+ SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t") {
+@@ -4475,7 +4477,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ }
+
+ test("SPARK-39175: Query context of Cast should be serialized to executors"
+
+- " when WSCG is off") {
++ " when WSCG is off",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+ SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t") {
+@@ -4502,7 +4505,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ }
+
+ test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal
overflow error should " +
+- "be serialized to executors when WSCG is off") {
++ "be serialized to executors when WSCG is off",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+ SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 68f14f13bbd..174636cefb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 491b389c..eb73675b 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -106,6 +106,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics_node: JObject,
comet_task_memory_manager_obj: JObject,
batch_size: jint,
+ use_unified_memory_manager: jboolean,
+ memory_limit: jlong,
+ memory_fraction: jdouble,
debug_native: jboolean,
explain_native: jboolean,
worker_threads: jint,
@@ -147,7 +150,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
// We need to keep the session context alive. Some session state like temporary
// dictionaries are stored in session context. If it is dropped, the temporary
// dictionaries will be dropped as well.
- let session = prepare_datafusion_session_context(batch_size as usize, task_memory_manager)?;
+ let session = prepare_datafusion_session_context(
+ batch_size as usize,
+ use_unified_memory_manager == 1,
+ memory_limit as usize,
+ memory_fraction,
+ task_memory_manager,
+ )?;
let plan_creation_time = start.elapsed();
@@ -174,13 +183,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
/// Configure DataFusion session context.
fn prepare_datafusion_session_context(
batch_size: usize,
+ use_unified_memory_manager: bool,
+ memory_limit: usize,
+ memory_fraction: f64,
comet_task_memory_manager: Arc<GlobalRef>,
) -> CometResult<SessionContext> {
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
- // Set Comet memory pool for native
- let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
- rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+ // Check if we are using unified memory manager integrated with Spark.
+ if use_unified_memory_manager {
+ // Set Comet memory pool for native
+ let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+ rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+ } else {
+ // Use the memory pool from DF
+ rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
+ }
// Get Datafusion configuration from Spark Execution context
// can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index d57e9e2b..04d93069 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil
/**
@@ -60,6 +60,10 @@ class CometExecIterator(
new CometBatchIterator(iterator, nativeUtil)
}.toArray
private val plan = {
+ val conf = SparkEnv.get.conf
+ // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
+ // we'll use the built-in memory pool from DF, and initialize with `memory_limit`
+ // and `memory_fraction` below.
nativeLib.createPlan(
id,
cometBatchIterators,
@@ -67,6 +71,9 @@ class CometExecIterator(
nativeMetrics,
new CometTaskMemoryManager(id),
batchSize = COMET_BATCH_SIZE.get(),
+ use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
+ memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+ memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
workerThreads = COMET_WORKER_THREADS.get(),
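
To make the flow above concrete, a hedged sketch (paraphrasing the hunk, not verbatim Comet source) of how the three new `createPlan` arguments are derived on the JVM side:

```scala
// Paraphrase of the change above: when spark.memory.offHeap.enabled is true,
// the native side shares Spark's unified memory manager via
// CometTaskMemoryManager; otherwise DataFusion's built-in pool is used,
// sized from the Comet memory overhead scaled by the exec memory fraction.
val conf = SparkEnv.get.conf
val useUnifiedMemoryManager = conf.getBoolean("spark.memory.offHeap.enabled", false)
val memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
val memoryFraction = COMET_EXEC_MEMORY_FRACTION.get()
// On the native side, DataFusion's RuntimeConfig::with_memory_limit caps the
// pool at roughly memoryLimit * memoryFraction when off-heap is disabled.
```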
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 61c45daf..8bff6b5f 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.comet.CometConf._
import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.rules.RewriteJoin
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -921,8 +921,9 @@ class CometSparkSessionExtensions
override def apply(plan: SparkPlan): SparkPlan = {
// Comet required off-heap memory to be enabled
- if ("true" != conf.getConfString("spark.memory.offHeap.enabled",
"false")) {
- logInfo("Comet extension disabled because
spark.memory.offHeap.enabled=false")
+ if (!isOffHeapEnabled(conf) && !isTesting) {
+ logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
+ withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
return plan
}
@@ -1174,8 +1175,7 @@ object CometSparkSessionExtensions extends Logging {
}
private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
- conf.contains("spark.memory.offHeap.enabled") &&
- conf.getConfString("spark.memory.offHeap.enabled").toBoolean
+ conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean
// Copied from org.apache.spark.util.Utils which is private to Spark.
private[comet] def isTesting: Boolean = {
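
The body of `isTesting` is cut off by the hunk context above. For reference, a hedged sketch of what Spark's private `org.apache.spark.util.Utils.isTesting` (which the comment says this copies) checks:

```scala
// Hedged sketch: Spark's Utils.isTesting looks for the SPARK_TESTING
// environment variable or the spark.testing system property, both of which
// Spark's own test harness sets.
private[comet] def isTesting: Boolean = {
  sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}
```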
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index 64ada91a..083c0f2b 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -43,6 +43,7 @@ class Native extends NativeBase {
* @return
* the address to native query plan.
*/
+ // scalastyle:off
@native def createPlan(
id: Long,
iterators: Array[CometBatchIterator],
@@ -50,10 +51,14 @@ class Native extends NativeBase {
metrics: CometMetricNode,
taskMemoryManager: CometTaskMemoryManager,
batchSize: Int,
+ use_unified_memory_manager: Boolean,
+ memory_limit: Long,
+ memory_fraction: Double,
debug: Boolean,
explain: Boolean,
workerThreads: Int,
blockingThreads: Int): Long
+ // scalastyle:on
/**
* Execute a native query plan based on given input Arrow arrays.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]