This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1065dd3d2 [VL] Fix inaccurate calculation of task slot number used by s.g.s.c.b.v.IOThreads (#6071)
1065dd3d2 is described below
commit 1065dd3d2cd8733358fd8cad531be16190b0b12f
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 14 13:34:27 2024 +0800
[VL] Fix inaccurate calculation of task slot number used by s.g.s.c.b.v.IOThreads (#6071)
---
.../apache/gluten/utils/VeloxBloomFilterTest.java | 87 ++++++++++++----------
cpp/velox/compute/VeloxBackend.cc | 3 +
.../scala/org/apache/gluten/GlutenPlugin.scala | 5 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 29 ++++++--
.../gluten/integration/action/Parameterized.scala | 2 +-
5 files changed, 78 insertions(+), 48 deletions(-)
diff --git a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
index ba349a4f0..fda4003dd 100644
--- a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
@@ -16,11 +16,13 @@
*/
package org.apache.gluten.utils;
+import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.ListenerApi;
import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.resource.ResourceInformation;
import org.apache.spark.util.TaskResources$;
@@ -33,50 +35,13 @@ import org.junit.function.ThrowingRunnable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.Map;
public class VeloxBloomFilterTest {
-
@BeforeClass
public static void setup() {
final ListenerApi api = new VeloxListenerApi();
- PluginContext pluginContext =
- new PluginContext() {
- @Override
- public MetricRegistry metricRegistry() {
- return null;
- }
-
- @Override
- public SparkConf conf() {
- return new SparkConf();
- }
-
- @Override
- public String executorID() {
- return "";
- }
-
- @Override
- public String hostname() {
- return "";
- }
-
- @Override
- public Map<String, ResourceInformation> resources() {
- return Collections.emptyMap();
- }
-
- @Override
- public void send(Object message) throws IOException {}
-
- @Override
- public Object ask(Object message) throws Exception {
- return null;
- }
- };
- api.onDriverStart(null, pluginContext);
+ api.onDriverStart(mockSparkContext(), mockPluginContext());
}
@Test
@@ -226,4 +191,50 @@ public class VeloxBloomFilterTest {
Assert.assertTrue(negativeFalsePositives > 0);
Assert.assertTrue(negativeFalsePositives < attemptCount);
}
+
+ private static SparkContext mockSparkContext() {
+ // Not yet implemented.
+ return null;
+ }
+
+ private static PluginContext mockPluginContext() {
+ return new PluginContext() {
+ @Override
+ public MetricRegistry metricRegistry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SparkConf conf() {
+ final SparkConf conf = new SparkConf();
+ conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
+ return conf;
+ }
+
+ @Override
+ public String executorID() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String hostname() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, ResourceInformation> resources() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void send(Object message) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object ask(Object message) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index 10d1c7529..1ec587996 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -226,6 +226,9 @@ void VeloxBackend::initConnector() {
FLAGS_cache_prefetch_min_pct = backendConf_->get<int>(kCachePrefetchMinPct,
0);
auto ioThreads = backendConf_->get<int32_t>(kVeloxIOThreads,
kVeloxIOThreadsDefault);
+ GLUTEN_CHECK(
+ ioThreads >= 0,
+ kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen.");
if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 7c601e48d..cafed66eb 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -162,8 +162,9 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// task slots
val taskSlots = SparkResourceUtil.getTaskSlots(conf)
+ conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)
- var onHeapSize: Long =
+ val onHeapSize: Long =
if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
} else {
@@ -175,7 +176,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// size. Otherwise, the off-heap size is set to the value specified by the
user (if any).
// Note that this means that we will IGNORE the off-heap size specified by
the user if the
// dynamic off-heap feature is enabled.
- var offHeapSize: Long =
+ val offHeapSize: Long =
if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED,
false)) {
// Since when dynamic off-heap sizing is enabled, we commingle on-heap
// and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 2376a1f39..13ad8e471 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -241,6 +241,12 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION)
+ def numTaskSlotsPerExecutor: Int = {
+ val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
+ assert(numSlots > 0, s"Number of task slot not found. This should not happen.")
+ numSlots
+ }
+
def offHeapMemorySize: Long = conf.getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
def taskOffHeapMemorySize: Long =
conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
@@ -271,7 +277,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxSsdODirectEnabled: Boolean =
conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
- def veloxConnectorIOThreads: Integer = conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS)
+ def veloxConnectorIOThreads: Int = {
+ conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
+ }
def veloxSplitPreloadPerDriver: Integer =
conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
@@ -533,6 +541,7 @@ object GlutenConfig {
val GLUTEN_DEBUG_KEEP_JNI_WORKSPACE =
"spark.gluten.sql.debug.keepJniWorkspace"
// Added back to Spark Conf during executor initialization
+ val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = "spark.gluten.numTaskSlotsPerExecutor"
val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY =
"spark.gluten.memory.offHeap.size.in.bytes"
val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
"spark.gluten.memory.task.offHeap.size.in.bytes"
val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -678,7 +687,7 @@ object GlutenConfig {
(SPARK_S3_IAM_SESSION_NAME, ""),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
- COLUMNAR_VELOX_CONNECTOR_IO_THREADS.defaultValueString),
+ conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
(COLUMNAR_SHUFFLE_CODEC.key, ""),
(COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
("spark.hadoop.input.connect.timeout", "180000"),
@@ -1165,6 +1174,16 @@ object GlutenConfig {
.stringConf
.createOptional
+ val NUM_TASK_SLOTS_PER_EXECUTOR =
+ buildConf(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY)
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .intConf
+ .createWithDefaultString("-1")
+
val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
.internal()
@@ -1303,11 +1322,7 @@ object GlutenConfig {
.doc("The Size of the IO thread pool in the Connector. This thread pool
is used for split" +
" preloading and DirectBufferedInput.")
.intConf
- .createWithDefaultFunction(
- () =>
- SQLConf.get.getConfString("spark.executor.cores", "1").toInt / SQLConf.get
- .getConfString("spark.task.cpus", "1")
- .toInt)
+ .createOptional
val COLUMNAR_VELOX_ASYNC_TIMEOUT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
index e2fc526ce..74f22a05f 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
@@ -342,7 +342,7 @@ object Parameterized {
desc: String): Unit = {
println(s"Warming up: Running query: $id...")
try {
- val testDesc = "Gluten Spark %s %s warm up".format(desc, id)
+ val testDesc = "Gluten Spark %s [%s] Warm Up".format(desc, id)
sessionSwitcher.useSession("test", testDesc)
runner.createTables(creator, sessionSwitcher.spark())
val result = runner.runQuery(sessionSwitcher.spark(), testDesc, id,
explain = false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]