This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b83bab99bd [GLUTEN-7180][CH] Fix ut `Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys` for the CH backend when the aqe is on (#7181)
b83bab99bd is described below

commit b83bab99bd875408104cb3685b4f6e9cd9a3b841
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Sep 11 10:12:47 2024 +0800

    [GLUTEN-7180][CH] Fix ut `Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys` for the CH backend when the aqe is on (#7181)
    
    Fix ut Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys for 
the CH backend when the aqe is on
    
    Close #7180.
---
 .../apache/gluten/vectorized/CHNativeBlock.java    |  6 +++
 .../gluten/vectorized/StorageJoinBuilder.java      |  9 ++--
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  4 ++
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  1 +
 .../clickhouse/CHSparkPlanExecApi.scala            | 36 ++++++++++++--
 .../extension/CHAQEPropagateEmptyRelation.scala    | 57 ++++++++++++++++++++++
 .../org/apache/gluten/vectorized/BlockStats.java   | 43 ++++++----------
 .../commands/GlutenCHCacheDataCommand.scala        |  6 +--
 .../joins/ClickHouseBuildSideRelation.scala        |  6 ++-
 .../spark/sql/execution/utils/CHExecUtil.scala     | 36 ++++++++++----
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 49 +++++++++++++++++++
 .../benchmarks/CHHashBuildBenchmark.scala          |  9 ++--
 cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp  |  6 ++-
 cpp-ch/local-engine/Join/BroadCastJoinBuilder.h    |  3 +-
 .../Join/StorageJoinFromReadBuffer.cpp             | 38 +++------------
 .../local-engine/Join/StorageJoinFromReadBuffer.h  |  3 +-
 cpp-ch/local-engine/local_engine_jni.cpp           | 46 +++++++++++++++--
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  2 +-
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 23 +++++++--
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  2 +-
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 24 +++++++--
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  2 +-
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  2 +-
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 23 +++++++--
 24 files changed, 324 insertions(+), 112 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
index e3c51ae285..c8cce61b4f 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
@@ -71,6 +71,12 @@ public class CHNativeBlock {
 
   public native void nativeClose(long blockAddress);
 
+  public native BlockStats nativeBlockStats(long blockAddress, int 
columnPosition);
+
+  public BlockStats getBlockStats(int columnPosition) {
+    return nativeBlockStats(blockAddress, columnPosition);
+  }
+
   public void close() {
     if (blockAddress != 0) {
       nativeClose(blockAddress);
diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
index d53ff17188..d417fa1a9d 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
@@ -48,7 +48,8 @@ public class StorageJoinBuilder {
       boolean hasMixedFiltCondition,
       boolean isExistenceJoin,
       byte[] namedStruct,
-      boolean isNullAwareAntiJoin);
+      boolean isNullAwareAntiJoin,
+      boolean hasNullKeyValues);
 
   private StorageJoinBuilder() {}
 
@@ -58,7 +59,8 @@ public class StorageJoinBuilder {
       long rowCount,
       BroadCastHashJoinContext broadCastContext,
       List<Expression> newBuildKeys,
-      List<Attribute> newOutput) {
+      List<Attribute> newOutput,
+      boolean hasNullKeyValues) {
     ConverterUtils$ converter = ConverterUtils$.MODULE$;
     List<Expression> keys;
     List<Attribute> output;
@@ -96,7 +98,8 @@ public class StorageJoinBuilder {
         broadCastContext.hasMixedFiltCondition(),
         broadCastContext.isExistenceJoin(),
         toNameStruct(output).toByteArray(),
-        broadCastContext.isNullAwareAntiJoin());
+        broadCastContext.isNullAwareAntiJoin(),
+        hasNullKeyValues);
   }
 
   /** create table named struct */
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 163f7568f7..69ea899c42 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -133,6 +133,10 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       ".runtime_config.max_source_concatenate_bytes"
   val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 
256
 
+  val GLUTEN_AQE_PROPAGATEEMPTY: String =
+    GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
+      ".aqe.propagate.empty.relation"
+
   def affinityMode: String = {
     SparkEnv.get.conf
       .get(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 61fed3f999..a83d349cdb 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -44,6 +44,7 @@ private object CHRuleApi {
   def injectSpark(injector: SparkInjector): Unit = {
     // Regular Spark rules.
     
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
+    injector.injectQueryStagePrepRule(spark => 
CHAQEPropagateEmptyRelation(spark))
     injector.injectParser(
       (spark, parserInterface) => new GlutenCacheFilesSqlParser(spark, 
parserInterface))
     injector.injectParser(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index a8996c4d2e..1108b8b3c5 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -474,12 +474,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       numOutputRows: SQLMetric,
       dataSize: SQLMetric): BuildSideRelation = {
 
-    val buildKeys: Seq[Expression] = mode match {
+    val (buildKeys, isNullAware) = mode match {
       case mode1: HashedRelationBroadcastMode =>
-        mode1.key
+        (mode1.key, mode1.isNullAware)
       case _ =>
         // IdentityBroadcastMode
-        Seq.empty
+        (Seq.empty, false)
     }
 
     val (newChild, newOutput, newBuildKeys) =
@@ -532,8 +532,27 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
         }
         (newChild, (child.output ++ appendedProjections).map(_.toAttribute), 
preProjectionBuildKeys)
       }
+
+    // find the key index in the output
+    val keyColumnIndex = if (isNullAware) {
+      def findKeyOrdinal(key: Expression, output: Seq[Attribute]): Int = {
+        key match {
+          case b: BoundReference => b.ordinal
+          case n: NamedExpression =>
+            output.indexWhere(o => (o.name.equals(n.name) && o.exprId == 
n.exprId))
+          case _ => throw new GlutenException(s"Cannot find $key in the 
child's output: $output")
+        }
+      }
+      if (newBuildKeys.isEmpty) {
+        findKeyOrdinal(buildKeys(0), newOutput)
+      } else {
+        findKeyOrdinal(newBuildKeys(0), newOutput)
+      }
+    } else {
+      0
+    }
     val countsAndBytes =
-      CHExecUtil.buildSideRDD(dataSize, newChild).collect
+      CHExecUtil.buildSideRDD(dataSize, newChild, isNullAware, 
keyColumnIndex).collect
 
     val batches = countsAndBytes.map(_._2)
     val totalBatchesSize = batches.map(_.length).sum
@@ -548,8 +567,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
           s" written bytes is correct.")
     }
     val rowCount = countsAndBytes.map(_._1).sum
+    val hasNullKeyValues = 
countsAndBytes.map(_._3).foldLeft[Boolean](false)((b, a) => { b || a })
     numOutputRows += rowCount
-    ClickHouseBuildSideRelation(mode, newOutput, batches.flatten, rowCount, 
newBuildKeys)
+    ClickHouseBuildSideRelation(
+      mode,
+      newOutput,
+      batches.flatten,
+      rowCount,
+      newBuildKeys,
+      hasNullKeyValues)
   }
 
   /** Define backend specfic expression mappings. */
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
new file mode 100644
index 0000000000..6f5afa9726
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.utils.PhysicalPlanSelector
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.LeftAnti
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, 
LocalTableScanExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
ClickHouseBuildSideRelation}
+
+case class CHAQEPropagateEmptyRelation(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(session, 
plan) {
+    if (!(session.conf.get(CHBackendSettings.GLUTEN_AQE_PROPAGATEEMPTY, 
"true").toBoolean)) {
+      plan
+    } else {
+      plan.transform {
+        case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, left, right, 
isNullAwareAntiJoin)
+            if (joinType == LeftAnti) && isNullAwareAntiJoin =>
+          right match {
+            case BroadcastQueryStageExec(_, plan: SparkPlan, _) =>
+              val columnarBroadcast = plan match {
+                case c: ColumnarBroadcastExchangeExec => c
+                case ReusedExchangeExec(_, c: ColumnarBroadcastExchangeExec) 
=> c
+              }
+              val chBuildSideRelation = 
columnarBroadcast.relationFuture.get().value
+              chBuildSideRelation match {
+                case c: ClickHouseBuildSideRelation if c.hasNullKeyValues =>
+                  LocalTableScanExec(bhj.output, Seq.empty)
+                case _ => bhj
+              }
+            case o => bhj
+          }
+        case other => other
+      }
+    }
+  }
+}
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h 
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java
similarity index 52%
copy from cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
copy to 
backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java
index d089d7420a..e47454cbed 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java
@@ -14,37 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
-#include <memory>
-#include <jni.h>
-#include <substrait/algebra.pb.h>
+package org.apache.gluten.vectorized;
 
-namespace DB
-{
-class ReadBuffer;
-}
-
-namespace local_engine
-{
-class StorageJoinFromReadBuffer;
-namespace BroadCastJoinBuilder
-{
+public class BlockStats {
+  private final long blockRecordCount;
+  private final boolean hasNullKeyValues;
 
-std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
-    const std::string & key,
-    DB::ReadBuffer & input,
-    jlong row_count,
-    const std::string & join_keys,
-    jint join_type,
-    bool has_mixed_join_condition,
-    bool is_existence_join,
-    const std::string & named_struct,
-    bool is_null_aware_anti_join);
-void cleanBuildHashTable(const std::string & hash_table_id, jlong instance);
-std::shared_ptr<StorageJoinFromReadBuffer> getJoin(const std::string & 
hash_table_id);
+  public BlockStats(long blockRecordCount, boolean hasNullKeyValues) {
+    this.blockRecordCount = blockRecordCount;
+    this.hasNullKeyValues = hasNullKeyValues;
+  }
 
+  public long getBlockRecordCount() {
+    return blockRecordCount;
+  }
 
-void init(JNIEnv *);
-void destroy(JNIEnv *);
-}
+  public boolean isHasNullKeyValues() {
+    return hasNullKeyValues;
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index bb3cb5acce..43e3b4b7ab 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -43,7 +43,7 @@ import scala.concurrent.Future
 case class GlutenCHCacheDataCommand(
     onlyMetaCache: Boolean,
     asynExecute: Boolean,
-    selectedColuman: Option[Seq[String]],
+    selectedColumn: Option[Seq[String]],
     path: Option[String],
     table: Option[TableIdentifier],
     tsfilter: Option[String],
@@ -93,8 +93,8 @@ case class GlutenCHCacheDataCommand(
         "a Delta table? Refusing to garbage collect.")
 
     val allColumns = snapshot.dataSchema.fieldNames.toSeq
-    val selectedColumns = if (selectedColuman.nonEmpty) {
-      selectedColuman.get
+    val selectedColumns = if (selectedColumn.nonEmpty) {
+      selectedColumn.get
         .filter(allColumns.contains(_))
         .map(ConverterUtils.normalizeColName)
         .toSeq
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
index 862a8b4200..92887f16d7 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
@@ -35,7 +35,8 @@ case class ClickHouseBuildSideRelation(
     output: Seq[Attribute],
     batches: Array[Byte],
     numOfRows: Long,
-    newBuildKeys: Seq[Expression] = Seq.empty)
+    newBuildKeys: Seq[Expression] = Seq.empty,
+    hasNullKeyValues: Boolean = false)
   extends BuildSideRelation
   with Logging {
 
@@ -58,7 +59,8 @@ case class ClickHouseBuildSideRelation(
           numOfRows,
           broadCastContext,
           newBuildKeys.asJava,
-          output.asJava)
+          output.asJava,
+          hasNullKeyValues)
         (hashTableData, this)
       } else {
         (StorageJoinBuilder.nativeCloneBuildHashTable(hashTableData), null)
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 4496d893fc..6d91108d22 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -58,10 +58,13 @@ object CHExecUtil extends Logging {
   def toBytes(
       dataSize: SQLMetric,
       iter: Iterator[ColumnarBatch],
+      isNullAware: Boolean = false,
+      keyColumnIndex: Int = 0,
       compressionCodec: Option[String] = Some("lz4"),
       compressionLevel: Option[Int] = None,
-      bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
-    var count = 0
+      bufferSize: Int = 4 << 10): Iterator[(Long, Array[Byte], Boolean)] = {
+    var count = 0L
+    var hasNullKeyValues = false
     val bos = new ByteArrayOutputStream()
     val buffer = new Array[Byte](bufferSize) // 4K
     val level = compressionLevel.getOrElse(Int.MinValue)
@@ -69,20 +72,35 @@ object CHExecUtil extends Logging {
       compressionCodec
         .map(new BlockOutputStream(bos, buffer, dataSize, true, _, level, 
bufferSize))
         .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", 
level, bufferSize))
-    while (iter.hasNext) {
-      val batch = iter.next()
-      count += batch.numRows
-      blockOutputStream.write(batch)
+    if (isNullAware) {
+      while (iter.hasNext) {
+        val batch = iter.next()
+        val blockStats = 
CHNativeBlock.fromColumnarBatch(batch).getBlockStats(keyColumnIndex)
+        count += blockStats.getBlockRecordCount
+        hasNullKeyValues = hasNullKeyValues || blockStats.isHasNullKeyValues
+        blockOutputStream.write(batch)
+      }
+    } else {
+      while (iter.hasNext) {
+        val batch = iter.next()
+        count += batch.numRows()
+        blockOutputStream.write(batch)
+      }
     }
     blockOutputStream.flush()
     blockOutputStream.close()
-    Iterator((count, bos.toByteArray))
+    Iterator((count, bos.toByteArray, hasNullKeyValues))
   }
 
-  def buildSideRDD(dataSize: SQLMetric, newChild: SparkPlan): RDD[(Int, 
Array[Byte])] = {
+  def buildSideRDD(
+      dataSize: SQLMetric,
+      newChild: SparkPlan,
+      isNullAware: Boolean,
+      keyColumnIndex: Int
+  ): RDD[(Long, Array[Byte], Boolean)] = {
     newChild
       .executeColumnar()
-      .mapPartitionsInternal(iter => toBytes(dataSize, iter))
+      .mapPartitionsInternal(iter => toBytes(dataSize, iter, isNullAware, 
keyColumnIndex))
   }
 
   private def buildRangePartitionSampleRDD(
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 49697872e8..f7cf0de376 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -2935,6 +2935,55 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
+
+    withSQLConf(("spark.sql.adaptive.enabled", "true")) {
+      def checkAQEBHJWithIsNullAwareAntiJoin(df: DataFrame, isNullAwareBhjCnt: 
Int = 1): Unit = {
+        val bhjs = collect(df.queryExecution.executedPlan) {
+          case bhj: CHBroadcastHashJoinExecTransformer if 
bhj.isNullAwareAntiJoin => true
+        }
+        assert(bhjs.size == isNullAwareBhjCnt)
+      }
+
+      val sql6 =
+        s"""
+           |select * from partsupp
+           |where
+           |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (null), (6) 
sub(suppkey))
+           |""".stripMargin
+      compareResultsAgainstVanillaSpark(
+        sql6,
+        true,
+        df => {
+          checkAQEBHJWithIsNullAwareAntiJoin(df, 0)
+        })
+
+      val sql7 =
+        s"""
+           |select * from partsupp
+           |where
+           |cast(ps_suppkey AS INT) NOT IN (SELECT suppkey FROM VALUES (null), 
(6) sub(suppkey))
+           |""".stripMargin
+      compareResultsAgainstVanillaSpark(
+        sql7,
+        true,
+        df => {
+          checkAQEBHJWithIsNullAwareAntiJoin(df, 0)
+        })
+
+      val sql8 =
+        s"""
+           |select * from partsupp
+           |where
+           |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (5), (6) 
sub(suppkey))
+           |""".stripMargin
+      compareResultsAgainstVanillaSpark(
+        sql8,
+        true,
+        df => {
+          checkAQEBHJWithIsNullAwareAntiJoin(df)
+        })
+    }
+
   }
 
   test("soundex") {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
index 87c389a651..2c36b404ec 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
@@ -58,7 +58,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with 
CHSqlBasedBenchmark w
                                  |select $scanSchema from parquet.`$parquetDir`
                                  |
                                  |""".stripMargin)
-    val rowCount: Int = chParquet.count().toInt
+    val rowCount = chParquet.count()
 
     val runs = Seq(1, 2, 4, 8, 16, 32, 64).reverse
       .map(num => rowCount / num)
@@ -79,13 +79,14 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with 
CHSqlBasedBenchmark w
             s"build hash table with $num rows with $iteration hash tables",
             executedCnt) {
             _ =>
-              for (i <- 0 until iteration) {
+              for (i <- 0L until iteration) {
                 val table = StorageJoinBuilder.build(
                   bytes,
                   num,
                   relation,
                   new util.ArrayList[Expression](),
-                  new util.ArrayList[Attribute]())
+                  new util.ArrayList[Attribute](),
+                  false)
                 StorageJoinBuilder.nativeCleanBuildHashTable("", table)
               }
           }
@@ -94,7 +95,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with 
CHSqlBasedBenchmark w
   }
 
   private def createBroadcastRelation(
-      child: SparkPlan): (Array[Byte], Int, BroadCastHashJoinContext) = {
+      child: SparkPlan): (Array[Byte], Long, BroadCastHashJoinContext) = {
     val dataSize = SQLMetrics.createSizeMetric(spark.sparkContext, "size of 
files read")
 
     val countsAndBytes = child
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp 
b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
index 0b88137b32..5f20c17bfb 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
@@ -116,7 +116,8 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
     bool has_mixed_join_condition,
     bool is_existence_join,
     const std::string & named_struct,
-    bool is_null_aware_anti_join)
+    bool is_null_aware_anti_join,
+    bool has_null_key_values)
 {
     auto join_key_list = Poco::StringTokenizer(join_keys, ",");
     Names key_names;
@@ -193,7 +194,8 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
         ConstraintsDescription(),
         key,
         true,
-        is_null_aware_anti_join);
+        is_null_aware_anti_join,
+        has_null_key_values);
 }
 
 void init(JNIEnv * env)
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h 
b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
index d089d7420a..52db2e7e75 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
@@ -39,7 +39,8 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
     bool has_mixed_join_condition,
     bool is_existence_join,
     const std::string & named_struct,
-    bool is_null_aware_anti_join);
+    bool is_null_aware_anti_join,
+    bool has_null_key_values);
 void cleanBuildHashTable(const std::string & hash_table_id, jlong instance);
 std::shared_ptr<StorageJoinFromReadBuffer> getJoin(const std::string & 
hash_table_id);
 
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp 
b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
index 90baa754f3..834ba97815 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -74,9 +74,11 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
     const ConstraintsDescription & constraints,
     const String & comment,
     const bool overwrite_,
-    bool is_null_aware_anti_join_)
-    : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), 
overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_)
+    bool is_null_aware_anti_join_,
+    bool has_null_key_values_)
+    : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), 
overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_), 
has_null_key_value(has_null_key_values_)
 {
+    is_empty_hash_table = row_count < 1;
     storage_metadata.setColumns(columns);
     storage_metadata.setConstraints(constraints);
     storage_metadata.setComment(comment);
@@ -106,36 +108,8 @@ void StorageJoinFromReadBuffer::buildJoin(Blocks & data, 
const Block header, std
     auto build_join = [&]
     {
         join = std::make_shared<HashJoin>(analyzed_join, header, overwrite, 
row_count);
-        // only when is_null_aware_anti_join is true, it needs to check 
whether is null key value exists
-        if (is_null_aware_anti_join)
-        {
-            is_empty_hash_table = data.empty();
-            size_t total_size = 0;
-            for (Block block : data)
-            {
-                for (size_t i = 0; i < block.columns(); ++i)
-                {
-                    const auto & column = block.getByPosition(i);
-                    if (column.name == key_names.at(0))
-                    {
-                        if (const auto * nullable = 
checkAndGetColumn<ColumnNullable>(column.column.get()))
-                        {
-                            const auto & null_map_data = 
nullable->getNullMapData();
-                            // check whether there is null key value
-                            has_null_key_value = 
!DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size());
-                        }
-                    }
-                }
-                total_size += block.rows();
-                join->addBlockToJoin(std::move(block), true);
-            }
-            is_empty_hash_table = (total_size < 1);
-        }
-        else
-        {
-            for (Block block : data)
-                join->addBlockToJoin(std::move(block), true);
-        }
+        for (Block block : data)
+            join->addBlockToJoin(std::move(block), true);
     };
     /// Record memory usage in Total Memory Tracker
     ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join);
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h 
b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
index 8c9416b544..e000952a45 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
@@ -47,7 +47,8 @@ public:
         const DB::ConstraintsDescription & constraints_,
         const String & comment,
         bool overwrite_,
-        bool is_null_aware_anti_join_);
+        bool is_null_aware_anti_join_,
+        bool has_null_key_values_);
 
     bool has_null_key_value = false;
     bool is_empty_hash_table = false;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 15c721be34..79c777eddc 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -110,6 +110,9 @@ static jmethodID block_stripes_constructor;
 static jclass split_result_class;
 static jmethodID split_result_constructor;
 
+static jclass block_stats_class;
+static jmethodID block_stats_constructor;
+
 JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
 {
     JNIEnv * env;
@@ -127,6 +130,9 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
     split_result_class = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/CHSplitResult;");
     split_result_constructor = local_engine::GetMethodID(env, 
split_result_class, "<init>", "(JJJJJJ[J[JJJJ)V");
 
+    block_stats_class = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/BlockStats;");
+    block_stats_constructor = local_engine::GetMethodID(env, 
block_stats_class, "<init>", "(JZ)V");
+
     local_engine::ShuffleReader::input_stream_class
         = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/ShuffleInputStream;");
     local_engine::NativeSplitter::iterator_class
@@ -184,6 +190,7 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * 
/*reserved*/)
     env->DeleteGlobalRef(spark_row_info_class);
     env->DeleteGlobalRef(block_stripes_class);
     env->DeleteGlobalRef(split_result_class);
+    env->DeleteGlobalRef(block_stats_class);
     env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class);
     env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class);
     
env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class);
@@ -332,8 +339,8 @@ 
Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, job
     else
     {
         const auto * nullable = 
checkAndGetColumn<DB::ColumnNullable>(&*col.column);
-        size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), 
nullable->getNullMapData().end(), 0);
-        return num_nulls < block->rows();
+        const auto & null_map_data = nullable->getNullMapData();
+        return !DB::memoryIsZero(null_map_data.data(), 0, 
null_map_data.size());
     }
     LOCAL_ENGINE_JNI_METHOD_END(env, false)
 }
@@ -506,6 +513,35 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
+JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, 
jobject obj, jlong block_address, jint column_position)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+    auto col = getColumnFromColumnVector(env, obj, block_address, 
column_position);
+    if (!col.column->isNullable())
+    {
+        jobject block_stats = env->NewObject(
+            block_stats_class,
+            block_stats_constructor,
+            block->rows(),
+            false);
+        return block_stats;
+    }
+    else
+    {
+        const auto * nullable = 
checkAndGetColumn<DB::ColumnNullable>(&*col.column);
+        const auto & null_map_data = nullable->getNullMapData();
+
+        jobject block_stats = env->NewObject(
+            block_stats_class,
+            block_stats_constructor,
+            block->rows(),
+            !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()));
+        return block_stats;
+    }
+    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
+}
+
 JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
     JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, 
jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
 {
@@ -1065,7 +1101,8 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
     jboolean has_mixed_join_condition,
     jboolean is_existence_join,
     jbyteArray named_struct,
-    jboolean is_null_aware_anti_join)
+    jboolean is_null_aware_anti_join,
+    jboolean has_null_key_values)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     const auto hash_table_id = jstring2string(env, key);
@@ -1086,7 +1123,8 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
         has_mixed_join_condition,
         is_existence_join,
         struct_string,
-        is_null_aware_anti_join));
+        is_null_aware_anti_join,
+        has_null_key_values));
     return obj->instance();
     LOCAL_ENGINE_JNI_METHOD_END(env, 0)
 }
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 6d5083dbe2..b527a8ec1f 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1157,11 +1157,11 @@ class ClickHouseTestSettings extends 
BackendTestSettings {
       "Avoid changing merge join to broadcast join if too many empty 
partitions on build plan")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+    .exclude("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration")
     .exclude("metrics of the shuffle read")
     .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with 
AQE")
-    .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys")
     .exclude("SPARK-32753: Only copy tags to node with no tags")
     .exclude("Logging plan changes for AQE")
     .exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index d59a7d1ff9..6d3c3e865d 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -800,10 +800,25 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
-  // because gluten use columnar format, which cannot execute to get 
rowIterator, then get the key
-  // null status
-  ignore(
-    GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys") {}
+  testGluten("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+      // This test is a copy of test(SPARK-32573), in order to test the 
configuration
+      // `spark.sql.adaptive.optimizer.excludedRules` works as expect.
+      "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" -> 
"false"
+    ) {
+      val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM 
testData3)")
+      val bhj = findTopLevelBroadcastHashJoin(plan)
+      assert(bhj.size == 1)
+      val join = findTopLevelBaseJoin(adaptivePlan)
+      // this is different compares to test(SPARK-32573) due to the rule
+      // `EliminateUnnecessaryJoin` has been excluded.
+      assert(join.nonEmpty)
+      checkNumLocalShuffleReads(adaptivePlan)
+    }
+  }
 
   // EmptyRelation case
   ignore(
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index de979ac274..110dbffc73 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1178,11 +1178,11 @@ class ClickHouseTestSettings extends 
BackendTestSettings {
     .exclude("SPARK-37753: Allow changing outer join to broadcast join even if 
too many empty partitions on broadcast side")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+    .exclude("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration")
     .exclude("metrics of the shuffle read")
     .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with 
AQE")
-    .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys")
     .exclude("SPARK-32753: Only copy tags to node with no tags")
     .exclude("Logging plan changes for AQE")
     .exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 9ddaac1851..441f3a60a3 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -21,7 +21,6 @@ import 
org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, Shuffl
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
-import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
@@ -799,10 +798,25 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
-  // because gluten use columnar format, which cannot execute to get 
rowIterator, then get the key
-  // null status
-  ignore(
-    GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys") {}
+  testGluten("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+      // This test is a copy of test(SPARK-32573), in order to test the 
configuration
+      // `spark.sql.adaptive.optimizer.excludedRules` works as expect.
+      "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" -> 
"false"
+    ) {
+      val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM 
testData3)")
+      val bhj = findTopLevelBroadcastHashJoin(plan)
+      assert(bhj.size == 1)
+      val join = findTopLevelBaseJoin(adaptivePlan)
+      // this is different compares to test(SPARK-32573) due to the rule
+      // `EliminateUnnecessaryJoin` has been excluded.
+      assert(join.nonEmpty)
+      checkNumLocalShuffleReads(adaptivePlan)
+    }
+  }
 
   testGluten("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 89a44c602e..d478adbb32 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1023,11 +1023,11 @@ class ClickHouseTestSettings extends 
BackendTestSettings {
     .exclude("SPARK-37753: Allow changing outer join to broadcast join even if 
too many empty partitions on broadcast side")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+    .exclude("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration")
     .exclude("metrics of the shuffle read")
     .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with 
AQE")
-    .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys")
     .exclude("SPARK-32753: Only copy tags to node with no tags")
     .exclude("Logging plan changes for AQE")
     .exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 388036c558..089f72cd11 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1023,11 +1023,11 @@ class ClickHouseTestSettings extends 
BackendTestSettings {
     .exclude("SPARK-37753: Allow changing outer join to broadcast join even if 
too many empty partitions on broadcast side")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+    .exclude("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration")
     .exclude("metrics of the shuffle read")
     .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
     .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with 
AQE")
-    .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys")
     .exclude("SPARK-32753: Only copy tags to node with no tags")
     .exclude("Logging plan changes for AQE")
     .exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 49d47fa65b..2e5df7b859 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -797,10 +797,25 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
-  // because gluten use columnar format, which cannot execute to get 
rowIterator, then get the key
-  // null status
-  ignore(
-    GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is 
HashedRelationWithAllNullKeys") {}
+  testGluten("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+      // This test is a copy of test(SPARK-32573), in order to test the 
configuration
+      // `spark.sql.adaptive.optimizer.excludedRules` works as expect.
+      "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" -> 
"false"
+    ) {
+      val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM 
testData3)")
+      val bhj = findTopLevelBroadcastHashJoin(plan)
+      assert(bhj.size == 1)
+      val join = findTopLevelBaseJoin(adaptivePlan)
+      // this is different compares to test(SPARK-32573) due to the rule
+      // `EliminateUnnecessaryJoin` has been excluded.
+      assert(join.nonEmpty)
+      checkNumLocalShuffleReads(adaptivePlan)
+    }
+  }
 
   // EmptyRelation case
   ignore(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to