This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5096d668e [GLUTEN-6288][CH] Support BroadcastNestedLoopJoinExe[Part one] (#6290)
5096d668e is described below

commit 5096d668e263e2d90e95851b9a37fafc6713672c
Author: Shuai li <[email protected]>
AuthorDate: Thu Jul 11 09:56:36 2024 +0800

    [GLUTEN-6288][CH] Support BroadcastNestedLoopJoinExe[Part one] (#6290)
    
    * add bnlj
    
    * fix metric
    
    fix ci
    
    fix velox ci
    
    tag
    
    fix rebase
    
    all fallback
    
    fix style
    
    * fix ci
    
    fix ci
    
    fix ci 2
    
    fix velox error
    
    * fix ci error
    
    * fix checkstyle
    
    * fix ci
---
 .../gluten/vectorized/StorageJoinBuilder.java      |  10 +-
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   1 +
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  29 +-
 .../clickhouse/CHSparkPlanExecApi.scala            |  22 +-
 .../CHBroadcastNestedLoopJoinExecTransformer.scala | 109 ++++++++
 .../extension/FallbackBroadcaseHashJoinRules.scala | 196 +++++++++----
 .../BroadcastNestedLoopJoinMetricsUpdater.scala    | 107 +++++++
 .../GlutenClickHouseTPCDSAbstractSuite.scala       |  11 +-
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala |  10 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |  74 ++++-
 cpp-ch/local-engine/Common/CHUtil.h                |  18 ++
 cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp  |  71 ++++-
 cpp-ch/local-engine/Join/BroadCastJoinBuilder.h    |   2 +-
 .../Join/StorageJoinFromReadBuffer.cpp             |  85 ++----
 .../local-engine/Join/StorageJoinFromReadBuffer.h  |   6 +-
 cpp-ch/local-engine/Parser/CrossRelParser.cpp      | 307 +++++++++++++++++++++
 .../Parser/{JoinRelParser.h => CrossRelParser.h}   |  25 +-
 cpp-ch/local-engine/Parser/JoinRelParser.cpp       |  42 +--
 cpp-ch/local-engine/Parser/JoinRelParser.h         |   2 -
 cpp-ch/local-engine/Parser/RelParser.cpp           |   2 +
 .../local-engine/Parser/SerializedPlanParser.cpp   |   1 +
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |   1 +
 cpp-ch/local-engine/local_engine_jni.cpp           |   3 +-
 .../substrait/proto/substrait/algebra.proto        |   1 +
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 +-
 .../BroadcastNestedLoopJoinExecTransformer.scala   |  62 ++++-
 .../org/apache/gluten/execution/JoinUtils.scala    |   3 +-
 .../extension/columnar/MiscColumnarRules.scala     |  22 ++
 .../extension/columnar/validator/Validators.scala  |   2 -
 .../org/apache/gluten/utils/SubstraitUtil.scala    |   4 +
 .../execution/ColumnarBroadcastExchangeExec.scala  |   9 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |   2 +-
 32 files changed, 995 insertions(+), 246 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
index 9cb49b6a2..27725998f 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
@@ -74,12 +74,20 @@ public class StorageJoinBuilder {
                   return converter.genColumnNameWithExprId(attr);
                 })
             .collect(Collectors.joining(","));
+
+    int joinType;
+    if (broadCastContext.buildHashTableId().startsWith("BuiltBNLJBroadcastTable-")) {
+      joinType = SubstraitUtil.toCrossRelSubstrait(broadCastContext.joinType()).ordinal();
+    } else {
+      joinType = SubstraitUtil.toSubstrait(broadCastContext.joinType()).ordinal();
+    }
+
     return nativeBuild(
         broadCastContext.buildHashTableId(),
         batches,
         rowCount,
         joinKey,
-        SubstraitUtil.toSubstrait(broadCastContext.joinType()).ordinal(),
+        joinType,
         broadCastContext.hasMixedFiltCondition(),
         toNameStruct(output).toByteArray());
   }
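
Aside: the hunk above keys the substrait mapping purely off the hash table id prefix. Ids built for broadcast nested loop joins start with "BuiltBNLJBroadcastTable-", so those tables go through the CrossRel conversion while hash joins keep the JoinRel one. A minimal sketch of that dispatch, with by-name stand-ins for the two SubstraitUtil conversions (the helper name and parameters here are hypothetical):

    object JoinTypeDispatch {
      // Prefix mirrors the one checked in StorageJoinBuilder above.
      val BnljTablePrefix = "BuiltBNLJBroadcastTable-"

      // crossOrdinal / joinOrdinal stand in for
      // SubstraitUtil.toCrossRelSubstrait(...).ordinal() and
      // SubstraitUtil.toSubstrait(...).ordinal().
      def dispatch(buildHashTableId: String, crossOrdinal: => Int, joinOrdinal: => Int): Int =
        if (buildHashTableId.startsWith(BnljTablePrefix)) crossOrdinal else joinOrdinal
    }

    // e.g. JoinTypeDispatch.dispatch("BuiltBNLJBroadcastTable-42", 0, 1) == 0

The same prefix check reappears on the native side in BroadCastJoinBuilder.cpp further down, which keeps the two ends in sync.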
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 3b8499ac8..6b23c6f39 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -298,4 +298,5 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
   }
 
   override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true
+
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index a5fb4a185..5465e9b60 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -348,16 +348,29 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)
 
   override def genNestedLoopJoinTransformerMetrics(
-      sparkContext: SparkContext): Map[String, SQLMetric] = {
-    throw new UnsupportedOperationException(
-      s"NestedLoopJoinTransformer metrics update is not supported in CH 
backend")
-  }
+      sparkContext: SparkContext): Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output 
vectors"),
+    "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+    "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input 
bytes"),
+    "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
+    "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
+    "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for output"),
+    "postProjectTime" ->
+      SQLMetrics.createTimingMetric(sparkContext, "time of postProjection"),
+    "probeTime" ->
+      SQLMetrics.createTimingMetric(sparkContext, "time of probe"),
+    "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time"),
+    "fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "filling right join side time"),
+    "conditionTime" -> SQLMetrics.createTimingMetric(sparkContext, "join 
condition time")
+  )
 
   override def genNestedLoopJoinTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = {
-    throw new UnsupportedOperationException(
-      s"NestedLoopJoinTransformer metrics update is not supported in CH 
backend")
-  }
+      metrics: Map[String, SQLMetric]): MetricsUpdater = new 
BroadcastNestedLoopJoinMetricsUpdater(
+    metrics)
 
   override def genSampleTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
     throw new UnsupportedOperationException(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 7665216ce..ffe8f2f9e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -373,8 +373,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       buildSide: BuildSide,
       joinType: JoinType,
       condition: Option[Expression]): BroadcastNestedLoopJoinExecTransformer =
-    throw new GlutenNotSupportException(
-      "BroadcastNestedLoopJoinExecTransformer is not supported in ch backend.")
+    CHBroadcastNestedLoopJoinExecTransformer(
+      left,
+      right,
+      buildSide,
+      joinType,
+      condition
+    )
 
   override def genSampleExecTransformer(
       lowerBound: Double,
@@ -460,16 +465,23 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       child: SparkPlan,
       numOutputRows: SQLMetric,
       dataSize: SQLMetric): BuildSideRelation = {
-    val hashedRelationBroadcastMode = mode.asInstanceOf[HashedRelationBroadcastMode]
+
+    val buildKeys: Seq[Expression] = mode match {
+      case mode1: HashedRelationBroadcastMode =>
+        mode1.key
+      case _ =>
+        // IdentityBroadcastMode
+        Seq.empty
+    }
+
     val (newChild, newOutput, newBuildKeys) =
       if (
-        hashedRelationBroadcastMode.key
+        buildKeys
           .forall(k => k.isInstanceOf[AttributeReference] || k.isInstanceOf[BoundReference])
       ) {
         (child, child.output, Seq.empty[Expression])
       } else {
         // pre projection in case of expression join keys
-        val buildKeys = hashedRelationBroadcastMode.key
         val appendedProjections = new ArrayBuffer[NamedExpression]()
         val preProjectionBuildKeys = buildKeys.zipWithIndex.map {
           case (e, idx) =>
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
new file mode 100644
index 000000000..35be8ee0b
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.ValidationResult
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftSemi}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.google.protobuf.{Any, StringValue}
+
+case class CHBroadcastNestedLoopJoinExecTransformer(
+    left: SparkPlan,
+    right: SparkPlan,
+    buildSide: BuildSide,
+    joinType: JoinType,
+    condition: Option[Expression])
+  extends BroadcastNestedLoopJoinExecTransformer(
+    left,
+    right,
+    buildSide,
+    joinType,
+    condition
+  ) {
+
+  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
+    val streamedRDD = getColumnarInputRDDs(streamedPlan)
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionId != null) {
+      GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
+    } else {
+      logWarning(
+        s"Can't not trace broadcast table data $buildBroadcastTableId" +
+          s" because execution id is null." +
+          s" Will clean up until expire time.")
+    }
+    val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
+    val context =
+      BroadCastHashJoinContext(Seq.empty, joinType, false, buildPlan.output, buildBroadcastTableId)
+    val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
+    streamedRDD :+ broadcastRDD
+  }
+
+  override protected def withNewChildrenInternal(
+      newLeft: SparkPlan,
+      newRight: SparkPlan): CHBroadcastNestedLoopJoinExecTransformer =
+    copy(left = newLeft, right = newRight)
+
+  def isMixedCondition(cond: Option[Expression]): Boolean = {
+    val res = if (cond.isDefined) {
+      val leftOutputSet = left.outputSet
+      val rightOutputSet = right.outputSet
+      val allReferences = cond.get.references
+      !(allReferences.subsetOf(leftOutputSet) || allReferences.subsetOf(rightOutputSet))
+    } else {
+      false
+    }
+    res
+  }
+
+  override def genJoinParameters(): Any = {
+    // for ch
+    val joinParametersStr = new StringBuffer("JoinParameters:")
+    joinParametersStr
+      .append("buildHashTableId=")
+      .append(buildBroadcastTableId)
+      .append("\n")
+    val message = StringValue
+      .newBuilder()
+      .setValue(joinParametersStr.toString)
+      .build()
+    BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
+  }
+
+  override def validateJoinTypeAndBuildSide(): ValidationResult = {
+    joinType match {
+      case _: InnerLike =>
+      case _ =>
+        if (joinType == LeftSemi || condition.isDefined) {
+          return ValidationResult.notOk(
+            s"Broadcast Nested Loop join is not supported join type $joinType 
with conditions")
+        }
+    }
+
+    ValidationResult.ok
+  }
+
+}
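
A note on isMixedCondition above: a condition counts as "mixed" only when its references are not fully contained in either side's output, i.e. it touches both tables at once; one-sided conditions can be handled as plain filters. A self-contained model of the predicate over plain attribute-name sets (an illustration, not the Spark expression types used above):

    // refs models cond.references; left/right model the two plans' outputSet.
    def isMixed(refs: Set[String], left: Set[String], right: Set[String]): Boolean =
      !(refs.subsetOf(left) || refs.subsetOf(right))

    assert(isMixed(Set("l_a", "r_b"), Set("l_a"), Set("r_b"))) // touches both sides
    assert(!isMixed(Set("l_a"), Set("l_a"), Set("r_b")))       // left-side-only filter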
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
index 59c2d6494..c7f9b47de 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ReusedExchangeExec}
-import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
 
 import scala.util.control.Breaks.{break, breakable}
 
@@ -89,10 +89,58 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
           // Skip. This might be the case that the exchange was already
           // executed in earlier stage
         }
+      case bnlj: BroadcastNestedLoopJoinExec => applyBNLJPrepQueryStage(bnlj)
       case _ =>
     }
     plan
   }
+
+  private def applyBNLJPrepQueryStage(bnlj: BroadcastNestedLoopJoinExec) = {
+    val buildSidePlan = bnlj.buildSide match {
+      case BuildLeft => bnlj.left
+      case BuildRight => bnlj.right
+    }
+    val maybeExchange = buildSidePlan.find {
+      case BroadcastExchangeExec(_, _) => true
+      case _ => false
+    }
+    maybeExchange match {
+      case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
+        val isTransformable =
+          if (
+            !GlutenConfig.getConf.enableColumnarBroadcastExchange ||
+            !GlutenConfig.getConf.enableColumnarBroadcastJoin
+          ) {
+            ValidationResult.notOk(
+              "columnar broadcast exchange is disabled or " +
+                "columnar broadcast join is disabled")
+          } else {
+            if (FallbackTags.nonEmpty(bnlj)) {
+              ValidationResult.notOk("broadcast join is already tagged as not transformable")
+            } else {
+              val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+                .genBroadcastNestedLoopJoinExecTransformer(
+                  bnlj.left,
+                  bnlj.right,
+                  bnlj.buildSide,
+                  bnlj.joinType,
+                  bnlj.condition)
+              val isTransformable = transformer.doValidate()
+              if (isTransformable.isValid) {
+                val exchangeTransformer = ColumnarBroadcastExchangeExec(mode, child)
+                exchangeTransformer.doValidate()
+              } else {
+                isTransformable
+              }
+            }
+          }
+        FallbackTags.add(bnlj, isTransformable)
+        FallbackTags.add(exchange, isTransformable)
+      case _ =>
+      // Skip. This might be the case that the exchange was already
+      // executed in earlier stage
+    }
+  }
 }
 
 // For similar purpose with FallbackBroadcastHashJoinPrepQueryStage, executed during applying
@@ -103,6 +151,10 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
     GlutenConfig.getConf.enableColumnarBroadcastJoin &&
       GlutenConfig.getConf.enableColumnarBroadcastExchange
 
+  private val enableColumnarBroadcastNestedLoopJoin: Boolean =
+    GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled &&
+      GlutenConfig.getConf.enableColumnarBroadcastExchange
+
   override def apply(plan: SparkPlan): SparkPlan = {
     plan.foreachUp {
       p =>
@@ -138,63 +190,9 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
                   case BuildRight => bhj.right
                 }
 
-                val maybeExchange = buildSidePlan
-                  .find {
-                    case BroadcastExchangeExec(_, _) => true
-                    case _ => false
-                  }
-                  .map(_.asInstanceOf[BroadcastExchangeExec])
-
-                maybeExchange match {
-                  case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
-                    isBhjTransformable.tagOnFallback(bhj)
-                    if (!isBhjTransformable.isValid) {
-                      FallbackTags.add(exchange, isBhjTransformable)
-                    }
-                  case None =>
-                    // we are in AQE, find the hidden exchange
-                    // FIXME did we consider the case that AQE: OFF && Reuse: ON ?
-                    var maybeHiddenExchange: Option[BroadcastExchangeLike] = None
-                    breakable {
-                      buildSidePlan.foreach {
-                        case e: BroadcastExchangeLike =>
-                          maybeHiddenExchange = Some(e)
-                          break
-                        case t: BroadcastQueryStageExec =>
-                          t.plan.foreach {
-                            case e2: BroadcastExchangeLike =>
-                              maybeHiddenExchange = Some(e2)
-                              break
-                            case r: ReusedExchangeExec =>
-                              r.child match {
-                                case e2: BroadcastExchangeLike =>
-                                  maybeHiddenExchange = Some(e2)
-                                  break
-                                case _ =>
-                              }
-                            case _ =>
-                          }
-                        case _ =>
-                      }
-                    }
-                    // restriction to force the hidden exchange to be found
-                    val exchange = maybeHiddenExchange.get
-                    // to conform to the underlying exchange's type, columnar or vanilla
-                    exchange match {
-                      case BroadcastExchangeExec(mode, child) =>
-                        FallbackTags.add(
-                          bhj,
-                          "it's a materialized broadcast exchange or reused 
broadcast exchange")
-                      case ColumnarBroadcastExchangeExec(mode, child) =>
-                        if (!isBhjTransformable.isValid) {
-                          throw new IllegalStateException(
-                            s"BroadcastExchange has already been" +
-                              s" transformed to columnar version but BHJ is 
determined as" +
-                              s" non-transformable: ${bhj.toString()}")
-                        }
-                    }
-                }
+                preTagBroadcastExchangeFallback(bhj, buildSidePlan, isBhjTransformable)
               }
+            case bnlj: BroadcastNestedLoopJoinExec => applyBNLJFallback(bnlj)
             case _ =>
           }
         } catch {
@@ -207,4 +205,88 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
     }
     plan
   }
+
+  private def applyBNLJFallback(bnlj: BroadcastNestedLoopJoinExec) = {
+    if (!enableColumnarBroadcastNestedLoopJoin) {
+      FallbackTags.add(bnlj, "columnar BroadcastJoin is not enabled in BroadcastNestedLoopJoinExec")
+    }
+
+    val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+      .genBroadcastNestedLoopJoinExecTransformer(
+        bnlj.left,
+        bnlj.right,
+        bnlj.buildSide,
+        bnlj.joinType,
+        bnlj.condition)
+
+    val isBNLJTransformable = transformer.doValidate()
+    val buildSidePlan = bnlj.buildSide match {
+      case BuildLeft => bnlj.left
+      case BuildRight => bnlj.right
+    }
+
+    preTagBroadcastExchangeFallback(bnlj, buildSidePlan, isBNLJTransformable)
+  }
+
+  private def preTagBroadcastExchangeFallback(
+      plan: SparkPlan,
+      buildSidePlan: SparkPlan,
+      isTransformable: ValidationResult): Unit = {
+    val maybeExchange = buildSidePlan
+      .find {
+        case BroadcastExchangeExec(_, _) => true
+        case _ => false
+      }
+      .map(_.asInstanceOf[BroadcastExchangeExec])
+
+    maybeExchange match {
+      case Some(exchange @ BroadcastExchangeExec(_, _)) =>
+        isTransformable.tagOnFallback(plan)
+        if (!isTransformable.isValid) {
+          FallbackTags.add(exchange, isTransformable)
+        }
+      case None =>
+        // we are in AQE, find the hidden exchange
+        // FIXME did we consider the case that AQE: OFF && Reuse: ON ?
+        var maybeHiddenExchange: Option[BroadcastExchangeLike] = None
+        breakable {
+          buildSidePlan.foreach {
+            case e: BroadcastExchangeLike =>
+              maybeHiddenExchange = Some(e)
+              break
+            case t: BroadcastQueryStageExec =>
+              t.plan.foreach {
+                case e2: BroadcastExchangeLike =>
+                  maybeHiddenExchange = Some(e2)
+                  break
+                case r: ReusedExchangeExec =>
+                  r.child match {
+                    case e2: BroadcastExchangeLike =>
+                      maybeHiddenExchange = Some(e2)
+                      break
+                    case _ =>
+                  }
+                case _ =>
+              }
+            case _ =>
+          }
+        }
+        // restriction to force the hidden exchange to be found
+        val exchange = maybeHiddenExchange.get
+        // to conform to the underlying exchange's type, columnar or vanilla
+        exchange match {
+          case BroadcastExchangeExec(mode, child) =>
+            FallbackTags.add(
+              plan,
+              "it's a materialized broadcast exchange or reused broadcast 
exchange")
+          case ColumnarBroadcastExchangeExec(mode, child) =>
+            if (!isTransformable.isValid) {
+              throw new IllegalStateException(
+                s"BroadcastExchange has already been" +
+                  s" transformed to columnar version but BHJ is determined as" 
+
+                  s" non-transformable: ${plan.toString()}")
+            }
+        }
+    }
+  }
 }
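
The shared helper preTagBroadcastExchangeFallback centralizes what used to be BHJ-only logic: tag the join with the validation result, and when the join cannot transform, also tag the build-side exchange so both fall back together; under AQE it hunts for the hidden exchange beneath a query stage or reused exchange. A compact model of the hidden-exchange branch, using plain stand-ins for the Spark plan types (a sketch, not the actual classes):

    sealed trait Exchange
    case object Vanilla extends Exchange   // stands in for BroadcastExchangeExec
    case object Columnar extends Exchange  // stands in for ColumnarBroadcastExchangeExec

    // Returns the fallback reason to tag on the join, if any, mirroring the
    // match at the end of preTagBroadcastExchangeFallback above.
    def tagForHiddenExchange(exchange: Exchange, joinTransformable: Boolean): Option[String] =
      exchange match {
        case Vanilla =>
          Some("it's a materialized broadcast exchange or reused broadcast exchange")
        case Columnar if !joinTransformable =>
          sys.error("exchange already columnar but join is non-transformable")
        case Columnar => None
      }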
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BroadcastNestedLoopJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BroadcastNestedLoopJoinMetricsUpdater.scala
new file mode 100644
index 000000000..b1414bf97
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BroadcastNestedLoopJoinMetricsUpdater.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.metrics
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+class BroadcastNestedLoopJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
+  extends MetricsUpdater
+  with Logging {
+
+  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
+    try {
+      if (opMetrics != null) {
+        val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
+        if (!operatorMetrics.metricsList.isEmpty && operatorMetrics.joinParams != null) {
+          val joinParams = operatorMetrics.joinParams
+          var currentIdx = operatorMetrics.metricsList.size() - 1
+          var totalTime = 0L
+
+          // update fillingRightJoinSideTime
+          MetricsUtil
+            .getAllProcessorList(operatorMetrics.metricsList.get(currentIdx))
+            .foreach(
+              processor => {
+                if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
+                  metrics("fillingRightJoinSideTime") += (processor.time / 
1000L).toLong
+                }
+              })
+
+          // joining
+          val joinMetricsData = operatorMetrics.metricsList.get(currentIdx)
+          metrics("outputVectors") += joinMetricsData.outputVectors
+          metrics("inputWaitTime") += (joinMetricsData.inputWaitTime / 
1000L).toLong
+          metrics("outputWaitTime") += (joinMetricsData.outputWaitTime / 
1000L).toLong
+          totalTime += joinMetricsData.time
+
+          MetricsUtil
+            .getAllProcessorList(joinMetricsData)
+            .foreach(
+              processor => {
+                if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
+                  metrics("fillingRightJoinSideTime") += (processor.time / 
1000L).toLong
+                }
+                if (processor.name.equalsIgnoreCase("FilterTransform")) {
+                  metrics("conditionTime") += (processor.time / 1000L).toLong
+                  metrics("numOutputRows") += processor.outputRows - 
processor.inputRows
+                  metrics("outputBytes") += processor.outputBytes - 
processor.inputBytes
+                }
+                if (processor.name.equalsIgnoreCase("JoiningTransform")) {
+                  metrics("probeTime") += (processor.time / 1000L).toLong
+                }
+                if (
+                  !BroadcastNestedLoopJoinMetricsUpdater.INCLUDING_PROCESSORS.contains(
+                    processor.name)
+                ) {
+                  metrics("extraTime") += (processor.time / 1000L).toLong
+                }
+                if (
+                  BroadcastNestedLoopJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)
+                ) {
+                  metrics("numOutputRows") += processor.outputRows
+                  metrics("outputBytes") += processor.outputBytes
+                  metrics("numInputRows") += processor.inputRows
+                  metrics("inputBytes") += processor.inputBytes
+                }
+              })
+
+          currentIdx -= 1
+
+          // post projection
+          if (joinParams.postProjectionNeeded) {
+            metrics("postProjectTime") +=
+              (operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
+            metrics("outputVectors") += 
operatorMetrics.metricsList.get(currentIdx).outputVectors
+            totalTime += operatorMetrics.metricsList.get(currentIdx).time
+            currentIdx -= 1
+          }
+          metrics("totalTime") += (totalTime / 1000L).toLong
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Updating native metrics failed due to ${e.getCause}.")
+        throw e
+    }
+  }
+}
+
+object BroadcastNestedLoopJoinMetricsUpdater {
+  val INCLUDING_PROCESSORS = Array("JoiningTransform", "FillingRightJoinSide", "FilterTransform")
+  val CH_PLAN_NODE_NAME = Array("JoiningTransform")
+}
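
On the arithmetic in the updater above: every native time is divided by 1000L before being added to a timing SQLMetric, which suggests the native processors report microseconds while the Spark metrics are milliseconds (an inference from the code, not stated anywhere in this patch). The processor classification reduces to a set-membership test; the names below are copied from INCLUDING_PROCESSORS:

    // Processors with dedicated timers; anything else is charged to extraTime.
    val including = Set("JoiningTransform", "FillingRightJoinSide", "FilterTransform")
    def isExtra(processorName: String): Boolean = !including.contains(processorName)

    assert(isExtra("ExpressionTransform")) // counted as extraTime
    assert(!isExtra("JoiningTransform"))   // has its own probeTime counter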
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
index bcdc1f5ef..4a732785c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
@@ -56,17 +56,11 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
             Seq("q" + "%d".format(queryNum))
           }
           val noFallBack = queryNum match {
-            case i
-                if i == 10 || i == 16 || i == 28 || i == 35 || i == 45 || i == 77 ||
-                  i == 88 || i == 90 || i == 94 =>
+            case i if i == 10 || i == 16 || i == 35 || i == 45 || i == 94 =>
               // Q10 BroadcastHashJoin, ExistenceJoin
               // Q16 ShuffledHashJoin, NOT condition
-              // Q28 BroadcastNestedLoopJoin
               // Q35 BroadcastHashJoin, ExistenceJoin
               // Q45 BroadcastHashJoin, ExistenceJoin
-              // Q77 CartesianProduct
-              // Q88 BroadcastNestedLoopJoin
-              // Q90 BroadcastNestedLoopJoin
               // Q94 BroadcastHashJoin, LeftSemi, NOT condition
               (false, false)
             case j if j == 38 || j == 87 =>
@@ -76,6 +70,9 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
               } else {
                 (false, true)
               }
+            case q77 if q77 == 77 && !isAqe =>
+              // Q77 CartesianProduct
+              (false, false)
             case other => (true, false)
           }
           sqlNums.map((_, noFallBack._1, noFallBack._2))
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index b7bf818a3..bd8a37d92 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -1754,7 +1754,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
         |     on t0.a = t1.a
         |   ) t3
         | )""".stripMargin
-    compareResultsAgainstVanillaSpark(sql1, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql1, true, { _ => })
 
     val sql2 =
       """
@@ -1775,7 +1775,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
         |     on t0.a = t1.a
         |   ) t3
         | )""".stripMargin
-    compareResultsAgainstVanillaSpark(sql2, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql2, true, { _ => })
 
     val sql3 =
       """
@@ -1796,7 +1796,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
         |     on t0.a = t1.a
         |   ) t3
         | )""".stripMargin
-    compareResultsAgainstVanillaSpark(sql3, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql3, true, { _ => })
 
     val sql4 =
       """
@@ -1817,7 +1817,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
         |     on t0.a = t1.a
         |   ) t3
         | )""".stripMargin
-    compareResultsAgainstVanillaSpark(sql4, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql4, true, { _ => })
 
     val sql5 =
       """
@@ -1838,7 +1838,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
         |     on t0.a = t1.a
         |   ) t3
         | )""".stripMargin
-    compareResultsAgainstVanillaSpark(sql5, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql5, true, { _ => })
   }
 
   test("GLUTEN-1874 not null in one stream") {
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index a3c856334..b74c18dd1 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -51,6 +51,7 @@
 #include <Parser/RelParser.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Processors/Chunk.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <QueryPipeline/printPipeline.h>
@@ -60,7 +61,6 @@
 #include <boost/algorithm/string/case_conv.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 #include <google/protobuf/util/json_util.h>
-#include <google/protobuf/wrappers.pb.h>
 #include <sys/resource.h>
 #include <Poco/Logger.h>
 #include <Poco/Util/MapConfiguration.h>
@@ -84,8 +84,6 @@ extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
 
 namespace local_engine
 {
-constexpr auto VIRTUAL_ROW_COUNT_COLUMN = "__VIRTUAL_ROW_COUNT_COLUMN__";
-
 namespace fs = std::filesystem;
 
 DB::Block BlockUtil::buildRowCountHeader()
@@ -128,6 +126,27 @@ DB::Block BlockUtil::buildHeader(const DB::NamesAndTypesList & names_types_list)
     return DB::Block(cols);
 }
 
+/// The column names may be different in two blocks.
+/// and the nullability also could be different, with TPCDS-Q1 as an example.
+DB::ColumnWithTypeAndName
+BlockUtil::convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column)
+{
+    if (sample_column.type->equals(*column.type))
+        return {column.column, column.type, sample_column.name};
+    else if (sample_column.type->isNullable() && !column.type->isNullable() && DB::removeNullable(sample_column.type)->equals(*column.type))
+    {
+        auto nullable_column = column;
+        DB::JoinCommon::convertColumnToNullable(nullable_column);
+        return {nullable_column.column, sample_column.type, sample_column.name};
+    }
+    else
+        throw DB::Exception(
+            DB::ErrorCodes::LOGICAL_ERROR,
+            "Columns have different types. original:{} expected:{}",
+            column.dumpStructure(),
+            sample_column.dumpStructure());
+}
+
 /**
 * There is a special case with which we need be careful. In spark, struct/map/list are always
  * wrapped in Nullable, but this should not happen in clickhouse.
@@ -1056,4 +1075,53 @@ UInt64 MemoryUtil::getMemoryRSS()
     return rss * sysconf(_SC_PAGESIZE);
 }
 
+
+void JoinUtil::reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols)
+{
+    ActionsDAGPtr project = std::make_shared<ActionsDAG>(plan.getCurrentDataStream().header.getNamesAndTypesList());
+    NamesWithAliases project_cols;
+    for (const auto & col : cols)
+    {
+        project_cols.emplace_back(NameWithAlias(col, col));
+    }
+    project->project(project_cols);
+    QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), project);
+    project_step->setStepDescription("Reorder Join Output");
+    plan.addStep(std::move(project_step));
+}
+
+std::pair<DB::JoinKind, DB::JoinStrictness> JoinUtil::getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type)
+{
+    switch (join_type)
+    {
+        case substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
+            return {DB::JoinKind::Inner, DB::JoinStrictness::All};
+        case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
+            return {DB::JoinKind::Left, DB::JoinStrictness::Semi};
+        case substrait::JoinRel_JoinType_JOIN_TYPE_ANTI:
+            return {DB::JoinKind::Left, DB::JoinStrictness::Anti};
+        case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
+            return {DB::JoinKind::Left, DB::JoinStrictness::All};
+        case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
+            return {DB::JoinKind::Right, DB::JoinStrictness::All};
+        case substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
+            return {DB::JoinKind::Full, DB::JoinStrictness::All};
+        default:
+            throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported join type {}.", magic_enum::enum_name(join_type));
+    }
+}
+
+std::pair<DB::JoinKind, DB::JoinStrictness> JoinUtil::getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type)
+{
+    switch (join_type)
+    {
+        case substrait::CrossRel_JoinType_JOIN_TYPE_INNER:
+        case substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
+        case substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
+            return {DB::JoinKind::Cross, DB::JoinStrictness::All};
+        default:
+            throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported join type {}.", magic_enum::enum_name(join_type));
+    }
+}
+
 }
diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h
index 3ac0f63ce..65764af7d 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #pragma once
+
 #include <filesystem>
 #include <Core/Block.h>
 #include <Core/ColumnWithTypeAndName.h>
@@ -25,6 +26,8 @@
 #include <Interpreters/Context.h>
 #include <Processors/Chunk.h>
 #include <base/types.h>
+#include <google/protobuf/wrappers.pb.h>
+#include <substrait/algebra.pb.h>
 #include <Common/CurrentThread.h>
 
 namespace DB
@@ -47,6 +50,9 @@ static const std::unordered_set<String> LONG_VALUE_SETTINGS{
 class BlockUtil
 {
 public:
+    static constexpr auto VIRTUAL_ROW_COUNT_COLUMN = "__VIRTUAL_ROW_COUNT_COLUMN__";
+    static constexpr auto RIHGT_COLUMN_PREFIX = "broadcast_right_";
+
     // Build a header block with a virtual column which will be
     // use to indicate the number of rows in a block.
     // Commonly seen in the following quries:
@@ -72,6 +78,10 @@ public:
         const std::unordered_set<size_t> & columns_to_skip_flatten = {});
 
     static DB::Block concatenateBlocksMemoryEfficiently(std::vector<DB::Block> && blocks);
+
+    /// The column names may be different in two blocks.
+    /// and the nullability also could be different, with TPCDS-Q1 as an example.
+    static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
 };
 
 class PODArrayUtil
@@ -296,4 +306,12 @@ private:
     mutable std::mutex mtx;
 };
 
+class JoinUtil
+{
+public:
+    static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols);
+    static std::pair<DB::JoinKind, DB::JoinStrictness> getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type);
+    static std::pair<DB::JoinKind, DB::JoinStrictness> getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type);
+};
+
 }
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
index 1c79a00a7..4d5eae6dc 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 #include "BroadCastJoinBuilder.h"
+
 #include <Compression/CompressedReadBuffer.h>
 #include <Interpreters/TableJoin.h>
 #include <Join/StorageJoinFromReadBuffer.h>
 #include <Parser/JoinRelParser.h>
 #include <Parser/TypeParser.h>
+#include <QueryPipeline/ProfileInfo.h>
 #include <Shuffle/ShuffleReader.h>
 #include <jni/SharedPointerWrapper.h>
 #include <jni/jni_common.h>
 #include <Poco/StringTokenizer.h>
-#include <Common/CurrentThread.h>
+#include <Common/CHUtil.h>
 #include <Common/JNIUtils.h>
 #include <Common/logger_useful.h>
 
@@ -52,6 +54,20 @@ jlong callJavaGet(const std::string & id)
     return result;
 }
 
+DB::Block resetBuildTableBlockName(Block & block, bool only_one = false)
+{
+    DB::ColumnsWithTypeAndName new_cols;
+    for (const auto & col : block)
+    {
+        // Add a prefix to avoid column name conflicts with left table.
+        new_cols.emplace_back(col.column, col.type, BlockUtil::RIHGT_COLUMN_PREFIX + col.name);
+
+        if (only_one)
+            break;
+    }
+    return DB::Block(new_cols);
+}
+
 void cleanBuildHashTable(const std::string & hash_table_id, jlong instance)
 {
    /// Thread status holds raw pointer on query context, thus it always must be destroyed
@@ -81,26 +97,71 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
     DB::ReadBuffer & input,
     jlong row_count,
     const std::string & join_keys,
-    substrait::JoinRel_JoinType join_type,
+    jint join_type,
     bool has_mixed_join_condition,
     const std::string & named_struct)
 {
     auto join_key_list = Poco::StringTokenizer(join_keys, ",");
     Names key_names;
     for (const auto & key_name : join_key_list)
-        key_names.emplace_back(key_name);
+        key_names.emplace_back(BlockUtil::RIHGT_COLUMN_PREFIX + key_name);
+
     DB::JoinKind kind;
     DB::JoinStrictness strictness;
 
-    std::tie(kind, strictness) = getJoinKindAndStrictness(join_type);
+    if (key.starts_with("BuiltBNLJBroadcastTable-"))
+        std::tie(kind, strictness) = JoinUtil::getCrossJoinKindAndStrictness(static_cast<substrait::CrossRel_JoinType>(join_type));
+    else
+        std::tie(kind, strictness) = JoinUtil::getJoinKindAndStrictness(static_cast<substrait::JoinRel_JoinType>(join_type));
+
 
     substrait::NamedStruct substrait_struct;
     substrait_struct.ParseFromString(named_struct);
     Block header = TypeParser::buildBlockFromNamedStruct(substrait_struct);
+    header = resetBuildTableBlockName(header);
+
+    Blocks data;
+    {
+        bool header_empty = header.getNamesAndTypesList().empty();
+        bool only_one_column = header_empty;
+        NativeReader block_stream(input);
+        ProfileInfo info;
+        while (Block block = block_stream.read())
+        {
+            if (header_empty)
+            {
+                // In bnlj, the build side output may be empty,
+                //   so we use the build side header only for the loop
+                // Like: select count(*) from t1 left join t2
+                header = resetBuildTableBlockName(block, true);
+                header_empty = false;
+            }
+
+            DB::ColumnsWithTypeAndName columns;
+            for (size_t i = 0; i < block.columns(); ++i)
+            {
+                const auto & column = block.getByPosition(i);
+                if (only_one_column)
+                {
+                    auto virtual_block = BlockUtil::buildRowCountBlock(column.column->size()).getColumnsWithTypeAndName();
+                    header = virtual_block;
+                    columns.emplace_back(virtual_block.back());
+                    break;
+                }
+
+                columns.emplace_back(BlockUtil::convertColumnAsNecessary(column, header.getByPosition(i)));
+            }
+
+            DB::Block final_block(columns);
+            info.update(final_block);
+            data.emplace_back(std::move(final_block));
+        }
+    }
+
     ColumnsDescription columns_description(header.getNamesAndTypesList());
 
     return make_shared<StorageJoinFromReadBuffer>(
-        input,
+        data,
         row_count,
         key_names,
         true,
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
index 9a6837e35..3d2e67f9d 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
@@ -35,7 +35,7 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
     DB::ReadBuffer & input,
     jlong row_count,
     const std::string & join_keys,
-    substrait::JoinRel_JoinType join_type,
+    jint join_type,
     bool has_mixed_join_condition,
     const std::string & named_struct);
 void cleanBuildHashTable(const std::string & hash_table_id, jlong instance);
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
index 326e11a84..2f5afd434 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -16,12 +16,10 @@
  */
 #include "StorageJoinFromReadBuffer.h"
 
-#include <DataTypes/DataTypeNullable.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/HashJoin/HashJoin.h>
 #include <Interpreters/TableJoin.h>
-#include <QueryPipeline/ProfileInfo.h>
-#include <Storages/IO/NativeReader.h>
+#include <Common/CHUtil.h>
 #include <Common/Exception.h>
 
 #include <Common/logger_useful.h>
@@ -43,8 +41,6 @@ extern const int DEADLOCK_AVOIDED;
 
 using namespace DB;
 
-constexpr auto RIHGT_COLUMN_PREFIX = "broadcast_right_";
-
 DB::Block rightSampleBlock(bool use_nulls, const StorageInMemoryMetadata & storage_metadata_, JoinKind kind)
 {
     DB::ColumnsWithTypeAndName new_cols;
@@ -52,7 +48,7 @@ DB::Block rightSampleBlock(bool use_nulls, const StorageInMemoryMetadata & stora
     for (const auto & col : block)
     {
         // Add a prefix to avoid column name conflicts with left table.
-        new_cols.emplace_back(col.column, col.type, RIHGT_COLUMN_PREFIX + col.name);
+        new_cols.emplace_back(col.column, col.type, col.name);
         if (use_nulls && isLeftOrFull(kind))
         {
             auto & new_col = new_cols.back();
@@ -66,7 +62,7 @@ namespace local_engine
 {
 
 StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
-    DB::ReadBuffer & in,
+    Blocks & data,
     size_t row_count_,
     const Names & key_names_,
     bool use_nulls_,
@@ -77,7 +73,7 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
     const ConstraintsDescription & constraints,
     const String & comment,
     const bool overwrite_)
-    : key_names({}), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_)
+    : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_)
 {
     storage_metadata.setColumns(columns);
     storage_metadata.setConstraints(constraints);
@@ -86,74 +82,33 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
     for (const auto & key : key_names_)
         if (!storage_metadata.getColumns().hasPhysical(key))
             throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Key column ({}) does not exist in table declaration.", key);
-    for (const auto & name : key_names_)
-        key_names.push_back(RIHGT_COLUMN_PREFIX + name);
     auto table_join = std::make_shared<DB::TableJoin>(SizeLimits(), true, kind, strictness, key_names);
-    right_sample_block = rightSampleBlock(use_nulls, storage_metadata, table_join->kind());
-    /// If there is mixed join conditions, need to build the hash join lazily, which rely on the real table join.
-    if (!has_mixed_join_condition)
-        buildJoin(in, right_sample_block, table_join);
-    else
-        collectAllInputs(in, right_sample_block);
-}
 
-/// The column names may be different in two blocks.
-/// and the nullability also could be different, with TPCDS-Q1 as an example.
-static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column)
-{
-    if (sample_column.type->equals(*column.type))
-        return {column.column, column.type, sample_column.name};
-    else if (
-        sample_column.type->isNullable() && !column.type->isNullable()
-        && DB::removeNullable(sample_column.type)->equals(*column.type))
+    if (key_names.empty())
     {
-        auto nullable_column = column;
-        DB::JoinCommon::convertColumnToNullable(nullable_column);
-        return {nullable_column.column, sample_column.type, sample_column.name};
+        // For bnlj cross join, keys clauses should be empty.
+        table_join->resetKeys();
     }
+
+    right_sample_block = rightSampleBlock(use_nulls, storage_metadata, table_join->kind());
+    /// If there is mixed join conditions, need to build the hash join lazily, which rely on the real table join.
+    if (!has_mixed_join_condition)
+        buildJoin(data, right_sample_block, table_join);
     else
-        throw DB::Exception(
-            DB::ErrorCodes::LOGICAL_ERROR,
-            "Columns have different types. original:{} expected:{}",
-            column.dumpStructure(),
-            sample_column.dumpStructure());
+        collectAllInputs(data, right_sample_block);
 }
 
-void StorageJoinFromReadBuffer::buildJoin(DB::ReadBuffer & in, const Block header, std::shared_ptr<DB::TableJoin> analyzed_join)
+void StorageJoinFromReadBuffer::buildJoin(Blocks & data, const Block header, std::shared_ptr<DB::TableJoin> analyzed_join)
 {
-    local_engine::NativeReader block_stream(in);
-    ProfileInfo info;
     join = std::make_shared<HashJoin>(analyzed_join, header, overwrite, row_count);
-    while (Block block = block_stream.read())
-    {
-        DB::ColumnsWithTypeAndName columns;
-        for (size_t i = 0; i < block.columns(); ++i)
-        {
-            const auto & column = block.getByPosition(i);
-            columns.emplace_back(convertColumnAsNecessary(column, 
header.getByPosition(i)));
-        }
-        DB::Block final_block(columns);
-        info.update(final_block);
-        join->addBlockToJoin(final_block, true);
-    }
+    for (Block block : data)
+        join->addBlockToJoin(std::move(block), true);
 }
 
-void StorageJoinFromReadBuffer::collectAllInputs(DB::ReadBuffer & in, const DB::Block header)
+void StorageJoinFromReadBuffer::collectAllInputs(Blocks & data, const DB::Block)
 {
-    local_engine::NativeReader block_stream(in);
-    ProfileInfo info;
-    while (Block block = block_stream.read())
-    {
-        DB::ColumnsWithTypeAndName columns;
-        for (size_t i = 0; i < block.columns(); ++i)
-        {
-            const auto & column = block.getByPosition(i);
-            columns.emplace_back(convertColumnAsNecessary(column, 
header.getByPosition(i)));
-        }
-        DB::Block final_block(columns);
-        info.update(final_block);
-        input_blocks.emplace_back(std::move(final_block));
-    }
+    for (Block block : data)
+        input_blocks.emplace_back(std::move(block));
 }
 
 void StorageJoinFromReadBuffer::buildJoinLazily(DB::Block header, std::shared_ptr<DB::TableJoin> analyzed_join)
@@ -174,7 +129,7 @@ void StorageJoinFromReadBuffer::buildJoinLazily(DB::Block header, std::shared_pt
         for (size_t i = 0; i < block.columns(); ++i)
         {
             const auto & column = block.getByPosition(i);
-            columns.emplace_back(convertColumnAsNecessary(column, header.getByPosition(i)));
+            columns.emplace_back(BlockUtil::convertColumnAsNecessary(column, header.getByPosition(i)));
         }
         DB::Block final_block(columns);
         join->addBlockToJoin(final_block, true);
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
index ddefda69c..600210e66 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
@@ -35,7 +35,7 @@ class StorageJoinFromReadBuffer
 {
 public:
     StorageJoinFromReadBuffer(
-        DB::ReadBuffer & in_,
+        DB::Blocks & data,
         size_t row_count,
         const DB::Names & key_names_,
         bool use_nulls_,
@@ -65,8 +65,8 @@ private:
     std::shared_ptr<DB::HashJoin> join = nullptr;
 
     void readAllBlocksFromInput(DB::ReadBuffer & in);
-    void buildJoin(DB::ReadBuffer & in, const DB::Block header, std::shared_ptr<DB::TableJoin> analyzed_join);
-    void collectAllInputs(DB::ReadBuffer & in, const DB::Block header);
+    void buildJoin(DB::Blocks & data, const DB::Block header, std::shared_ptr<DB::TableJoin> analyzed_join);
+    void collectAllInputs(DB::Blocks & data, const DB::Block header);
     void buildJoinLazily(DB::Block header, std::shared_ptr<DB::TableJoin> analyzed_join);
 };
 }
diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
new file mode 100644
index 000000000..ea8986401
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CrossRelParser.h"
+
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
+#include <Interpreters/CollectJoinOnKeysVisitor.h>
+#include <Interpreters/GraceHashJoin.h>
+#include <Interpreters/HashJoin/HashJoin.h>
+#include <Interpreters/TableJoin.h>
+#include <Join/BroadCastJoinBuilder.h>
+#include <Join/StorageJoinFromReadBuffer.h>
+#include <Parser/SerializedPlanParser.h>
+#include <Parsers/ASTIdentifier.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
+#include <Processors/QueryPlan/FilterStep.h>
+#include <Processors/QueryPlan/JoinStep.h>
+#include <google/protobuf/wrappers.pb.h>
+#include <Common/CHUtil.h>
+#include <Common/logger_useful.h>
+
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int UNKNOWN_TYPE;
+    extern const int BAD_ARGUMENTS;
+}
+}
+
+using namespace DB;
+
+
+
+
+namespace local_engine
+{
+String parseCrossJoinOptimizationInfos(const substrait::CrossRel & join)
+{
+    google::protobuf::StringValue optimization;
+    optimization.ParseFromString(join.advanced_extension().optimization().value());
+    String storage_join_key;
+    ReadBufferFromString in(optimization.value());
+    assertString("JoinParameters:", in);
+    assertString("buildHashTableId=", in);
+    readString(storage_join_key, in);
+    return storage_join_key;
+}
+
+std::shared_ptr<DB::TableJoin> createCrossTableJoin(substrait::CrossRel_JoinType join_type)
+{
+    auto & global_context = SerializedPlanParser::global_context;
+    auto table_join = std::make_shared<TableJoin>(
+        global_context->getSettings(), global_context->getGlobalTemporaryVolume(), global_context->getTempDataOnDisk());
+
+    std::pair<DB::JoinKind, DB::JoinStrictness> kind_and_strictness = JoinUtil::getCrossJoinKindAndStrictness(join_type);
+    table_join->setKind(kind_and_strictness.first);
+    table_join->setStrictness(kind_and_strictness.second);
+    return table_join;
+}
+
+CrossRelParser::CrossRelParser(SerializedPlanParser * plan_paser_)
+    : RelParser(plan_paser_)
+    , function_mapping(plan_paser_->function_mapping)
+    , context(plan_paser_->context)
+    , extra_plan_holder(plan_paser_->extra_plan_holder)
+{
+}
+
+DB::QueryPlanPtr
+CrossRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const substrait::Rel & /*rel*/, std::list<const substrait::Rel *> & /*rel_stack_*/)
+{
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call parse().");
+}
+
+const substrait::Rel & CrossRelParser::getSingleInput(const substrait::Rel & 
/*rel*/)
+{
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call getSingleInput().");
+}
+
+DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack)
+{
+    const auto & join = rel.cross();
+    if (!join.has_left() || !join.has_right())
+    {
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "left table or right table 
is missing.");
+    }
+
+    rel_stack.push_back(&rel);
+    auto left_plan = getPlanParser()->parseOp(join.left(), rel_stack);
+    auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack);
+    rel_stack.pop_back();
+
+    return parseJoin(join, std::move(left_plan), std::move(right_plan));
+}
+
+void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & 
right, const StorageJoinFromReadBuffer & storage_join)
+{
+    ActionsDAGPtr project = nullptr;
+    /// To support mixed join conditions, the column names of the right plan
+    /// must match storage_join's right sample block.
+    auto right_ori_header = 
right.getCurrentDataStream().header.getColumnsWithTypeAndName();
+    if (right_ori_header.size() > 0 && right_ori_header[0].name != 
BlockUtil::VIRTUAL_ROW_COUNT_COLUMN)
+    {
+        project = ActionsDAG::makeConvertingActions(
+            right_ori_header, 
storage_join.getRightSampleBlock().getColumnsWithTypeAndName(), 
ActionsDAG::MatchColumnsMode::Position);
+        if (project)
+        {
+            QueryPlanStepPtr project_step = 
std::make_unique<ExpressionStep>(right.getCurrentDataStream(), project);
+            project_step->setStepDescription("Rename Broadcast Table Name");
+            steps.emplace_back(project_step.get());
+            right.addStep(std::move(project_step));
+        }
+    }
+
+    /// If a column name in the right table duplicates one in the left table, rename the
+    /// left table's column, so the right table's column names are not changed by `addConvertStep`.
+    /// This can happen in TPC-DS q44.
+    DB::ColumnsWithTypeAndName new_left_cols;
+    const auto & right_header = right.getCurrentDataStream().header;
+    auto left_prefix = getUniqueName("left");
+    for (const auto & col : left.getCurrentDataStream().header)
+        if (right_header.has(col.name))
+            new_left_cols.emplace_back(col.column, col.type, left_prefix + 
col.name);
+        else
+            new_left_cols.emplace_back(col.column, col.type, col.name);
+    auto left_header = 
left.getCurrentDataStream().header.getColumnsWithTypeAndName();
+    project = ActionsDAG::makeConvertingActions(left_header, new_left_cols, 
ActionsDAG::MatchColumnsMode::Position);
+
+    if (project)
+    {
+        QueryPlanStepPtr project_step = 
std::make_unique<ExpressionStep>(left.getCurrentDataStream(), project);
+        project_step->setStepDescription("Rename Left Table Name for broadcast 
join");
+        steps.emplace_back(project_step.get());
+        left.addStep(std::move(project_step));
+    }
+}
+
+DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, 
DB::QueryPlanPtr left, DB::QueryPlanPtr right)
+{
+    auto storage_join_key = parseCrossJoinOptimizationInfos(join);
+    auto storage_join = BroadCastJoinBuilder::getJoin(storage_join_key);
+    renamePlanColumns(*left, *right, *storage_join);
+    auto table_join = createCrossTableJoin(join.type());
+    DB::Block right_header_before_convert_step = 
right->getCurrentDataStream().header;
+    addConvertStep(*table_join, *left, *right);
+
+    // Sanity check to catch unintended column renames early.
+    if (!blocksHaveEqualStructure(right_header_before_convert_step, right->getCurrentDataStream().header))
+    {
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "For broadcast join, we must not change the column names in the right table.\nleft header:{},\nright header: {} -> {}",
+            left->getCurrentDataStream().header.dumpNames(),
+            right_header_before_convert_step.dumpNames(),
+            right->getCurrentDataStream().header.dumpNames());
+    }
+
+    Names after_join_names;
+    auto left_names = left->getCurrentDataStream().header.getNames();
+    after_join_names.insert(after_join_names.end(), left_names.begin(), 
left_names.end());
+    auto right_name = table_join->columnsFromJoinedTable().getNames();
+    after_join_names.insert(after_join_names.end(), right_name.begin(), 
right_name.end());
+
+    auto left_header = left->getCurrentDataStream().header;
+    auto right_header = right->getCurrentDataStream().header;
+
+    QueryPlanPtr query_plan;
+    table_join->addDisjunct();
+    auto broadcast_hash_join = storage_join->getJoinLocked(table_join, 
context);
+    // table_join->resetKeys();
+    QueryPlanStepPtr join_step = 
std::make_unique<FilledJoinStep>(left->getCurrentDataStream(), 
broadcast_hash_join, 8192);
+
+    join_step->setStepDescription("STORAGE_JOIN");
+    steps.emplace_back(join_step.get());
+    left->addStep(std::move(join_step));
+    query_plan = std::move(left);
+    /// hold the right plan for profiling
+    extra_plan_holder.emplace_back(std::move(right));
+
+    addPostFilter(*query_plan, join);
+    Names cols;
+    for (auto after_join_name : after_join_names)
+    {
+        if (BlockUtil::VIRTUAL_ROW_COUNT_COLUMN == after_join_name)
+            continue;
+
+        cols.emplace_back(after_join_name);
+    }
+    JoinUtil::reorderJoinOutput(*query_plan, cols);
+
+    return query_plan;
+}
+
+
+void CrossRelParser::addPostFilter(DB::QueryPlan & query_plan, const 
substrait::CrossRel & join_rel)
+{
+    if (!join_rel.has_expression())
+        return;
+
+    auto expression = join_rel.expression();
+    std::string filter_name;
+    auto actions_dag = 
std::make_shared<ActionsDAG>(query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
+    if (!expression.has_scalar_function())
+    {
+        // It may be a singular_or_list
+        auto * in_node = getPlanParser()->parseExpression(actions_dag, 
expression);
+        filter_name = in_node->result_name;
+    }
+    else
+    {
+        
getPlanParser()->parseFunction(query_plan.getCurrentDataStream().header, 
expression, filter_name, actions_dag, true);
+    }
+    auto filter_step = 
std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), actions_dag, 
filter_name, true);
+    filter_step->setStepDescription("Post Join Filter");
+    steps.emplace_back(filter_step.get());
+    query_plan.addStep(std::move(filter_step));
+}
+
+void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & 
left, DB::QueryPlan & right)
+{
+    /// If a column name in the right table duplicates one in the left table, rename the right table's column.
+    NameSet left_columns_set;
+    for (const auto & col : left.getCurrentDataStream().header.getNames())
+        left_columns_set.emplace(col);
+    table_join.setColumnsFromJoinedTable(
+        right.getCurrentDataStream().header.getNamesAndTypesList(), 
left_columns_set, getUniqueName("right") + ".");
+
+    // de-duplicate key column names coming from the right table
+    NamesWithAliases right_table_alias;
+    for (size_t idx = 0; idx < table_join.columnsFromJoinedTable().size(); 
idx++)
+    {
+        auto origin_name = 
right.getCurrentDataStream().header.getByPosition(idx).name;
+        auto dedup_name = 
table_join.columnsFromJoinedTable().getNames().at(idx);
+        if (origin_name != dedup_name)
+        {
+            right_table_alias.emplace_back(NameWithAlias(origin_name, 
dedup_name));
+        }
+    }
+    if (!right_table_alias.empty())
+    {
+        ActionsDAGPtr rename_dag = 
std::make_shared<ActionsDAG>(right.getCurrentDataStream().header.getNamesAndTypesList());
+        auto original_right_columns = right.getCurrentDataStream().header;
+        for (const auto & column_alias : right_table_alias)
+        {
+            if (original_right_columns.has(column_alias.first))
+            {
+                auto pos = 
original_right_columns.getPositionByName(column_alias.first);
+                const auto & alias = 
rename_dag->addAlias(*rename_dag->getInputs()[pos], column_alias.second);
+                rename_dag->getOutputs()[pos] = &alias;
+            }
+        }
+
+        QueryPlanStepPtr project_step = 
std::make_unique<ExpressionStep>(right.getCurrentDataStream(), rename_dag);
+        project_step->setStepDescription("Right Table Rename");
+        steps.emplace_back(project_step.get());
+        right.addStep(std::move(project_step));
+    }
+
+    for (const auto & column : table_join.columnsFromJoinedTable())
+    {
+        table_join.addJoinedColumn(column);
+    }
+    ActionsDAGPtr left_convert_actions = nullptr;
+    ActionsDAGPtr right_convert_actions = nullptr;
+    std::tie(left_convert_actions, right_convert_actions) = 
table_join.createConvertingActions(
+        left.getCurrentDataStream().header.getColumnsWithTypeAndName(), 
right.getCurrentDataStream().header.getColumnsWithTypeAndName());
+
+    if (right_convert_actions)
+    {
+        auto converting_step = 
std::make_unique<ExpressionStep>(right.getCurrentDataStream(), 
right_convert_actions);
+        converting_step->setStepDescription("Convert joined columns");
+        steps.emplace_back(converting_step.get());
+        right.addStep(std::move(converting_step));
+    }
+
+    if (left_convert_actions)
+    {
+        auto converting_step = 
std::make_unique<ExpressionStep>(left.getCurrentDataStream(), 
left_convert_actions);
+        converting_step->setStepDescription("Convert joined columns");
+        steps.emplace_back(converting_step.get());
+        left.addStep(std::move(converting_step));
+    }
+}
+
+
+void registerCrossRelParser(RelParserFactory & factory)
+{
+    auto builder = [](SerializedPlanParser * plan_paser) { return 
std::make_shared<CrossRelParser>(plan_paser); };
+    factory.registerBuilder(substrait::Rel::RelTypeCase::kCross, builder);
+}
+
+}
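
The payload consumed by parseCrossJoinOptimizationInfos above is a plain key/value string carried in the CrossRel's advanced extension: assertString consumes the two fixed prefixes and readString picks up the hash table id that follows. A minimal Scala sketch of a well-formed payload (the trailing newline is an assumption about how the id is delimited; the real producer lives on the Spark side of the transformer):

    object CrossJoinOptimizationPayload {
      // Sketch only: the id is the buildBroadcastTableId generated by the
      // BroadcastNestedLoopJoinExecTransformer for the broadcast build side.
      def apply(buildHashTableId: String): String =
        s"JoinParameters:buildHashTableId=$buildHashTableId\n"
    }

    // CrossJoinOptimizationPayload("BuiltBNLJBroadcastTable-12")
    //   ==> "JoinParameters:buildHashTableId=BuiltBNLJBroadcastTable-12\n"
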
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.h 
b/cpp-ch/local-engine/Parser/CrossRelParser.h
similarity index 65%
copy from cpp-ch/local-engine/Parser/JoinRelParser.h
copy to cpp-ch/local-engine/Parser/CrossRelParser.h
index c423f4390..f1cd60385 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.h
+++ b/cpp-ch/local-engine/Parser/CrossRelParser.h
@@ -17,7 +17,6 @@
 #pragma once
 
 #include <memory>
-#include <unordered_set>
 #include <Parser/RelParser.h>
 #include <substrait/algebra.pb.h>
 
@@ -31,13 +30,12 @@ namespace local_engine
 
 class StorageJoinFromReadBuffer;
 
-std::pair<DB::JoinKind, DB::JoinStrictness> 
getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type);
 
-class JoinRelParser : public RelParser
+class CrossRelParser : public RelParser
 {
 public:
-    explicit JoinRelParser(SerializedPlanParser * plan_paser_);
-    ~JoinRelParser() override = default;
+    explicit CrossRelParser(SerializedPlanParser * plan_paser_);
+    ~CrossRelParser() override = default;
 
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
@@ -52,23 +50,12 @@ private:
     std::vector<QueryPlanPtr> & extra_plan_holder;
 
 
-    DB::QueryPlanPtr parseJoin(const substrait::JoinRel & join, 
DB::QueryPlanPtr left, DB::QueryPlanPtr right);
+    DB::QueryPlanPtr parseJoin(const substrait::CrossRel & join, 
DB::QueryPlanPtr left, DB::QueryPlanPtr right);
     void renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & right, const 
StorageJoinFromReadBuffer & storage_join);
     void addConvertStep(TableJoin & table_join, DB::QueryPlan & left, 
DB::QueryPlan & right);
-    void collectJoinKeys(
-        TableJoin & table_join, const substrait::JoinRel & join_rel, const 
DB::Block & left_header, const DB::Block & right_header);
-
+    void addPostFilter(DB::QueryPlan & query_plan, const substrait::CrossRel & 
join);
     bool applyJoinFilter(
-        DB::TableJoin & table_join,
-        const substrait::JoinRel & join_rel,
-        DB::QueryPlan & left_plan,
-        DB::QueryPlan & right_plan,
-        bool allow_mixed_condition);
-
-    void addPostFilter(DB::QueryPlan & plan, const substrait::JoinRel & join);
-
-    static std::unordered_set<DB::JoinTableSide> 
extractTableSidesFromExpression(
-        const substrait::Expression & expr, const DB::Block & left_header, 
const DB::Block & right_header);
+        DB::TableJoin & table_join, const substrait::CrossRel & join_rel, 
DB::QueryPlan & left, DB::QueryPlan & right, bool allow_mixed_condition);
 };
 
 }
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp 
b/cpp-ch/local-engine/Parser/JoinRelParser.cpp
index a6a146954..03734a2a9 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "JoinRelParser.h"
+
 #include <IO/ReadBufferFromString.h>
 #include <IO/ReadHelpers.h>
 #include <Interpreters/CollectJoinOnKeysVisitor.h>
@@ -30,6 +31,7 @@
 #include <Processors/QueryPlan/FilterStep.h>
 #include <Processors/QueryPlan/JoinStep.h>
 #include <google/protobuf/wrappers.pb.h>
+#include <Common/CHUtil.h>
 
 #include <Poco/Logger.h>
 #include <Common/logger_useful.h>
@@ -98,51 +100,15 @@ JoinOptimizationInfo parseJoinOptimizationInfo(const 
substrait::JoinRel & join)
     return info;
 }
 
-
-void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols)
-{
-    ActionsDAGPtr project = 
std::make_shared<ActionsDAG>(plan.getCurrentDataStream().header.getNamesAndTypesList());
-    NamesWithAliases project_cols;
-    for (const auto & col : cols)
-    {
-        project_cols.emplace_back(NameWithAlias(col, col));
-    }
-    project->project(project_cols);
-    QueryPlanStepPtr project_step = 
std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), project);
-    project_step->setStepDescription("Reorder Join Output");
-    plan.addStep(std::move(project_step));
-}
-
 namespace local_engine
 {
-
-std::pair<DB::JoinKind, DB::JoinStrictness> 
getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type)
-{
-    switch (join_type)
-    {
-        case substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
-            return {DB::JoinKind::Inner, DB::JoinStrictness::All};
-        case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
-            return {DB::JoinKind::Left, DB::JoinStrictness::Semi};
-        case substrait::JoinRel_JoinType_JOIN_TYPE_ANTI:
-            return {DB::JoinKind::Left, DB::JoinStrictness::Anti};
-        case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
-            return {DB::JoinKind::Left, DB::JoinStrictness::All};
-        case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
-            return {DB::JoinKind::Right, DB::JoinStrictness::All};
-        case substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
-            return {DB::JoinKind::Full, DB::JoinStrictness::All};
-        default:
-            throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported join type 
{}.", magic_enum::enum_name(join_type));
-    }
-}
 std::shared_ptr<DB::TableJoin> 
createDefaultTableJoin(substrait::JoinRel_JoinType join_type)
 {
     auto & global_context = SerializedPlanParser::global_context;
     auto table_join = std::make_shared<TableJoin>(
         global_context->getSettings(), 
global_context->getGlobalTemporaryVolume(), 
global_context->getTempDataOnDisk());
 
-    std::pair<DB::JoinKind, DB::JoinStrictness> kind_and_strictness = 
getJoinKindAndStrictness(join_type);
+    std::pair<DB::JoinKind, DB::JoinStrictness> kind_and_strictness = 
JoinUtil::getJoinKindAndStrictness(join_type);
     table_join->setKind(kind_and_strictness.first);
     table_join->setStrictness(kind_and_strictness.second);
     return table_join;
@@ -436,7 +402,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const 
substrait::JoinRel & join, DB::Q
         query_plan = std::make_unique<QueryPlan>();
         query_plan->unitePlans(std::move(join_step), {std::move(plans)});
     }
-    reorderJoinOutput(*query_plan, after_join_names);
+    JoinUtil::reorderJoinOutput(*query_plan, after_join_names);
 
     return query_plan;
 }
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.h 
b/cpp-ch/local-engine/Parser/JoinRelParser.h
index c423f4390..15468b54b 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.h
+++ b/cpp-ch/local-engine/Parser/JoinRelParser.h
@@ -31,8 +31,6 @@ namespace local_engine
 
 class StorageJoinFromReadBuffer;
 
-std::pair<DB::JoinKind, DB::JoinStrictness> 
getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type);
-
 class JoinRelParser : public RelParser
 {
 public:
diff --git a/cpp-ch/local-engine/Parser/RelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParser.cpp
index 282339c4d..f651146a3 100644
--- a/cpp-ch/local-engine/Parser/RelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParser.cpp
@@ -156,6 +156,7 @@ void registerAggregateParser(RelParserFactory & factory);
 void registerProjectRelParser(RelParserFactory & factory);
 void registerJoinRelParser(RelParserFactory & factory);
 void registerFilterRelParser(RelParserFactory & factory);
+void registerCrossRelParser(RelParserFactory & factory);
 
 void registerRelParsers()
 {
@@ -166,6 +167,7 @@ void registerRelParsers()
     registerAggregateParser(factory);
     registerProjectRelParser(factory);
     registerJoinRelParser(factory);
+    registerCrossRelParser(factory);
     registerFilterRelParser(factory);
 }
 }
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 66b796060..c59166ddd 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -528,6 +528,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const 
substrait::Rel & rel, std::list
         case substrait::Rel::RelTypeCase::kSort:
         case substrait::Rel::RelTypeCase::kWindow:
         case substrait::Rel::RelTypeCase::kJoin:
+        case substrait::Rel::RelTypeCase::kCross:
         case substrait::Rel::RelTypeCase::kExpand: {
             auto op_parser = 
RelParserFactory::instance().getBuilder(rel.rel_type_case())(this);
             query_plan = op_parser->parseOp(rel, rel_stack);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index cdeb4bdd7..fbc22a41d 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -79,6 +79,7 @@ private:
     friend class FunctionExecutor;
     friend class NonNullableColumnsResolver;
     friend class JoinRelParser;
+    friend class CrossRelParser;
     friend class MergeTreeRelParser;
     friend class ProjectRelParser;
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 695fc8585..627e6154c 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1121,13 +1121,12 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
     const auto named_struct_a = local_engine::getByteArrayElementsSafe(env, 
named_struct);
     const std::string::size_type struct_size = named_struct_a.length();
     std::string struct_string{reinterpret_cast<const char 
*>(named_struct_a.elems()), struct_size};
-    const auto join_type = 
static_cast<substrait::JoinRel_JoinType>(join_type_);
     const jsize length = env->GetArrayLength(in);
     local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in, 
length);
     DB::CompressedReadBuffer input(read_buffer_from_java_array);
     local_engine::configureCompressedReadBuffer(input);
     const auto * obj = 
make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin(
-        hash_table_id, input, row_count_, join_key, join_type, 
has_mixed_join_condition, struct_string));
+        hash_table_id, input, row_count_, join_key, join_type_, 
has_mixed_join_condition, struct_string));
     return obj->instance();
     LOCAL_ENGINE_JNI_METHOD_END(env, 0)
 }
diff --git 
a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto 
b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index 0e51baf5a..3813de868 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -259,6 +259,7 @@ message CrossRel {
     JOIN_TYPE_OUTER = 2;
     JOIN_TYPE_LEFT = 3;
     JOIN_TYPE_RIGHT = 4;
+    JOIN_TYPE_LEFT_SEMI = 5;
   }
 
   substrait.extensions.AdvancedExtension advanced_extension = 10;
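
The regenerated Java classes expose the new enum value directly; a quick standalone sanity check from the JVM side, relying only on the tag added above:

    import io.substrait.proto.CrossRel

    object CrossRelJoinTypeCheck extends App {
      // JOIN_TYPE_LEFT_SEMI was added with tag 5 above.
      val t = CrossRel.JoinType.JOIN_TYPE_LEFT_SEMI
      assert(t.getNumber == 5)
      println(t) // JOIN_TYPE_LEFT_SEMI
    }
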
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index d15948637..8ddcc7b7f 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -144,7 +144,7 @@ trait BackendSettingsApi {
 
   def supportCartesianProductExec(): Boolean = false
 
-  def supportBroadcastNestedLoopJoinExec(): Boolean = false
+  def supportBroadcastNestedLoopJoinExec(): Boolean = true
 
   def supportSampleExec(): Boolean = false
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 092612ea7..b90c1ad8b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -16,20 +16,22 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.{JoinParams, SubstraitContext}
 import org.apache.gluten.utils.SubstraitUtil
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
-import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, 
RightOuter}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, 
LeftExistence, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import com.google.protobuf.Any
 import io.substrait.proto.CrossRel
 
 abstract class BroadcastNestedLoopJoinExecTransformer(
@@ -49,7 +51,8 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
   private lazy val substraitJoinType: CrossRel.JoinType =
     SubstraitUtil.toCrossRelSubstrait(joinType)
 
-  private lazy val buildTableId: String = "BuildTable-" + buildPlan.id
+  // Unique ID for the built broadcast table
+  lazy val buildBroadcastTableId: String = "BuiltBNLJBroadcastTable-" + 
buildPlan.id
 
   // Hint substrait to switch the left and right,
   // since we assume always build right side in substrait.
@@ -79,6 +82,10 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
         left.output ++ right.output.map(_.withNullability(true))
       case RightOuter =>
         left.output.map(_.withNullability(true)) ++ right.output
+      case LeftExistence(_) =>
+        left.output
+      case FullOuter =>
+        left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
       case x =>
         throw new IllegalArgumentException(s"${getClass.getSimpleName} not 
take $x as the JoinType")
     }
@@ -103,6 +110,8 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
       }
   }
 
+  def genJoinParameters(): Any = Any.getDefaultInstance
+
   override protected def doTransform(context: SubstraitContext): 
TransformContext = {
     val streamedPlanContext = 
streamedPlan.asInstanceOf[TransformSupport].transform(context)
     val (inputStreamedRelNode, inputStreamedOutput) =
@@ -113,6 +122,10 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
       (buildPlanContext.root, buildPlanContext.outputAttributes)
 
     val operatorId = context.nextOperatorId(this.nodeName)
+    val joinParams = new JoinParams
+    if (condition.isDefined) {
+      joinParams.isWithCondition = true
+    }
 
     val crossRel = JoinUtils.createCrossRel(
       substraitJoinType,
@@ -122,14 +135,17 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
       inputStreamedOutput,
       inputBuildOutput,
       context,
-      operatorId
+      operatorId,
+      genJoinParameters()
     )
 
+    context.registerJoinParam(operatorId, joinParams)
+
     val projectRelPostJoinRel = JoinUtils.createProjectRelPostJoinRel(
       needSwitchChildren,
       joinType,
-      inputStreamedOutput,
-      inputBuildOutput,
+      streamedPlan.output,
+      buildPlan.output,
       context,
       operatorId,
       crossRel,
@@ -145,18 +161,39 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
       inputBuildOutput)
   }
 
+  def validateJoinTypeAndBuildSide(): ValidationResult = {
+    val result = joinType match {
+      case _: InnerLike | LeftOuter | RightOuter => ValidationResult.ok
+      case _ =>
+        ValidationResult.notOk(s"$joinType join is not supported with 
BroadcastNestedLoopJoin")
+    }
+
+    if (!result.isValid) {
+      return result
+    }
+
+    (joinType, buildSide) match {
+      case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
+        ValidationResult.notOk(s"$joinType join is not supported with 
$buildSide")
+      case _ => ValidationResult.ok // continue
+    }
+  }
+
   override protected def doValidateInternal(): ValidationResult = {
-    if (!BackendsApiManager.getSettings.supportBroadcastNestedLoopJoinExec()) {
-      return ValidationResult.notOk("Broadcast Nested Loop join is not 
supported in this backend")
+    if 
(!GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled) {
+      return ValidationResult.notOk(
+        s"Config 
${GlutenConfig.BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED.key} not enabled")
     }
+
     if (substraitJoinType == CrossRel.JoinType.UNRECOGNIZED) {
       return ValidationResult.notOk(s"$joinType join is not supported with 
BroadcastNestedLoopJoin")
     }
-    (joinType, buildSide) match {
-      case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
-        return ValidationResult.notOk(s"$joinType join is not supported with 
$buildSide")
-      case _ => // continue
+
+    val validateResult = validateJoinTypeAndBuildSide()
+    if (!validateResult.isValid) {
+      return validateResult
     }
+
     val substraitContext = new SubstraitContext
 
     val crossRel = JoinUtils.createCrossRel(
@@ -168,6 +205,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
       buildPlan.output,
       substraitContext,
       substraitContext.nextOperatorId(this.nodeName),
+      genJoinParameters(),
       validation = true
     )
     doNativeValidation(substraitContext, crossRel)
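
genJoinParameters() defaults to Any.getDefaultInstance; a backend overrides it to ship backend-specific join parameters to the native parser. A hedged sketch of what such an override can return, packing the key/value payload that CrossRelParser unwraps (the helper below is illustrative; the CH backend's actual override lives in CHBroadcastNestedLoopJoinExecTransformer, which is not part of this hunk):

    import com.google.protobuf.{Any, StringValue}

    object JoinParametersSketch {
      // Wraps the payload in a protobuf Any so the native side can read it
      // back from the CrossRel advanced extension's optimization field.
      def genJoinParameters(buildBroadcastTableId: String): Any =
        Any.pack(
          StringValue.newBuilder
            .setValue(s"JoinParameters:buildHashTableId=$buildBroadcastTableId\n")
            .build())
    }
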
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index eb2c0bfd7..9dd73800e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -337,6 +337,7 @@ object JoinUtils {
       inputBuildOutput: Seq[Attribute],
       substraitContext: SubstraitContext,
       operatorId: java.lang.Long,
+      joinParameters: Any,
       validation: Boolean = false
   ): RelNode = {
     val expressionNode = condition.map {
@@ -346,7 +347,7 @@ object JoinUtils {
           .doTransform(substraitContext.registeredFunction)
     }
     val extensionNode =
-      JoinUtils.createExtensionNode(inputStreamedOutput ++ inputBuildOutput, 
validation)
+      createJoinExtensionNode(joinParameters, inputStreamedOutput ++ 
inputBuildOutput)
 
     RelBuilder.makeCrossRel(
       inputStreamedRelNode,
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 15fc8bea7..b7a30f7e1 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -20,10 +20,14 @@ import 
org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, Trans
 import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSemi}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
BroadcastQueryStageExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
BroadcastExchangeLike, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.internal.SQLConf
 
 object MiscColumnarRules {
@@ -190,4 +194,22 @@ object MiscColumnarRules {
         child
     }
   }
+
+  // Removes an unnecessary BNLJ produced by queries like:
+  //   ``` select l.* from l left semi join r; ```
+  // The result is always the streamed table.
+  case class RemoveBroadcastNestedLoopJoin() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+      case BroadcastNestedLoopJoinExec(
+            left: SparkPlan,
+            right: SparkPlan,
+            buildSide: BuildSide,
+            joinType: JoinType,
+            condition: Option[Expression]) if condition.isEmpty && joinType == 
LeftSemi =>
+        buildSide match {
+          case BuildLeft => right
+          case BuildRight => left
+        }
+    }
+  }
 }
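
A small, self-contained way to reproduce the plan shape the rule matches (assumes a local Spark session; before the rule the physical plan contains a conditionless LeftSemi BroadcastNestedLoopJoinExec, which the rule replaces with its streamed side):

    import org.apache.spark.sql.SparkSession

    object BnljRemovalRepro extends App {
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      spark.range(3).createOrReplaceTempView("l")
      spark.range(2).createOrReplaceTempView("r")
      // LEFT SEMI with no condition: the join reduces to the streamed table,
      // which is exactly the rewrite RemoveBroadcastNestedLoopJoin encodes.
      spark.sql("SELECT l.* FROM l LEFT SEMI JOIN r").explain()
    }
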
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 959bf808a..b6236ae9a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -147,8 +147,6 @@ object Validators {
       case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
         fail(p)
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
-      case p: BroadcastNestedLoopJoinExec if 
!settings.supportBroadcastNestedLoopJoinExec() =>
-        fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
       case _ => pass()
     }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala 
b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
index 9671c7a6b..e8e7ce06f 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
@@ -48,6 +48,10 @@ object SubstraitUtil {
       // the left and right relations are exchanged and the
       // join type is reverted.
       CrossRel.JoinType.JOIN_TYPE_LEFT
+    case LeftSemi =>
+      CrossRel.JoinType.JOIN_TYPE_LEFT_SEMI
+    case FullOuter =>
+      CrossRel.JoinType.JOIN_TYPE_OUTER
     case _ =>
       CrossRel.JoinType.UNRECOGNIZED
   }
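
With the two cases added above, the Spark-to-Substrait mapping for CrossRel reads roughly as follows (reconstructed from the hunks in this diff; the InnerLike case is inferred from context and abridged):

    import io.substrait.proto.CrossRel
    import org.apache.spark.sql.catalyst.plans._

    def toCrossRelSubstrait(joinType: JoinType): CrossRel.JoinType = joinType match {
      case _: InnerLike => CrossRel.JoinType.JOIN_TYPE_INNER
      // LeftOuter and RightOuter both map to JOIN_TYPE_LEFT: when building on
      // the left, the relations are exchanged and the join type is reverted.
      case LeftOuter | RightOuter => CrossRel.JoinType.JOIN_TYPE_LEFT
      case LeftSemi => CrossRel.JoinType.JOIN_TYPE_LEFT_SEMI
      case FullOuter => CrossRel.JoinType.JOIN_TYPE_OUTER
      case _ => CrossRel.JoinType.UNRECOGNIZED
    }
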
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index df1c87cb0..4da7a2f6f 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, 
BroadcastPartitioning, IdentityBroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, 
BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
BroadcastExchangeLike}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
@@ -134,13 +134,6 @@ case class ColumnarBroadcastExchangeExec(mode: 
BroadcastMode, child: SparkPlan)
   }
 
   override protected def doValidateInternal(): ValidationResult = {
-    // CH backend does not support IdentityBroadcastMode used in BNLJ
-    if (
-      mode == IdentityBroadcastMode && !BackendsApiManager.getSettings
-        .supportBroadcastNestedLoopJoinExec()
-    ) {
-      return ValidationResult.notOk("This backend does not support 
IdentityBroadcastMode and BNLJ")
-    }
     BackendsApiManager.getValidatorApiInstance
       .doSchemaValidate(schema)
       .map {
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 6860d6a12..e724cf31c 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -110,7 +110,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
           execution.get.fallbackNodeToReason.head._2
             .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
       } else {
-        assert(execution.get.numFallbackNodes == 2)
+        assert(execution.get.numFallbackNodes == 0)
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

