This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d3ccd4aea [GLUTEN-5979][CH] Fix CHListenerApi initialize twice on
spark local mode (#6037)
d3ccd4aea is described below
commit d3ccd4aea027455752d84f54bf1f3b589660bd4a
Author: Wenzheng Liu <[email protected]>
AuthorDate: Tue Jun 11 17:57:01 2024 +0800
[GLUTEN-5979][CH] Fix CHListenerApi initialize twice on spark local mode
(#6037)
---
.../backendsapi/clickhouse/CHListenerApi.scala | 6 +-
.../execution/GlutenClickHouseNativeLibSuite.scala | 79 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 1 deletion(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 665fdba88..43e0627df 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -48,7 +48,11 @@ class CHListenerApi extends ListenerApi with Logging {
override def onExecutorStart(pc: PluginContext): Unit = {
GlutenExecutorEndpoint.executorEndpoint = new
GlutenExecutorEndpoint(pc.executorID, pc.conf)
- initialize(pc.conf, isDriver = false)
+ if (pc.conf().get("spark.master").startsWith("local")) {
+ logDebug("Skipping duplicate initializing clickhouse backend on spark
local mode")
+ } else {
+ initialize(pc.conf, isDriver = false)
+ }
}
override def onExecutorShutdown(): Unit = shutdown()
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala
new file mode 100644
index 000000000..0221f06bd
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.utils.UTSystemParameters
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.PlanTest
+
+class GlutenClickHouseNativeLibSuite extends PlanTest {
+
+ private def baseSparkConf: SparkConf = {
+ new SparkConf()
+ .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .set("spark.default.parallelism", "1")
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1024MB")
+ .set("spark.gluten.sql.enable.native.validation", "false")
+ }
+
+ test("test columnar lib path not exist") {
+ var spark: SparkSession = null
+ try {
+ spark = SparkSession
+ .builder()
+ .master("local[1]")
+ .config(baseSparkConf)
+ .config(GlutenConfig.GLUTEN_LIB_PATH, "path/not/exist/libch.so")
+ .getOrCreate()
+ spark.sql("select 1").show()
+ } catch {
+ case e: Exception =>
+ assert(e.isInstanceOf[GlutenException])
+ assert(
+ e.getMessage.contains(
+ "library at path: path/not/exist/libch.so is not a file or does
not exist"))
+ } finally {
+ if (spark != null) {
+ spark.stop()
+ }
+ }
+ }
+
+ test("test CHListenerApi initialize only once") {
+ var spark: SparkSession = null
+ try {
+ spark = SparkSession
+ .builder()
+ .master("local[1]")
+ .config(baseSparkConf)
+ .config(GlutenConfig.GLUTEN_LIB_PATH,
UTSystemParameters.clickHouseLibPath)
+ .config(GlutenConfig.GLUTEN_EXECUTOR_LIB_PATH,
"/path/not/exist/libch.so")
+ .getOrCreate()
+ spark.sql("select 1").show()
+ } finally {
+ if (spark != null) {
+ spark.stop()
+ }
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]