This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1065dd3d2 [VL] Fix inaccurate calculation of task slot number used by s.g.s.c.b.v.IOThreads (#6071)
1065dd3d2 is described below

commit 1065dd3d2cd8733358fd8cad531be16190b0b12f
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 14 13:34:27 2024 +0800

    [VL] Fix inaccurate calculation of task slot number used by s.g.s.c.b.v.IOThreads (#6071)
---
 .../apache/gluten/utils/VeloxBloomFilterTest.java  | 87 ++++++++++++----------
 cpp/velox/compute/VeloxBackend.cc                  |  3 +
 .../scala/org/apache/gluten/GlutenPlugin.scala     |  5 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     | 29 ++++++--
 .../gluten/integration/action/Parameterized.scala  |  2 +-
 5 files changed, 78 insertions(+), 48 deletions(-)

diff --git a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
index ba349a4f0..fda4003dd 100644
--- a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
@@ -16,11 +16,13 @@
  */
 package org.apache.gluten.utils;
 
+import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.backendsapi.ListenerApi;
 import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
 
 import com.codahale.metrics.MetricRegistry;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.plugin.PluginContext;
 import org.apache.spark.resource.ResourceInformation;
 import org.apache.spark.util.TaskResources$;
@@ -33,50 +35,13 @@ import org.junit.function.ThrowingRunnable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Map;
 
 public class VeloxBloomFilterTest {
-
   @BeforeClass
   public static void setup() {
     final ListenerApi api = new VeloxListenerApi();
-    PluginContext pluginContext =
-        new PluginContext() {
-          @Override
-          public MetricRegistry metricRegistry() {
-            return null;
-          }
-
-          @Override
-          public SparkConf conf() {
-            return new SparkConf();
-          }
-
-          @Override
-          public String executorID() {
-            return "";
-          }
-
-          @Override
-          public String hostname() {
-            return "";
-          }
-
-          @Override
-          public Map<String, ResourceInformation> resources() {
-            return Collections.emptyMap();
-          }
-
-          @Override
-          public void send(Object message) throws IOException {}
-
-          @Override
-          public Object ask(Object message) throws Exception {
-            return null;
-          }
-        };
-    api.onDriverStart(null, pluginContext);
+    api.onDriverStart(mockSparkContext(), mockPluginContext());
   }
 
   @Test
@@ -226,4 +191,50 @@ public class VeloxBloomFilterTest {
     Assert.assertTrue(negativeFalsePositives > 0);
     Assert.assertTrue(negativeFalsePositives < attemptCount);
   }
+
+  private static SparkContext mockSparkContext() {
+    // Not yet implemented.
+    return null;
+  }
+
+  private static PluginContext mockPluginContext() {
+    return new PluginContext() {
+      @Override
+      public MetricRegistry metricRegistry() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public SparkConf conf() {
+        final SparkConf conf = new SparkConf();
+        conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
+        return conf;
+      }
+
+      @Override
+      public String executorID() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public String hostname() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Map<String, ResourceInformation> resources() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void send(Object message) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Object ask(Object message) throws Exception {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
 }
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index 10d1c7529..1ec587996 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -226,6 +226,9 @@ void VeloxBackend::initConnector() {
   FLAGS_cache_prefetch_min_pct = backendConf_->get<int>(kCachePrefetchMinPct, 
0);
 
   auto ioThreads = backendConf_->get<int32_t>(kVeloxIOThreads, 
kVeloxIOThreadsDefault);
+  GLUTEN_CHECK(
+      ioThreads >= 0,
+      kVeloxIOThreads + " was set to negative number " + 
std::to_string(ioThreads) + ", this should not happen.");
   if (ioThreads > 0) {
     ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
   }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 7c601e48d..cafed66eb 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -162,8 +162,9 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
 
     // task slots
     val taskSlots = SparkResourceUtil.getTaskSlots(conf)
+    conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, 
taskSlots.toString)
 
-    var onHeapSize: Long =
+    val onHeapSize: Long =
       if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
         conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
       } else {
@@ -175,7 +176,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     // size. Otherwise, the off-heap size is set to the value specified by the 
user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by 
the user if the
     // dynamic off-heap feature is enabled.
-    var offHeapSize: Long =
+    val offHeapSize: Long =
       if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, 
false)) {
         // Since when dynamic off-heap sizing is enabled, we commingle on-heap
         // and off-heap memory, we set the off-heap size to the usable on-heap 
size. We will
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 2376a1f39..13ad8e471 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -241,6 +241,12 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION)
 
+  def numTaskSlotsPerExecutor: Int = {
+    val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
+    assert(numSlots > 0, s"Number of task slot not found. This should not 
happen.")
+    numSlots
+  }
+
   def offHeapMemorySize: Long = conf.getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
 
   def taskOffHeapMemorySize: Long = 
conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
@@ -271,7 +277,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def veloxSsdODirectEnabled: Boolean = 
conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
 
-  def veloxConnectorIOThreads: Integer = 
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS)
+  def veloxConnectorIOThreads: Int = {
+    
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
+  }
 
   def veloxSplitPreloadPerDriver: Integer = 
conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
 
@@ -533,6 +541,7 @@ object GlutenConfig {
   val GLUTEN_DEBUG_KEEP_JNI_WORKSPACE = 
"spark.gluten.sql.debug.keepJniWorkspace"
 
   // Added back to Spark Conf during executor initialization
+  val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = 
"spark.gluten.numTaskSlotsPerExecutor"
   val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = 
"spark.gluten.memory.offHeap.size.in.bytes"
   val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = 
"spark.gluten.memory.task.offHeap.size.in.bytes"
   val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -678,7 +687,7 @@ object GlutenConfig {
       (SPARK_S3_IAM_SESSION_NAME, ""),
       (
         COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
-        COLUMNAR_VELOX_CONNECTOR_IO_THREADS.defaultValueString),
+        conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
       (COLUMNAR_SHUFFLE_CODEC.key, ""),
       (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
       ("spark.hadoop.input.connect.timeout", "180000"),
@@ -1165,6 +1174,16 @@ object GlutenConfig {
       .stringConf
       .createOptional
 
+  val NUM_TASK_SLOTS_PER_EXECUTOR =
+    buildConf(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY)
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .intConf
+      .createWithDefaultString("-1")
+
   val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
     buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
       .internal()
@@ -1303,11 +1322,7 @@ object GlutenConfig {
       .doc("The Size of the IO thread pool in the Connector. This thread pool 
is used for split" +
         " preloading and DirectBufferedInput.")
       .intConf
-      .createWithDefaultFunction(
-        () =>
-          SQLConf.get.getConfString("spark.executor.cores", "1").toInt / 
SQLConf.get
-            .getConfString("spark.task.cpus", "1")
-            .toInt)
+      .createOptional
 
   val COLUMNAR_VELOX_ASYNC_TIMEOUT =
     
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
index e2fc526ce..74f22a05f 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
@@ -342,7 +342,7 @@ object Parameterized {
       desc: String): Unit = {
     println(s"Warming up: Running query: $id...")
     try {
-      val testDesc = "Gluten Spark %s %s warm up".format(desc, id)
+      val testDesc = "Gluten Spark %s [%s] Warm Up".format(desc, id)
       sessionSwitcher.useSession("test", testDesc)
       runner.createTables(creator, sessionSwitcher.spark())
       val result = runner.runQuery(sessionSwitcher.spark(), testDesc, id, 
explain = false)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to