This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a0b96925 [CORE] Move driver/executor endpoint to CH backend (#5914)
1a0b96925 is described below

commit 1a0b969250de57cf7cb6266484f755064058ce0e
Author: Xiduo You <[email protected]>
AuthorDate: Fri May 31 09:28:14 2024 +0800

    [CORE] Move driver/executor endpoint to CH backend (#5914)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  1 -
 .../backendsapi/clickhouse/CHBroadcastApi.scala    | 45 ----------------------
 .../backendsapi/clickhouse/CHListenerApi.scala     | 16 ++++++--
 .../backendsapi/clickhouse/CHTransformerApi.scala  |  5 +++
 .../execution/CHHashJoinExecTransformer.scala      | 20 ++++++++--
 .../listener/CHGlutenSQLAppStatusListener.scala    | 11 +++++-
 .../apache/spark/rpc/GlutenDriverEndpoint.scala    |  0
 .../apache/spark/rpc/GlutenExecutorEndpoint.scala  | 11 ++++--
 .../org/apache/spark/rpc/GlutenRpcConstants.scala  |  0
 .../org/apache/spark/rpc/GlutenRpcMessages.scala   |  0
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  1 -
 .../backendsapi/velox/VeloxBroadcastApi.scala      | 32 ---------------
 .../backendsapi/velox/VeloxListenerApi.scala       | 12 +++---
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  2 +-
 .../ShuffledHashJoinExecTransformer.scala          |  9 ++++-
 ...oxBroadcastNestedLoopJoinExecTransformer.scala} | 13 +++++--
 .../apache/gluten/utils/VeloxBloomFilterTest.java  | 43 ++++++++++++++++++++-
 gluten-core/pom.xml                                |  4 --
 .../scala/org/apache/gluten/GlutenPlugin.scala     |  8 +---
 .../org/apache/gluten/backendsapi/Backend.scala    |  2 -
 .../gluten/backendsapi/BackendsApiManager.scala    |  4 --
 .../apache/gluten/backendsapi/BroadcastApi.scala   | 42 --------------------
 .../apache/gluten/backendsapi/ListenerApi.scala    |  7 ++--
 .../apache/gluten/backendsapi/TransformerApi.scala |  3 ++
 .../BroadcastNestedLoopJoinExecTransformer.scala   | 18 +--------
 .../gluten/execution/JoinExecTransformer.scala     | 16 +-------
 .../spark/listener/GlutenListenerFactory.scala     |  3 --
 .../org/apache/spark/sql/GlutenQueryTest.scala     |  4 +-
 28 files changed, 131 insertions(+), 201 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index e5f68a869..c79d0aaee 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -49,7 +49,6 @@ class CHBackend extends Backend {
   override def validatorApi(): ValidatorApi = new CHValidatorApi
   override def metricsApi(): MetricsApi = new CHMetricsApi
   override def listenerApi(): ListenerApi = new CHListenerApi
-  override def broadcastApi(): BroadcastApi = new CHBroadcastApi
   override def settings(): BackendSettingsApi = CHBackendSettings
 }
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBroadcastApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBroadcastApi.scala
deleted file mode 100644
index d70ba6b8d..000000000
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBroadcastApi.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.backendsapi.clickhouse
-
-import org.apache.gluten.backendsapi.BroadcastApi
-import org.apache.gluten.execution.CHBroadcastBuildSideCache
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.GlutenDriverEndpoint
-
-class CHBroadcastApi extends BroadcastApi with Logging {
-  override def cleanExecutionBroadcastTable(
-      executionId: String,
-      broadcastTableIds: java.util.Set[String]): Unit = {
-    if (broadcastTableIds != null) {
-      broadcastTableIds.forEach(
-        resource_id => 
CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
-    }
-  }
-
-  override def collectExecutionBroadcastTableId(executionId: String, 
buildTableId: String): Unit = {
-    if (executionId != null) {
-      GlutenDriverEndpoint.collectResources(executionId, buildTableId)
-    } else {
-      logWarning(
-        s"Can't not trace broadcast hash table data $buildTableId" +
-          s" because execution id is null." +
-          s" Will clean up until expire time.")
-    }
-  }
-}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 952812d68..665fdba88 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -23,9 +23,12 @@ import 
org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenPar
 import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, JniLibLoader}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.listener.CHGlutenSQLAppStatusListener
 import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
 import org.apache.spark.sql.execution.datasources.v1._
 import org.apache.spark.util.SparkDirectoryUtil
 
@@ -35,11 +38,18 @@ import java.util.TimeZone
 
 class CHListenerApi extends ListenerApi with Logging {
 
-  override def onDriverStart(conf: SparkConf): Unit = initialize(conf, 
isDriver = true)
+  override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
+    GlutenDriverEndpoint.glutenDriverEndpointRef = (new 
GlutenDriverEndpoint).self
+    CHGlutenSQLAppStatusListener.registerListener(sc)
+    initialize(pc.conf, isDriver = true)
+  }
 
   override def onDriverShutdown(): Unit = shutdown()
 
-  override def onExecutorStart(conf: SparkConf): Unit = initialize(conf, 
isDriver = false)
+  override def onExecutorStart(pc: PluginContext): Unit = {
+    GlutenExecutorEndpoint.executorEndpoint = new 
GlutenExecutorEndpoint(pc.executorID, pc.conf)
+    initialize(pc.conf, isDriver = false)
+  }
 
   override def onExecutorShutdown(): Unit = shutdown()
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index ea3398e77..0aab14b78 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -23,6 +23,7 @@ import 
org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBui
 import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
@@ -227,4 +228,8 @@ class CHTransformerApi extends TransformerApi with Logging {
   }
 
   override def packPBMessage(message: Message): Any = Any.pack(message)
+
+  override def invalidateSQLExecutionResource(executionId: String): Unit = {
+    GlutenDriverEndpoint.invalidateResourceRelation(executionId)
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
index 046f26043..c3ab89df5 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
@@ -21,10 +21,12 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.utils.CHJoinValidateUtil
 
 import org.apache.spark.{broadcast, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -116,10 +118,22 @@ case class CHBroadcastHashJoinExecTransformer(
     super.doValidateInternal()
   }
 
-  override protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD 
= {
+  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+    val streamedRDD = getColumnarInputRDDs(streamedPlan)
+    val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionId != null) {
+      GlutenDriverEndpoint.collectResources(executionId, buildHashTableId)
+    } else {
+      logWarning(
+        s"Can't not trace broadcast hash table data $buildHashTableId" +
+          s" because execution id is null." +
+          s" Will clean up until expire time.")
+    }
     val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
     val context =
       BroadCastHashJoinContext(buildKeyExprs, joinType, buildPlan.output, 
buildHashTableId)
-    CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
+    val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, 
context)
+    // FIXME: Do we have to make build side a RDD?
+    streamedRDD :+ broadcastRDD
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenSQLAppStatusListener.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/listener/CHGlutenSQLAppStatusListener.scala
similarity index 86%
rename from 
gluten-core/src/main/scala/org/apache/spark/listener/GlutenSQLAppStatusListener.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/spark/listener/CHGlutenSQLAppStatusListener.scala
index 8c98d3044..7984fa846 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenSQLAppStatusListener.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/listener/CHGlutenSQLAppStatusListener.scala
@@ -16,14 +16,15 @@
  */
 package org.apache.spark.listener
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{GlutenDriverEndpoint, RpcEndpointRef}
 import org.apache.spark.rpc.GlutenRpcMessages._
-import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.ui._
 
 /** Gluten SQL listener. Used for monitor sql on whole life cycle.Create and 
release resource. */
-class GlutenSQLAppStatusListener(val driverEndpointRef: RpcEndpointRef)
+class CHGlutenSQLAppStatusListener(val driverEndpointRef: RpcEndpointRef)
   extends SparkListener
   with Logging {
 
@@ -68,3 +69,9 @@ class GlutenSQLAppStatusListener(val driverEndpointRef: 
RpcEndpointRef)
     logTrace(s"Execution $executionId end.")
   }
 }
+object CHGlutenSQLAppStatusListener {
+  def registerListener(sc: SparkContext): Unit = {
+    sc.listenerBus.addToStatusQueue(
+      new 
CHGlutenSQLAppStatusListener(GlutenDriverEndpoint.glutenDriverEndpointRef))
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
similarity index 89%
rename from 
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index e48194232..f05933ef7 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.rpc
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.CHBroadcastBuildSideCache
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.{config, Logging}
@@ -60,10 +60,15 @@ class GlutenExecutorEndpoint(val executorId: String, val 
conf: SparkConf)
 
   override def receive: PartialFunction[Any, Unit] = {
     case GlutenCleanExecutionResource(executionId, hashIds) =>
-      BackendsApiManager.getBroadcastApiInstance
-        .cleanExecutionBroadcastTable(executionId, hashIds)
+      if (executionId != null) {
+        hashIds.forEach(
+          resource_id => 
CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
+      }
 
     case e =>
       logError(s"Received unexpected message. $e")
   }
 }
+object GlutenExecutorEndpoint {
+  var executorEndpoint: GlutenExecutorEndpoint = _
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcConstants.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 7f928bd33..c8dbfb29e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -51,7 +51,6 @@ class VeloxBackend extends Backend {
   override def validatorApi(): ValidatorApi = new VeloxValidatorApi
   override def metricsApi(): MetricsApi = new VeloxMetricsApi
   override def listenerApi(): ListenerApi = new VeloxListenerApi
-  override def broadcastApi(): BroadcastApi = new VeloxBroadcastApi
   override def settings(): BackendSettingsApi = VeloxBackendSettings
 }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBroadcastApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBroadcastApi.scala
deleted file mode 100644
index bae3bb635..000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBroadcastApi.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.backendsapi.velox
-
-import org.apache.gluten.backendsapi.BroadcastApi
-
-import java.util
-
-class VeloxBroadcastApi extends BroadcastApi {
-
-  override def collectExecutionBroadcastTableId(executionId: String, 
buildTableId: String): Unit =
-    super.collectExecutionBroadcastTableId(executionId, buildTableId)
-
-  override def cleanExecutionBroadcastTable(
-      executionId: String,
-      broadcastTableIds: util.Set[String]): Unit =
-    super.cleanExecutionBroadcastTable(executionId, broadcastTableIds)
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index bbeb3a271..0b1d131d3 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -25,7 +25,8 @@ import org.apache.gluten.init.NativeBackendInitializer
 import org.apache.gluten.utils._
 import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.plugin.PluginContext
 import 
org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, 
VeloxParquetWriterInjects, VeloxRowSplitter}
 import org.apache.spark.sql.expression.UDFResolver
 import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
@@ -38,7 +39,8 @@ import scala.sys.process._
 class VeloxListenerApi extends ListenerApi {
   private val ARROW_VERSION = "1500"
 
-  override def onDriverStart(conf: SparkConf): Unit = {
+  override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
+    val conf = pc.conf()
     // sql table cache serializer
     if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, 
defaultValue = false)) {
       conf.set(
@@ -51,9 +53,9 @@ class VeloxListenerApi extends ListenerApi {
 
   override def onDriverShutdown(): Unit = shutdown()
 
-  override def onExecutorStart(conf: SparkConf): Unit = {
-    UDFResolver.resolveUdfConf(conf, isDriver = false)
-    initialize(conf)
+  override def onExecutorStart(pc: PluginContext): Unit = {
+    UDFResolver.resolveUdfConf(pc.conf(), isDriver = false)
+    initialize(pc.conf())
   }
 
   override def onExecutorShutdown(): Unit = shutdown()
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 58b27e8a7..16c11f111 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -473,7 +473,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       buildSide: BuildSide,
       joinType: JoinType,
       condition: Option[Expression]): BroadcastNestedLoopJoinExecTransformer =
-    GlutenBroadcastNestedLoopJoinExecTransformer(
+    VeloxBroadcastNestedLoopJoinExecTransformer(
       left,
       right,
       buildSide,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
index c9c60772f..002afea31 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ShuffledHashJoinExecTransformer.scala
@@ -16,12 +16,14 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import io.substrait.proto.JoinRel
 
@@ -196,8 +198,11 @@ case class BroadcastHashJoinExecTransformer(
       newRight: SparkPlan): BroadcastHashJoinExecTransformer =
     copy(left = newLeft, right = newRight)
 
-  override protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD 
= {
+  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+    val streamedRDD = getColumnarInputRDDs(streamedPlan)
     val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
-    VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+    val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+    // FIXME: Do we have to make build side a RDD?
+    streamedRDD :+ broadcastRDD
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GlutenBroadcastNestedLoopJoinExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastNestedLoopJoinExecTransformer.scala
similarity index 76%
rename from 
backends-velox/src/main/scala/org/apache/gluten/execution/GlutenBroadcastNestedLoopJoinExecTransformer.scala
rename to 
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastNestedLoopJoinExecTransformer.scala
index 3cde6b27b..851742269 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GlutenBroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastNestedLoopJoinExecTransformer.scala
@@ -16,13 +16,15 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
-case class GlutenBroadcastNestedLoopJoinExecTransformer(
+case class VeloxBroadcastNestedLoopJoinExecTransformer(
     left: SparkPlan,
     right: SparkPlan,
     buildSide: BuildSide,
@@ -36,14 +38,17 @@ case class GlutenBroadcastNestedLoopJoinExecTransformer(
     condition
   ) {
 
-  override protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD 
= {
+  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+    val streamedRDD = getColumnarInputRDDs(streamedPlan)
     val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
-    VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+    val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+    // FIXME: Do we have to make build side a RDD?
+    streamedRDD :+ broadcastRDD
   }
 
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
-      newRight: SparkPlan): GlutenBroadcastNestedLoopJoinExecTransformer =
+      newRight: SparkPlan): VeloxBroadcastNestedLoopJoinExecTransformer =
     copy(left = newLeft, right = newRight)
 
 }
diff --git 
a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
 
b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
index db54bd783..ba349a4f0 100644
--- 
a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
+++ 
b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
@@ -19,7 +19,10 @@ package org.apache.gluten.utils;
 import org.apache.gluten.backendsapi.ListenerApi;
 import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
 
+import com.codahale.metrics.MetricRegistry;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.plugin.PluginContext;
+import org.apache.spark.resource.ResourceInformation;
 import org.apache.spark.util.TaskResources$;
 import org.apache.spark.util.sketch.BloomFilter;
 import org.apache.spark.util.sketch.IncompatibleMergeException;
@@ -28,14 +31,52 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.function.ThrowingRunnable;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
 
 public class VeloxBloomFilterTest {
 
   @BeforeClass
   public static void setup() {
     final ListenerApi api = new VeloxListenerApi();
-    api.onDriverStart(new SparkConf());
+    PluginContext pluginContext =
+        new PluginContext() {
+          @Override
+          public MetricRegistry metricRegistry() {
+            return null;
+          }
+
+          @Override
+          public SparkConf conf() {
+            return new SparkConf();
+          }
+
+          @Override
+          public String executorID() {
+            return "";
+          }
+
+          @Override
+          public String hostname() {
+            return "";
+          }
+
+          @Override
+          public Map<String, ResourceInformation> resources() {
+            return Collections.emptyMap();
+          }
+
+          @Override
+          public void send(Object message) throws IOException {}
+
+          @Override
+          public Object ask(Object message) throws Exception {
+            return null;
+          }
+        };
+    api.onDriverStart(null, pluginContext);
   }
 
   @Test
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 3934d535e..740de5928 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -24,10 +24,6 @@
       <artifactId>gluten-ui</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>com.github.ben-manes.caffeine</groupId>
-      <artifactId>caffeine</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.gluten</groupId>
       <artifactId>${sparkshim.artifactId}</artifactId>
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala 
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6c3d62c1e..7c601e48d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -31,7 +31,6 @@ import org.apache.spark.api.plugin.{DriverPlugin, 
ExecutorPlugin, PluginContext,
 import org.apache.spark.internal.Logging
 import org.apache.spark.listener.GlutenListenerFactory
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.execution.ui.GlutenEventUtils
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -73,8 +72,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin 
with Logging {
     }
     // Initialize Backends API
     BackendsApiManager.initialize()
-    BackendsApiManager.getListenerApiInstance.onDriverStart(conf)
-    GlutenDriverEndpoint.glutenDriverEndpointRef = (new 
GlutenDriverEndpoint).self
+    BackendsApiManager.getListenerApiInstance.onDriverStart(sc, pluginContext)
     GlutenListenerFactory.addToSparkListenerBus(sc)
     ExpressionMappings.expressionExtensionTransformer =
       ExpressionUtil.extendedExpressionTransformer(
@@ -257,7 +255,6 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
 }
 
 private[gluten] class GlutenExecutorPlugin extends ExecutorPlugin {
-  private var executorEndpoint: GlutenExecutorEndpoint = _
   private val taskListeners: Seq[TaskListener] = Array(TaskResources)
 
   /** Initialize the executor plugin. */
@@ -267,8 +264,7 @@ private[gluten] class GlutenExecutorPlugin extends 
ExecutorPlugin {
     // Initialize Backends API
     // TODO categorize the APIs by driver's or executor's
     BackendsApiManager.initialize()
-    BackendsApiManager.getListenerApiInstance.onExecutorStart(conf)
-    executorEndpoint = new GlutenExecutorEndpoint(ctx.executorID(), conf)
+    BackendsApiManager.getListenerApiInstance.onExecutorStart(ctx)
   }
 
   /** Clean up and terminate this plugin. For example: close the native 
engine. */
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
index 6ad78e105..2c465ac61 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
@@ -33,8 +33,6 @@ trait Backend {
 
   def listenerApi(): ListenerApi
 
-  def broadcastApi(): BroadcastApi
-
   def settings(): BackendSettingsApi
 }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
index 1d9690d17..f2c93d8c7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
@@ -83,10 +83,6 @@ object BackendsApiManager {
     backend.metricsApi()
   }
 
-  def getBroadcastApiInstance: BroadcastApi = {
-    backend.broadcastApi()
-  }
-
   def getSettings: BackendSettingsApi = {
     backend.settings
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BroadcastApi.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BroadcastApi.scala
deleted file mode 100644
index 8b8b0d649..000000000
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BroadcastApi.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.backendsapi
-
-trait BroadcastApi {
-
-  /**
-   * Should call by driver. Collect Broadcast Hash Table Ids.
-   *
-   * @param executionId
-   *   execution id
-   * @param buildTableId
-   *   build table id
-   */
-  def collectExecutionBroadcastTableId(executionId: String, buildTableId: 
String): Unit = {}
-
-  /**
-   * Should call by executor. On execution end. Clean executor broadcast build 
hashtable.
-   *
-   * @param executionId
-   *   execution id
-   * @param broadcastTableIds
-   *   broadcast table ids
-   */
-  def cleanExecutionBroadcastTable(
-      executionId: String,
-      broadcastTableIds: java.util.Set[String]): Unit = {}
-}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
index aaba345fc..bad169b72 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/ListenerApi.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.gluten.backendsapi
 
-import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
 
 trait ListenerApi {
-  def onDriverStart(conf: SparkConf): Unit = {}
+  def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
   def onDriverShutdown(): Unit = {}
-  def onExecutorStart(conf: SparkConf): Unit = {}
+  def onExecutorStart(pc: PluginContext): Unit = {}
   def onExecutorShutdown(): Unit = {}
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index e41df0f2f..05a639ac2 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -70,4 +70,7 @@ trait TransformerApi {
   def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String
 
   def packPBMessage(message: Message): Any
+
+  /** This method is only used for CH backend tests */
+  def invalidateSQLExecutionResource(executionId: String): Unit = {}
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 2f666a811..092612ea7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -22,15 +22,13 @@ import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.utils.SubstraitUtil
 
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
 import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, 
RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import io.substrait.proto.CrossRel
 
@@ -66,20 +64,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
     (right, left)
   }
 
-  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
-    val streamedRDD = getColumnarInputRDDs(streamedPlan)
-    val broadcastRDD = {
-      val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-      BackendsApiManager.getBroadcastApiInstance
-        .collectExecutionBroadcastTableId(executionId, buildTableId)
-      createBroadcastBuildSideRDD()
-    }
-    // FIXME: Do we have to make build side a RDD?
-    streamedRDD :+ broadcastRDD
-  }
-
-  protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
-
   @transient override lazy val metrics: Map[String, SQLMetric] =
     
BackendsApiManager.getMetricsApiInstance.genNestedLoopJoinTransformerMetrics(sparkContext)
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index 0414c95aa..6c707e5aa 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, 
SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, SparkPlan}
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
HashedRelationBroadcastMode, HashJoin}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
@@ -414,18 +414,4 @@ abstract class BroadcastHashJoinExecTransformerBase(
   override def genJoinParametersInternal(): (Int, Int, String) = {
     (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId)
   }
-
-  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
-    val streamedRDD = getColumnarInputRDDs(streamedPlan)
-    val broadcastRDD = {
-      val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-      BackendsApiManager.getBroadcastApiInstance
-        .collectExecutionBroadcastTableId(executionId, buildHashTableId)
-      createBroadcastBuildSideRDD()
-    }
-    // FIXME: Do we have to make build side a RDD?
-    streamedRDD :+ broadcastRDD
-  }
-
-  protected def createBroadcastBuildSideRDD(): BroadcastBuildSideRDD
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
 
b/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
index 9413941fe..721711af5 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
@@ -20,12 +20,9 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
 
 import org.apache.spark.SparkContext
-import org.apache.spark.rpc.GlutenDriverEndpoint
 
 object GlutenListenerFactory {
   def addToSparkListenerBus(sc: SparkContext): Unit = {
-    sc.listenerBus.addToStatusQueue(
-      new 
GlutenSQLAppStatusListener(GlutenDriverEndpoint.glutenDriverEndpointRef))
     if (
       sc.getConf.getBoolean(
         GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
diff --git 
a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala 
b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index 35afc731b..ab30cb14e 100644
--- a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql
  * Why we need a GlutenQueryTest when we already have QueryTest?
  *   1. We need to modify the way org.apache.spark.sql.CHQueryTest#compare 
compares double
  */
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -339,7 +339,7 @@ object GlutenQueryTest extends Assertions {
       SQLExecution.withExecutionId(df.sparkSession, executionId) {
         df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
       }
-      GlutenDriverEndpoint.invalidateResourceRelation(executionId)
+      
BackendsApiManager.getTransformerApiInstance.invalidateSQLExecutionResource(executionId)
     }
 
     val sparkAnswer =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to