This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new bce61f54c Fix CometShuffleManager hang by deferring SparkEnv access (#3002)
bce61f54c is described below

commit bce61f54ce2450f76223d45c3305802c3bb77601
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Mon Dec 29 20:58:29 2025 +0530

    Fix CometShuffleManager hang by deferring SparkEnv access (#3002)
---
 .github/workflows/spark_sql_test.yml               |  4 --
 .../execution/shuffle/CometShuffleManager.scala    | 45 +++++++++++++++-------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 3d7aa2e2f..d143ef83a 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -59,10 +59,6 @@ jobs:
          - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
          - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
          - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
-        # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
-        exclude:
-          - spark-version: {short: '4.0', full: '4.0.1', java: 17}
-            module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
       fail-fast: false
     name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }}
     runs-on: ${{ matrix.os }}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index 927e30932..aa47dfa16 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -58,7 +58,37 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
    */
   private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
 
-  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
+  // Lazy initialization to avoid accessing SparkEnv.get during ShuffleManager construction,
+  // which can cause hangs when SparkEnv is not fully initialized (e.g., during Hive metastore ops)
+  // This is only initialized when getWriter/getReader is called (during task execution),
+  // at which point SparkEnv should be fully available
+  @volatile private var _shuffleExecutorComponents: ShuffleExecutorComponents = _
+
+  private def shuffleExecutorComponents: ShuffleExecutorComponents = {
+    if (_shuffleExecutorComponents == null) {
+      synchronized {
+        if (_shuffleExecutorComponents == null) {
+          val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+          val extraConfigs =
+            conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
+          // SparkEnv.get should be available when getWriter/getReader is called
+          // (during task execution), but check for null to avoid hangs
+          val env = SparkEnv.get
+          if (env == null) {
+            throw new IllegalStateException(
+              "SparkEnv.get is null during shuffleExecutorComponents initialization. " +
+                "This may indicate a timing issue with SparkEnv initialization.")
+          }
+          executorComponents.initializeExecutor(
+            conf.getAppId,
+            env.executorId,
+            extraConfigs.asJava)
+          _shuffleExecutorComponents = executorComponents
+        }
+      }
+    }
+    _shuffleExecutorComponents
+  }
 
   override val shuffleBlockResolver: IndexShuffleBlockResolver = {
     // The patch versions of Spark 3.4 have different constructor signatures:
@@ -253,19 +283,6 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
 
 object CometShuffleManager extends Logging {
 
-  /**
-   * Loads executor components for shuffle data IO.
-   */
-  private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
-    val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
-    val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
-    executorComponents.initializeExecutor(
-      conf.getAppId,
-      SparkEnv.get.executorId,
-      extraConfigs.asJava)
-    executorComponents
-  }
-
   def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
     // We cannot bypass sorting if we need to do map-side aggregation.
     if (dep.mapSideCombine) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to