PHILO-HE commented on code in PR #8931:
URL: https://github.com/apache/incubator-gluten/pull/8931#discussion_r2030721673
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala:
##########
@@ -95,6 +95,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX +
".internal.udfLibraryPaths"
val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX +
".udfAllowTypeConversion"
+ val GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME: String =
+ VeloxBackend.CONF_PREFIX + ("broadcast.cache.expired.time")
+ // unit: SECONDS, default 1 day
+ val GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT: Int = 86400
Review Comment:
I note clickhouse has a similar config. Can we unify them into one and put
into GlutenConfig?
##########
backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala:
##########
@@ -125,9 +129,40 @@ case class BroadcastHashJoinExecTransformer(
override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
val streamedRDD = getColumnarInputRDDs(streamedPlan)
+ val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ if (executionId != null) {
+ GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
+ } else {
+ logWarning(
+ s"Can't not trace broadcast table data $buildBroadcastTableId" +
Review Comment:
typo. Use "Cannot".
##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arrow/c/abi.h>
+
+#include <jni/JniCommon.h>
+#include <iostream>
+#include "JniHashTable.h"
+#include "folly/String.h"
+#include "memory/ColumnarBatch.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "substrait/algebra.pb.h"
+#include "substrait/type.pb.h"
+#include "velox/core/PlanNode.h"
+#include "velox/type/Type.h"
+
+namespace gluten {
+
+jstring charTojstring(JNIEnv* env, const char* pat) {
+ const jclass str_class = (env)->FindClass("Ljava/lang/String;");
+ const jmethodID ctor_id = (env)->GetMethodID(str_class, "<init>",
"([BLjava/lang/String;)V");
+ const jsize str_size = static_cast<jsize>(strlen(pat));
+ const jbyteArray bytes = (env)->NewByteArray(str_size);
+ (env)->SetByteArrayRegion(bytes, 0, str_size,
reinterpret_cast<jbyte*>(const_cast<char*>(pat)));
+ const jstring encoding = (env)->NewStringUTF("UTF-8");
+ const auto result = static_cast<jstring>((env)->NewObject(str_class,
ctor_id, bytes, encoding));
+ env->DeleteLocalRef(bytes);
+ env->DeleteLocalRef(encoding);
+ return result;
+}
+
+static jclass jniVeloxBroadcastBuildSideCache = nullptr;
+static jmethodID jniGet = nullptr;
+
+jlong callJavaGet(const std::string& id) {
+ JNIEnv* env;
+ if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
+ throw gluten::GlutenException("JNIEnv was not attached to current thread");
+ }
+
+ const jstring s = charTojstring(env, id.c_str());
Review Comment:
Seems we can simply create jstring by `env->NewStringUTF(id.c_str())`. Then,
`charTojstring` can be removed.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.ColumnarBuildSideRelation
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause,
RemovalListener}
+
+import java.util.concurrent.TimeUnit
+
+case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation)
+
+/**
+ * `VeloxBroadcastBuildSideCache` is used for controlling to build bhj hash
table once.
+ *
+ * The complicated part is due to reuse exchange, where multiple BHJ IDs
correspond to a
+ * `BuildSideRelation`.
+ */
+object VeloxBroadcastBuildSideCache
+ extends Logging
+ with RemovalListener[String, BroadcastHashTable] {
+
+ private lazy val expiredTime = SparkEnv.get.conf.getLong(
+ VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME,
+ VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT
+ )
+
+ // Use for controlling to build bhj hash table once.
+ // key: hashtable id, value is hashtable backend pointer(long to string).
+ private val buildSideRelationCache: Cache[String, BroadcastHashTable] =
+ Caffeine.newBuilder
+ .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+ .removalListener(this)
+ .build[String, BroadcastHashTable]()
+
+ def getOrBuildBroadcastHashTable(
+ broadcast: Broadcast[BuildSideRelation],
+ broadCastContext: BroadCastHashJoinContext): BroadcastHashTable = {
+
+ buildSideRelationCache
+ .get(
+ broadCastContext.buildHashTableId,
+ (broadcast_id: String) => {
+ val (pointer, relation) = broadcast.value match {
+ case columnar: ColumnarBuildSideRelation =>
+ columnar.buildHashTable(broadCastContext)
+ case unsafe: UnsafeColumnarBuildSideRelation =>
+ unsafe.buildHashTable(broadCastContext)
+ }
+
+ logDebug(s"Create bhj $broadcast_id = 0x${pointer.toHexString}")
+ BroadcastHashTable(pointer, relation)
+ }
+ )
+ }
+
+ /** This is callback from c++ backend. */
+ def get(broadcastHashtableId: String): Long =
+ Option(buildSideRelationCache.getIfPresent(broadcastHashtableId))
+ .map(_.pointer)
+ .getOrElse(0)
+
+ def invalidateBroadcastHashtable(broadcastHashtableId: String): Unit = {
+ // Cleanup operations on the backend are idempotent.
+ buildSideRelationCache.invalidate(broadcastHashtableId)
+ }
+
+ /** Only used in UT. */
+ def size(): Long = buildSideRelationCache.estimatedSize()
+
+ def cleanAll(): Unit = buildSideRelationCache.invalidateAll()
+
+ override def onRemoval(key: String, value: BroadcastHashTable, cause:
RemovalCause): Unit = {
+ synchronized {
+ logDebug(s"Remove bhj $key = 0x${value.pointer.toHexString}")
+ if (value.relation != null) {
+ value.relation match {
+ case columnar: ColumnarBuildSideRelation =>
+ columnar.reset()
+ case unsafe: UnsafeColumnarBuildSideRelation =>
+ unsafe.reset()
+ }
+ }
+
+ HashJoinBuilder.clearHashTable(value.pointer)
+ }
+ }
+}
Review Comment:
Can we unify Velox's and Clickhouse's implementations to one?
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala:
##########
@@ -228,6 +245,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
private def shutdown(): Unit = {
// TODO shutdown implementation in velox to release resources
+ if (!isMockBackend) {
+ VeloxBroadcastBuildSideCache.cleanAll()
+ }
Review Comment:
The check for the mock backend in this file is only useful when running tests.
Can we avoid such check logic in main code? Maybe modify MockVeloxBackend.java
if helpful, e.g., assign an arbitrary executor ID if the main code requires it.
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -625,9 +629,108 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
+
+ val buildKeys = mode match {
+ case mode1: HashedRelationBroadcastMode =>
+ mode1.key
+ case _ =>
+ // IdentityBroadcastMode
+ Seq.empty
+ }
+ var offload = true
+ val (newChild, newOutput, newBuildKeys) =
+ if (VeloxConfig.get.enableBroadcastBuildOncePerExecutor) {
+ if (
+ buildKeys
+ .forall(
+ k =>
+ k.isInstanceOf[AttributeReference] ||
+ k.isInstanceOf[BoundReference])
+ ) {
+ (child, child.output, Seq.empty[Expression])
+ } else {
+ // pre projection in case of expression join keys
+ val appendedProjections = new ArrayBuffer[NamedExpression]()
+ val preProjectionBuildKeys = buildKeys.zipWithIndex.map {
+ case (e, idx) =>
+ e match {
+ case b: BoundReference => child.output(b.ordinal)
+ case o: Expression =>
Review Comment:
Do we need to add a case for AttributeReference, which seems not to require
making it an Alias?
##########
backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala:
##########
@@ -99,6 +100,9 @@ case class BroadcastHashJoinExecTransformer(
right,
isNullAwareAntiJoin) {
+ // Unique ID for builded table
Review Comment:
typo: built
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -625,9 +629,108 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
+
+ val buildKeys = mode match {
+ case mode1: HashedRelationBroadcastMode =>
+ mode1.key
+ case _ =>
+ // IdentityBroadcastMode
+ Seq.empty
+ }
+ var offload = true
+ val (newChild, newOutput, newBuildKeys) =
+ if (VeloxConfig.get.enableBroadcastBuildOncePerExecutor) {
+ if (
+ buildKeys
+ .forall(
+ k =>
+ k.isInstanceOf[AttributeReference] ||
+ k.isInstanceOf[BoundReference])
+ ) {
+ (child, child.output, Seq.empty[Expression])
+ } else {
+ // pre projection in case of expression join keys
+ val appendedProjections = new ArrayBuffer[NamedExpression]()
+ val preProjectionBuildKeys = buildKeys.zipWithIndex.map {
+ case (e, idx) =>
+ e match {
+ case b: BoundReference => child.output(b.ordinal)
+ case o: Expression =>
+ val newExpr = Alias(o, "col_" + idx)()
+ appendedProjections += newExpr
+ newExpr
+ }
+ }
+
+ def wrapChild(child: SparkPlan): SparkPlan = {
+ val childWithAdapter =
+
ColumnarCollapseTransformStages.wrapInputIteratorTransformer(child)
+ val projectExecTransformer =
+ ProjectExecTransformer(child.output ++ appendedProjections,
childWithAdapter)
+ val validationResult = projectExecTransformer.doValidate()
+ if (validationResult.ok()) {
+ WholeStageTransformer(
+ ProjectExecTransformer(child.output ++ appendedProjections,
childWithAdapter))(
+
ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet()
+ )
+ } else {
+ offload = false
+ child
+ }
+ }
+
+ val newChild = child match {
+ case wt: WholeStageTransformer =>
+ val projectTransformer =
+ ProjectExecTransformer(child.output ++ appendedProjections,
wt.child)
+ if (projectTransformer.doValidate().ok()) {
+ wt.withNewChildren(
+ Seq(ProjectExecTransformer(child.output ++
appendedProjections, wt.child)))
+
+ } else {
+ offload = false
+ child
+ }
+ case w: WholeStageCodegenExec =>
+ w.withNewChildren(Seq(ProjectExec(child.output ++
appendedProjections, w.child)))
+ case r: AQEShuffleReadExec if r.supportsColumnar =>
+ // when aqe is open
+ // TODO: remove this after pushdowning preprojection
+ wrapChild(r)
+ case r2c: RowToVeloxColumnarExec =>
+ wrapChild(r2c)
+ case union: ColumnarUnionExec =>
+ wrapChild(union)
+ case ordered: TakeOrderedAndProjectExecTransformer =>
+ wrapChild(ordered)
+ case a2v: ArrowColumnarToVeloxColumnarExec =>
+ wrapChild(a2v)
+ case other =>
+ offload = false
+ logWarning(
+ "Not supported operator" + other.nodeName +
Review Comment:
Nit: add a space.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]