This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1a0b96925 [CORE] Move driver/executor endpoint to CH backend (#5914)
1a0b96925 is described below
commit 1a0b969250de57cf7cb6266484f755064058ce0e
Author: Xiduo You <[email protected]>
AuthorDate: Fri May 31 09:28:14 2024 +0800
[CORE] Move driver/executor endpoint to CH backend (#5914)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 1 -
.../backendsapi/clickhouse/CHBroadcastApi.scala | 45 ----------------------
.../backendsapi/clickhouse/CHListenerApi.scala | 16 ++++++--
.../backendsapi/clickhouse/CHTransformerApi.scala | 5 +++
.../execution/CHHashJoinExecTransformer.scala | 20 ++++++++--
.../listener/CHGlutenSQLAppStatusListener.scala | 11 +++++-
.../apache/spark/rpc/GlutenDriverEndpoint.scala | 0
.../apache/spark/rpc/GlutenExecutorEndpoint.scala | 11 ++++--
.../org/apache/spark/rpc/GlutenRpcConstants.scala | 0
.../org/apache/spark/rpc/GlutenRpcMessages.scala | 0
.../gluten/backendsapi/velox/VeloxBackend.scala | 1 -
.../backendsapi/velox/VeloxBroadcastApi.scala | 32 ---------------
.../backendsapi/velox/VeloxListenerApi.scala | 12 +++---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 2 +-
.../ShuffledHashJoinExecTransformer.scala | 9 ++++-
...oxBroadcastNestedLoopJoinExecTransformer.scala} | 13 +++++--
.../apache/gluten/utils/VeloxBloomFilterTest.java | 43 ++++++++++++++++++++-
gluten-core/pom.xml | 4 --
.../scala/org/apache/gluten/GlutenPlugin.scala | 8 +---
.../org/apache/gluten/backendsapi/Backend.scala | 2 -
.../gluten/backendsapi/BackendsApiManager.scala | 4 --
.../apache/gluten/backendsapi/BroadcastApi.scala | 42 --------------------
.../apache/gluten/backendsapi/ListenerApi.scala | 7 ++--
.../apache/gluten/backendsapi/TransformerApi.scala | 3 ++
.../BroadcastNestedLoopJoinExecTransformer.scala | 18 +--------
.../gluten/execution/JoinExecTransformer.scala | 16 +-------
.../spark/listener/GlutenListenerFactory.scala | 3 --
.../org/apache/spark/sql/GlutenQueryTest.scala | 4 +-
28 files changed, 131 insertions(+), 201 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index e5f68a869..c79d0aaee 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -49,7 +49,6 @@ class CHBackend extends Backend {
override def validatorApi(): ValidatorApi = new CHValidatorApi
override def metricsApi(): MetricsApi = new CHMetricsApi
override def listenerApi(): ListenerApi = new CHListenerApi
- override def broadcastApi(): BroadcastApi = new CHBroadcastApi
override def settings(): BackendSettingsApi = CHBackendSettings
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBroadcastApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBroadcastApi.scala
deleted file mode 100644
index d70ba6b8d..000000000
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBroadcastApi.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.backendsapi.clickhouse
-
-import org.apache.gluten.backendsapi.BroadcastApi
-import org.apache.gluten.execution.CHBroadcastBuildSideCache
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.GlutenDriverEndpoint
-
-class CHBroadcastApi extends BroadcastApi with Logging {
- override def cleanExecutionBroadcastTable(
- executionId: String,
- broadcastTableIds: java.util.Set[String]): Unit = {
- if (broadcastTableIds != null) {
- broadcastTableIds.forEach(
- resource_id =>
CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
- }
- }
-
- override def collectExecutionBroadcastTableId(executionId: String,
buildTableId: String): Unit = {
- if (executionId != null) {
- GlutenDriverEndpoint.collectResources(executionId, buildTableId)
- } else {
- logWarning(
- s"Can't not trace broadcast hash table data $buildTableId" +
- s" because execution id is null." +
- s" Will clean up until expire time.")
- }
- }
-}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 952812d68..665fdba88 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -23,9 +23,12 @@ import
org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenPar
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, JniLibLoader}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
+import org.apache.spark.listener.CHGlutenSQLAppStatusListener
import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
import org.apache.spark.sql.execution.datasources.v1._
import org.apache.spark.util.SparkDirectoryUtil
@@ -35,11 +38,18 @@ import java.util.TimeZone
class CHListenerApi extends ListenerApi with Logging {
- override def onDriverStart(conf: SparkConf): Unit = initialize(conf,
isDriver = true)
+ override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
+ GlutenDriverEndpoint.glutenDriverEndpointRef = (new
GlutenDriverEndpoint).self
+ CHGlutenSQLAppStatusListener.registerListener(sc)
+ initialize(pc.conf, isDriver = true)
+ }
override def onDriverShutdown(): Unit = shutdown()
- override def onExecutorStart(conf: SparkConf): Unit = initialize(conf,
isDriver = false)
+ override def onExecutorStart(pc: PluginContext): Unit = {
+ GlutenExecutorEndpoint.executorEndpoint = new
GlutenExecutorEndpoint(pc.executorID, pc.conf)
+ initialize(pc.conf, isDriver = false)
+ }
override def onExecutorShutdown(): Unit = shutdown()
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index ea3398e77..0aab14b78 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -23,6 +23,7 @@ import
org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBui
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
@@ -227,4 +228,8 @@ class CHTransformerApi extends TransformerApi with Logging {
}
override def packPBMessage(message: Message): Any = Any.pack(message)
+
+ override def invalidateSQLExecutionResource(executionId: String): Unit = {
+ GlutenDriverEndpoint.invalidateResourceRelation(executionId)
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
index 046f26043..c3ab89df5 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
@@ -21,10 +21,12 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.utils.CHJoinValidateUtil
import org.apache.spark.{broadcast, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -116,10 +118,22 @@ case class CHBroadcastHashJoinExecTransformer(
super.doValidateInternal()
}
- override protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
= {
+ override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+ val streamedRDD = getColumnarInputRDDs(streamedPlan)
+ val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ if (executionId != null) {
+ GlutenDriverEndpoint.collectResources(executionId, buildHashTableId)
+ } else {
+ logWarning(
+ s"Can't not trace broadcast hash table data $buildHashTableId" +
+ s" because execution id is null." +
+ s" Will clean up until expire time.")
+ }
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
val context =
BroadCastHashJoinContext(buildKeyExprs, joinType, buildPlan.output,
buildHashTableId)
- CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
+ val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast,
context)
+ // FIXME: Do we have to make build side a RDD?
+ streamedRDD :+ broadcastRDD
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenSQLAppStatusListener.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/listener/CHGlutenSQLAppStatusListener.scala
similarity index 86%
rename from
gluten-core/src/main/scala/org/apache/spark/listener/GlutenSQLAppStatusListener.scala
rename to
backends-clickhouse/src/main/scala/org/apache/spark/listener/CHGlutenSQLAppStatusListener.scala
index 8c98d3044..7984fa846 100644
---
a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenSQLAppStatusListener.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/listener/CHGlutenSQLAppStatusListener.scala
@@ -16,14 +16,15 @@
*/
package org.apache.spark.listener
+import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{GlutenDriverEndpoint, RpcEndpointRef}
import org.apache.spark.rpc.GlutenRpcMessages._
-import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.ui._
/** Gluten SQL listener. Used for monitor sql on whole life cycle.Create and
release resource. */
-class GlutenSQLAppStatusListener(val driverEndpointRef: RpcEndpointRef)
+class CHGlutenSQLAppStatusListener(val driverEndpointRef: RpcEndpointRef)
extends SparkListener
with Logging {
@@ -68,3 +69,9 @@ class GlutenSQLAppStatusListener(val driverEndpointRef:
RpcEndpointRef)
logTrace(s"Execution $executionId end.")
}
}
+object CHGlutenSQLAppStatusListener {
+ def registerListener(sc: SparkContext): Unit = {
+ sc.listenerBus.addToStatusQueue(
+ new
CHGlutenSQLAppStatusListener(GlutenDriverEndpoint.glutenDriverEndpointRef))
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
rename to
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
diff --git
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
similarity index 89%
rename from
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
rename to
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index e48194232..f05933ef7 100644
---
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.rpc
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.{config, Logging}
@@ -60,10 +60,15 @@ class GlutenExecutorEndpoint(val executorId: String, val
conf: SparkConf)
override def receive: PartialFunction[Any, Unit] = {
case GlutenCleanExecutionResource(executionId, hashIds) =>
- BackendsApiManager.getBroadcastApiInstance
- .cleanExecutionBroadcastTable(executionId, hashIds)
+ if (executionId != null) {
+ hashIds.forEach(
+ resource_id =>
CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
+ }
case e =>
logError(s"Received unexpected message. $e")
}
}
+object GlutenExecutorEndpoint {
+ var executorEndpoint: GlutenExecutorEndpoint = _
+}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
rename to
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
diff --git
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
rename to
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 7f928bd33..c8dbfb29e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -51,7 +51,6 @@ class VeloxBackend extends Backend {
override def validatorApi(): ValidatorApi = new VeloxValidatorApi
override def metricsApi(): MetricsApi = new VeloxMetricsApi
override def listenerApi(): ListenerApi = new VeloxListenerApi
- override def broadcastApi(): BroadcastApi = new VeloxBroadcastApi
override def settings(): BackendSettingsApi = VeloxBackendSettings
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBroadcastApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBroadcastApi.scala
deleted file mode 100644
index bae3bb635..000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBroadcastApi.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.backendsapi.velox
-
-import org.apache.gluten.backendsapi.BroadcastApi
-
-import java.util
-
-class VeloxBroadcastApi extends BroadcastApi {
-
- override def collectExecutionBroadcastTableId(executionId: String,
buildTableId: String): Unit =
- super.collectExecutionBroadcastTableId(executionId, buildTableId)
-
- override def cleanExecutionBroadcastTable(
- executionId: String,
- broadcastTableIds: util.Set[String]): Unit =
- super.cleanExecutionBroadcastTable(executionId, broadcastTableIds)
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index bbeb3a271..0b1d131d3 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -25,7 +25,8 @@ import org.apache.gluten.init.NativeBackendInitializer
import org.apache.gluten.utils._
import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.plugin.PluginContext
import
org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects,
VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
@@ -38,7 +39,8 @@ import scala.sys.process._
class VeloxListenerApi extends ListenerApi {
private val ARROW_VERSION = "1500"
- override def onDriverStart(conf: SparkConf): Unit = {
+ override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
+ val conf = pc.conf()
// sql table cache serializer
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key,
defaultValue = false)) {
conf.set(
@@ -51,9 +53,9 @@ class VeloxListenerApi extends ListenerApi {
override def onDriverShutdown(): Unit = shutdown()
- override def onExecutorStart(conf: SparkConf): Unit = {
- UDFResolver.resolveUdfConf(conf, isDriver = false)
- initialize(conf)
+ override def onExecutorStart(pc: PluginContext): Unit = {
+ UDFResolver.resolveUdfConf(pc.conf(), isDriver = false)
+ initialize(pc.conf())
}
override def onExecutorShutdown(): Unit = shutdown()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 58b27e8a7..16c11f111 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -473,7 +473,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression]): BroadcastNestedLoopJoinExecTransformer =
- GlutenBroadcastNestedLoopJoinExecTransformer(
+ VeloxBroadcastNestedLoopJoinExecTransformer(
left,
right,
buildSide,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
index c9c60772f..002afea31 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
@@ -16,12 +16,14 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
import io.substrait.proto.JoinRel
@@ -196,8 +198,11 @@ case class BroadcastHashJoinExecTransformer(
newRight: SparkPlan): BroadcastHashJoinExecTransformer =
copy(left = newLeft, right = newRight)
- override protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
= {
+ override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+ val streamedRDD = getColumnarInputRDDs(streamedPlan)
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
- VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+ val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+ // FIXME: Do we have to make build side a RDD?
+ streamedRDD :+ broadcastRDD
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/GlutenBroadcastNestedLoopJoinExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastNestedLoopJoinExecTransformer.scala
similarity index 76%
rename from
backends-velox/src/main/scala/org/apache/gluten/execution/GlutenBroadcastNestedLoopJoinExecTransformer.scala
rename to
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastNestedLoopJoinExecTransformer.scala
index 3cde6b27b..851742269 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/GlutenBroadcastNestedLoopJoinExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastNestedLoopJoinExecTransformer.scala
@@ -16,13 +16,15 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
-case class GlutenBroadcastNestedLoopJoinExecTransformer(
+case class VeloxBroadcastNestedLoopJoinExecTransformer(
left: SparkPlan,
right: SparkPlan,
buildSide: BuildSide,
@@ -36,14 +38,17 @@ case class GlutenBroadcastNestedLoopJoinExecTransformer(
condition
) {
- override protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
= {
+ override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+ val streamedRDD = getColumnarInputRDDs(streamedPlan)
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
- VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+ val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+ // FIXME: Do we have to make build side a RDD?
+ streamedRDD :+ broadcastRDD
}
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
- newRight: SparkPlan): GlutenBroadcastNestedLoopJoinExecTransformer =
+ newRight: SparkPlan): VeloxBroadcastNestedLoopJoinExecTransformer =
copy(left = newLeft, right = newRight)
}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
index db54bd783..ba349a4f0 100644
---
a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
+++
b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
@@ -19,7 +19,10 @@ package org.apache.gluten.utils;
import org.apache.gluten.backendsapi.ListenerApi;
import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
+import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.plugin.PluginContext;
+import org.apache.spark.resource.ResourceInformation;
import org.apache.spark.util.TaskResources$;
import org.apache.spark.util.sketch.BloomFilter;
import org.apache.spark.util.sketch.IncompatibleMergeException;
@@ -28,14 +31,52 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
public class VeloxBloomFilterTest {
@BeforeClass
public static void setup() {
final ListenerApi api = new VeloxListenerApi();
- api.onDriverStart(new SparkConf());
+ PluginContext pluginContext =
+ new PluginContext() {
+ @Override
+ public MetricRegistry metricRegistry() {
+ return null;
+ }
+
+ @Override
+ public SparkConf conf() {
+ return new SparkConf();
+ }
+
+ @Override
+ public String executorID() {
+ return "";
+ }
+
+ @Override
+ public String hostname() {
+ return "";
+ }
+
+ @Override
+ public Map<String, ResourceInformation> resources() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void send(Object message) throws IOException {}
+
+ @Override
+ public Object ask(Object message) throws Exception {
+ return null;
+ }
+ };
+ api.onDriverStart(null, pluginContext);
}
@Test
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 3934d535e..740de5928 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -24,10 +24,6 @@
<artifactId>gluten-ui</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>${sparkshim.artifactId}</artifactId>
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6c3d62c1e..7c601e48d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -31,7 +31,6 @@ import org.apache.spark.api.plugin.{DriverPlugin,
ExecutorPlugin, PluginContext,
import org.apache.spark.internal.Logging
import org.apache.spark.listener.GlutenListenerFactory
import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.execution.ui.GlutenEventUtils
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -73,8 +72,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin
with Logging {
}
// Initialize Backends API
BackendsApiManager.initialize()
- BackendsApiManager.getListenerApiInstance.onDriverStart(conf)
- GlutenDriverEndpoint.glutenDriverEndpointRef = (new
GlutenDriverEndpoint).self
+ BackendsApiManager.getListenerApiInstance.onDriverStart(sc, pluginContext)
GlutenListenerFactory.addToSparkListenerBus(sc)
ExpressionMappings.expressionExtensionTransformer =
ExpressionUtil.extendedExpressionTransformer(
@@ -257,7 +255,6 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
}
private[gluten] class GlutenExecutorPlugin extends ExecutorPlugin {
- private var executorEndpoint: GlutenExecutorEndpoint = _
private val taskListeners: Seq[TaskListener] = Array(TaskResources)
/** Initialize the executor plugin. */
@@ -267,8 +264,7 @@ private[gluten] class GlutenExecutorPlugin extends
ExecutorPlugin {
// Initialize Backends API
// TODO categorize the APIs by driver's or executor's
BackendsApiManager.initialize()
- BackendsApiManager.getListenerApiInstance.onExecutorStart(conf)
- executorEndpoint = new GlutenExecutorEndpoint(ctx.executorID(), conf)
+ BackendsApiManager.getListenerApiInstance.onExecutorStart(ctx)
}
/** Clean up and terminate this plugin. For example: close the native
engine. */
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
index 6ad78e105..2c465ac61 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
@@ -33,8 +33,6 @@ trait Backend {
def listenerApi(): ListenerApi
- def broadcastApi(): BroadcastApi
-
def settings(): BackendSettingsApi
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
index 1d9690d17..f2c93d8c7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
@@ -83,10 +83,6 @@ object BackendsApiManager {
backend.metricsApi()
}
- def getBroadcastApiInstance: BroadcastApi = {
- backend.broadcastApi()
- }
-
def getSettings: BackendSettingsApi = {
backend.settings
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BroadcastApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BroadcastApi.scala
deleted file mode 100644
index 8b8b0d649..000000000
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BroadcastApi.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.backendsapi
-
-trait BroadcastApi {
-
- /**
- * Should call by driver. Collect Broadcast Hash Table Ids.
- *
- * @param executionId
- * execution id
- * @param buildTableId
- * build table id
- */
- def collectExecutionBroadcastTableId(executionId: String, buildTableId:
String): Unit = {}
-
- /**
- * Should call by executor. On execution end. Clean executor broadcast build
hashtable.
- *
- * @param executionId
- * execution id
- * @param broadcastTableIds
- * broadcast table ids
- */
- def cleanExecutionBroadcastTable(
- executionId: String,
- broadcastTableIds: java.util.Set[String]): Unit = {}
-}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
index aaba345fc..bad169b72 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
@@ -16,11 +16,12 @@
*/
package org.apache.gluten.backendsapi
-import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
trait ListenerApi {
- def onDriverStart(conf: SparkConf): Unit = {}
+ def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
def onDriverShutdown(): Unit = {}
- def onExecutorStart(conf: SparkConf): Unit = {}
+ def onExecutorStart(pc: PluginContext): Unit = {}
def onExecutorShutdown(): Unit = {}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index e41df0f2f..05a639ac2 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -70,4 +70,7 @@ trait TransformerApi {
def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String
def packPBMessage(message: Message): Any
+
+ /** This method is only used for CH backend tests */
+ def invalidateSQLExecutionResource(executionId: String): Unit = {}
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 2f666a811..092612ea7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -22,15 +22,13 @@ import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.utils.SubstraitUtil
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.BaseJoinExec
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.vectorized.ColumnarBatch
import io.substrait.proto.CrossRel
@@ -66,20 +64,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
(right, left)
}
- override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
- val streamedRDD = getColumnarInputRDDs(streamedPlan)
- val broadcastRDD = {
- val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- BackendsApiManager.getBroadcastApiInstance
- .collectExecutionBroadcastTableId(executionId, buildTableId)
- createBroadcastBuildSideRDD()
- }
- // FIXME: Do we have to make build side a RDD?
- streamedRDD :+ broadcastRDD
- }
-
- protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
-
@transient override lazy val metrics: Map[String, SQLMetric] =
BackendsApiManager.getMetricsApiInstance.genNestedLoopJoinTransformerMetrics(sparkContext)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index 0414c95aa..6c707e5aa 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, SparkPlan}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
@@ -414,18 +414,4 @@ abstract class BroadcastHashJoinExecTransformerBase(
override def genJoinParametersInternal(): (Int, Int, String) = {
(1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId)
}
-
- override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
- val streamedRDD = getColumnarInputRDDs(streamedPlan)
- val broadcastRDD = {
- val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- BackendsApiManager.getBroadcastApiInstance
- .collectExecutionBroadcastTableId(executionId, buildHashTableId)
- createBroadcastBuildSideRDD()
- }
- // FIXME: Do we have to make build side a RDD?
- streamedRDD :+ broadcastRDD
- }
-
- protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
}
diff --git a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala b/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
index 9413941fe..721711af5 100644
--- a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
@@ -20,12 +20,9 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
import org.apache.spark.SparkContext
-import org.apache.spark.rpc.GlutenDriverEndpoint
object GlutenListenerFactory {
def addToSparkListenerBus(sc: SparkContext): Unit = {
- sc.listenerBus.addToStatusQueue(
- new GlutenSQLAppStatusListener(GlutenDriverEndpoint.glutenDriverEndpointRef))
if (
sc.getConf.getBoolean(
GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
diff --git a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index 35afc731b..ab30cb14e 100644
--- a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql
* Why we need a GlutenQueryTest when we already have QueryTest?
* 1. We need to modify the way org.apache.spark.sql.CHQueryTest#compare compares double
*/
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -339,7 +339,7 @@ object GlutenQueryTest extends Assertions {
SQLExecution.withExecutionId(df.sparkSession, executionId) {
df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
}
- GlutenDriverEndpoint.invalidateResourceRelation(executionId)
+ BackendsApiManager.getTransformerApiInstance.invalidateSQLExecutionResource(executionId)
}
val sparkAnswer =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]