This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b83bab99bd [GLUTEN-7180][CH] Fix ut `Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys` for the CH backend when the aqe is on (#7181)
b83bab99bd is described below
commit b83bab99bd875408104cb3685b4f6e9cd9a3b841
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Sep 11 10:12:47 2024 +0800
[GLUTEN-7180][CH] Fix ut `Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys` for the CH backend when the aqe is on (#7181)
Fix ut Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys for
the CH backend when the aqe is on
Close #7180.
---
.../apache/gluten/vectorized/CHNativeBlock.java | 6 +++
.../gluten/vectorized/StorageJoinBuilder.java | 9 ++--
.../gluten/backendsapi/clickhouse/CHBackend.scala | 4 ++
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 +
.../clickhouse/CHSparkPlanExecApi.scala | 36 ++++++++++++--
.../extension/CHAQEPropagateEmptyRelation.scala | 57 ++++++++++++++++++++++
.../org/apache/gluten/vectorized/BlockStats.java | 43 ++++++----------
.../commands/GlutenCHCacheDataCommand.scala | 6 +--
.../joins/ClickHouseBuildSideRelation.scala | 6 ++-
.../spark/sql/execution/utils/CHExecUtil.scala | 36 ++++++++++----
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 49 +++++++++++++++++++
.../benchmarks/CHHashBuildBenchmark.scala | 9 ++--
cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp | 6 ++-
cpp-ch/local-engine/Join/BroadCastJoinBuilder.h | 3 +-
.../Join/StorageJoinFromReadBuffer.cpp | 38 +++------------
.../local-engine/Join/StorageJoinFromReadBuffer.h | 3 +-
cpp-ch/local-engine/local_engine_jni.cpp | 46 +++++++++++++++--
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +-
.../ClickHouseAdaptiveQueryExecSuite.scala | 23 +++++++--
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +-
.../ClickHouseAdaptiveQueryExecSuite.scala | 24 +++++++--
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +-
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +-
.../ClickHouseAdaptiveQueryExecSuite.scala | 23 +++++++--
24 files changed, 324 insertions(+), 112 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
index e3c51ae285..c8cce61b4f 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
@@ -71,6 +71,12 @@ public class CHNativeBlock {
public native void nativeClose(long blockAddress);
+ public native BlockStats nativeBlockStats(long blockAddress, int
columnPosition);
+
+ public BlockStats getBlockStats(int columnPosition) {
+ return nativeBlockStats(blockAddress, columnPosition);
+ }
+
public void close() {
if (blockAddress != 0) {
nativeClose(blockAddress);
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
index d53ff17188..d417fa1a9d 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
@@ -48,7 +48,8 @@ public class StorageJoinBuilder {
boolean hasMixedFiltCondition,
boolean isExistenceJoin,
byte[] namedStruct,
- boolean isNullAwareAntiJoin);
+ boolean isNullAwareAntiJoin,
+ boolean hasNullKeyValues);
private StorageJoinBuilder() {}
@@ -58,7 +59,8 @@ public class StorageJoinBuilder {
long rowCount,
BroadCastHashJoinContext broadCastContext,
List<Expression> newBuildKeys,
- List<Attribute> newOutput) {
+ List<Attribute> newOutput,
+ boolean hasNullKeyValues) {
ConverterUtils$ converter = ConverterUtils$.MODULE$;
List<Expression> keys;
List<Attribute> output;
@@ -96,7 +98,8 @@ public class StorageJoinBuilder {
broadCastContext.hasMixedFiltCondition(),
broadCastContext.isExistenceJoin(),
toNameStruct(output).toByteArray(),
- broadCastContext.isNullAwareAntiJoin());
+ broadCastContext.isNullAwareAntiJoin(),
+ hasNullKeyValues);
}
/** create table named struct */
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 163f7568f7..69ea899c42 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -133,6 +133,10 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
".runtime_config.max_source_concatenate_bytes"
val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT *
256
+ val GLUTEN_AQE_PROPAGATEEMPTY: String =
+ GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
+ ".aqe.propagate.empty.relation"
+
def affinityMode: String = {
SparkEnv.get.conf
.get(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 61fed3f999..a83d349cdb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -44,6 +44,7 @@ private object CHRuleApi {
def injectSpark(injector: SparkInjector): Unit = {
// Regular Spark rules.
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
+ injector.injectQueryStagePrepRule(spark =>
CHAQEPropagateEmptyRelation(spark))
injector.injectParser(
(spark, parserInterface) => new GlutenCacheFilesSqlParser(spark,
parserInterface))
injector.injectParser(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index a8996c4d2e..1108b8b3c5 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -474,12 +474,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
- val buildKeys: Seq[Expression] = mode match {
+ val (buildKeys, isNullAware) = mode match {
case mode1: HashedRelationBroadcastMode =>
- mode1.key
+ (mode1.key, mode1.isNullAware)
case _ =>
// IdentityBroadcastMode
- Seq.empty
+ (Seq.empty, false)
}
val (newChild, newOutput, newBuildKeys) =
@@ -532,8 +532,27 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
}
(newChild, (child.output ++ appendedProjections).map(_.toAttribute),
preProjectionBuildKeys)
}
+
+ // find the key index in the output
+ val keyColumnIndex = if (isNullAware) {
+ def findKeyOrdinal(key: Expression, output: Seq[Attribute]): Int = {
+ key match {
+ case b: BoundReference => b.ordinal
+ case n: NamedExpression =>
+ output.indexWhere(o => (o.name.equals(n.name) && o.exprId ==
n.exprId))
+ case _ => throw new GlutenException(s"Cannot find $key in the
child's output: $output")
+ }
+ }
+ if (newBuildKeys.isEmpty) {
+ findKeyOrdinal(buildKeys(0), newOutput)
+ } else {
+ findKeyOrdinal(newBuildKeys(0), newOutput)
+ }
+ } else {
+ 0
+ }
val countsAndBytes =
- CHExecUtil.buildSideRDD(dataSize, newChild).collect
+ CHExecUtil.buildSideRDD(dataSize, newChild, isNullAware,
keyColumnIndex).collect
val batches = countsAndBytes.map(_._2)
val totalBatchesSize = batches.map(_.length).sum
@@ -548,8 +567,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
s" written bytes is correct.")
}
val rowCount = countsAndBytes.map(_._1).sum
+ val hasNullKeyValues =
countsAndBytes.map(_._3).foldLeft[Boolean](false)((b, a) => { b || a })
numOutputRows += rowCount
- ClickHouseBuildSideRelation(mode, newOutput, batches.flatten, rowCount,
newBuildKeys)
+ ClickHouseBuildSideRelation(
+ mode,
+ newOutput,
+ batches.flatten,
+ rowCount,
+ newBuildKeys,
+ hasNullKeyValues)
}
/** Define backend specfic expression mappings. */
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
new file mode 100644
index 0000000000..6f5afa9726
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.utils.PhysicalPlanSelector
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.LeftAnti
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
LocalTableScanExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
ClickHouseBuildSideRelation}
+
+case class CHAQEPropagateEmptyRelation(session: SparkSession) extends
Rule[SparkPlan] {
+
+ def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(session,
plan) {
+ if (!(session.conf.get(CHBackendSettings.GLUTEN_AQE_PROPAGATEEMPTY,
"true").toBoolean)) {
+ plan
+ } else {
+ plan.transform {
+ case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, left, right,
isNullAwareAntiJoin)
+ if (joinType == LeftAnti) && isNullAwareAntiJoin =>
+ right match {
+ case BroadcastQueryStageExec(_, plan: SparkPlan, _) =>
+ val columnarBroadcast = plan match {
+ case c: ColumnarBroadcastExchangeExec => c
+ case ReusedExchangeExec(_, c: ColumnarBroadcastExchangeExec)
=> c
+ }
+ val chBuildSideRelation =
columnarBroadcast.relationFuture.get().value
+ chBuildSideRelation match {
+ case c: ClickHouseBuildSideRelation if c.hasNullKeyValues =>
+ LocalTableScanExec(bhj.output, Seq.empty)
+ case _ => bhj
+ }
+ case o => bhj
+ }
+ case other => other
+ }
+ }
+ }
+}
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java
similarity index 52%
copy from cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
copy to
backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java
index d089d7420a..e47454cbed 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java
@@ -14,37 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
-#include <memory>
-#include <jni.h>
-#include <substrait/algebra.pb.h>
+package org.apache.gluten.vectorized;
-namespace DB
-{
-class ReadBuffer;
-}
-
-namespace local_engine
-{
-class StorageJoinFromReadBuffer;
-namespace BroadCastJoinBuilder
-{
+public class BlockStats {
+ private final long blockRecordCount;
+ private final boolean hasNullKeyValues;
-std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
- const std::string & key,
- DB::ReadBuffer & input,
- jlong row_count,
- const std::string & join_keys,
- jint join_type,
- bool has_mixed_join_condition,
- bool is_existence_join,
- const std::string & named_struct,
- bool is_null_aware_anti_join);
-void cleanBuildHashTable(const std::string & hash_table_id, jlong instance);
-std::shared_ptr<StorageJoinFromReadBuffer> getJoin(const std::string &
hash_table_id);
+ public BlockStats(long blockRecordCount, boolean hasNullKeyValues) {
+ this.blockRecordCount = blockRecordCount;
+ this.hasNullKeyValues = hasNullKeyValues;
+ }
+ public long getBlockRecordCount() {
+ return blockRecordCount;
+ }
-void init(JNIEnv *);
-void destroy(JNIEnv *);
-}
+ public boolean isHasNullKeyValues() {
+ return hasNullKeyValues;
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index bb3cb5acce..43e3b4b7ab 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -43,7 +43,7 @@ import scala.concurrent.Future
case class GlutenCHCacheDataCommand(
onlyMetaCache: Boolean,
asynExecute: Boolean,
- selectedColuman: Option[Seq[String]],
+ selectedColumn: Option[Seq[String]],
path: Option[String],
table: Option[TableIdentifier],
tsfilter: Option[String],
@@ -93,8 +93,8 @@ case class GlutenCHCacheDataCommand(
"a Delta table? Refusing to garbage collect.")
val allColumns = snapshot.dataSchema.fieldNames.toSeq
- val selectedColumns = if (selectedColuman.nonEmpty) {
- selectedColuman.get
+ val selectedColumns = if (selectedColumn.nonEmpty) {
+ selectedColumn.get
.filter(allColumns.contains(_))
.map(ConverterUtils.normalizeColName)
.toSeq
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
index 862a8b4200..92887f16d7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala
@@ -35,7 +35,8 @@ case class ClickHouseBuildSideRelation(
output: Seq[Attribute],
batches: Array[Byte],
numOfRows: Long,
- newBuildKeys: Seq[Expression] = Seq.empty)
+ newBuildKeys: Seq[Expression] = Seq.empty,
+ hasNullKeyValues: Boolean = false)
extends BuildSideRelation
with Logging {
@@ -58,7 +59,8 @@ case class ClickHouseBuildSideRelation(
numOfRows,
broadCastContext,
newBuildKeys.asJava,
- output.asJava)
+ output.asJava,
+ hasNullKeyValues)
(hashTableData, this)
} else {
(StorageJoinBuilder.nativeCloneBuildHashTable(hashTableData), null)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 4496d893fc..6d91108d22 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -58,10 +58,13 @@ object CHExecUtil extends Logging {
def toBytes(
dataSize: SQLMetric,
iter: Iterator[ColumnarBatch],
+ isNullAware: Boolean = false,
+ keyColumnIndex: Int = 0,
compressionCodec: Option[String] = Some("lz4"),
compressionLevel: Option[Int] = None,
- bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
- var count = 0
+ bufferSize: Int = 4 << 10): Iterator[(Long, Array[Byte], Boolean)] = {
+ var count = 0L
+ var hasNullKeyValues = false
val bos = new ByteArrayOutputStream()
val buffer = new Array[Byte](bufferSize) // 4K
val level = compressionLevel.getOrElse(Int.MinValue)
@@ -69,20 +72,35 @@ object CHExecUtil extends Logging {
compressionCodec
.map(new BlockOutputStream(bos, buffer, dataSize, true, _, level,
bufferSize))
.getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "",
level, bufferSize))
- while (iter.hasNext) {
- val batch = iter.next()
- count += batch.numRows
- blockOutputStream.write(batch)
+ if (isNullAware) {
+ while (iter.hasNext) {
+ val batch = iter.next()
+ val blockStats =
CHNativeBlock.fromColumnarBatch(batch).getBlockStats(keyColumnIndex)
+ count += blockStats.getBlockRecordCount
+ hasNullKeyValues = hasNullKeyValues || blockStats.isHasNullKeyValues
+ blockOutputStream.write(batch)
+ }
+ } else {
+ while (iter.hasNext) {
+ val batch = iter.next()
+ count += batch.numRows()
+ blockOutputStream.write(batch)
+ }
}
blockOutputStream.flush()
blockOutputStream.close()
- Iterator((count, bos.toByteArray))
+ Iterator((count, bos.toByteArray, hasNullKeyValues))
}
- def buildSideRDD(dataSize: SQLMetric, newChild: SparkPlan): RDD[(Int,
Array[Byte])] = {
+ def buildSideRDD(
+ dataSize: SQLMetric,
+ newChild: SparkPlan,
+ isNullAware: Boolean,
+ keyColumnIndex: Int
+ ): RDD[(Long, Array[Byte], Boolean)] = {
newChild
.executeColumnar()
- .mapPartitionsInternal(iter => toBytes(dataSize, iter))
+ .mapPartitionsInternal(iter => toBytes(dataSize, iter, isNullAware,
keyColumnIndex))
}
private def buildRangePartitionSampleRDD(
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 49697872e8..f7cf0de376 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -2935,6 +2935,55 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})
+
+ withSQLConf(("spark.sql.adaptive.enabled", "true")) {
+ def checkAQEBHJWithIsNullAwareAntiJoin(df: DataFrame, isNullAwareBhjCnt:
Int = 1): Unit = {
+ val bhjs = collect(df.queryExecution.executedPlan) {
+ case bhj: CHBroadcastHashJoinExecTransformer if
bhj.isNullAwareAntiJoin => true
+ }
+ assert(bhjs.size == isNullAwareBhjCnt)
+ }
+
+ val sql6 =
+ s"""
+ |select * from partsupp
+ |where
+ |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (null), (6)
sub(suppkey))
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(
+ sql6,
+ true,
+ df => {
+ checkAQEBHJWithIsNullAwareAntiJoin(df, 0)
+ })
+
+ val sql7 =
+ s"""
+ |select * from partsupp
+ |where
+ |cast(ps_suppkey AS INT) NOT IN (SELECT suppkey FROM VALUES (null),
(6) sub(suppkey))
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(
+ sql7,
+ true,
+ df => {
+ checkAQEBHJWithIsNullAwareAntiJoin(df, 0)
+ })
+
+ val sql8 =
+ s"""
+ |select * from partsupp
+ |where
+ |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (5), (6)
sub(suppkey))
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(
+ sql8,
+ true,
+ df => {
+ checkAQEBHJWithIsNullAwareAntiJoin(df)
+ })
+ }
+
}
test("soundex") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
index 87c389a651..2c36b404ec 100644
---
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
@@ -58,7 +58,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with
CHSqlBasedBenchmark w
|select $scanSchema from parquet.`$parquetDir`
|
|""".stripMargin)
- val rowCount: Int = chParquet.count().toInt
+ val rowCount = chParquet.count()
val runs = Seq(1, 2, 4, 8, 16, 32, 64).reverse
.map(num => rowCount / num)
@@ -79,13 +79,14 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with
CHSqlBasedBenchmark w
s"build hash table with $num rows with $iteration hash tables",
executedCnt) {
_ =>
- for (i <- 0 until iteration) {
+ for (i <- 0L until iteration) {
val table = StorageJoinBuilder.build(
bytes,
num,
relation,
new util.ArrayList[Expression](),
- new util.ArrayList[Attribute]())
+ new util.ArrayList[Attribute](),
+ false)
StorageJoinBuilder.nativeCleanBuildHashTable("", table)
}
}
@@ -94,7 +95,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with
CHSqlBasedBenchmark w
}
private def createBroadcastRelation(
- child: SparkPlan): (Array[Byte], Int, BroadCastHashJoinContext) = {
+ child: SparkPlan): (Array[Byte], Long, BroadCastHashJoinContext) = {
val dataSize = SQLMetrics.createSizeMetric(spark.sparkContext, "size of
files read")
val countsAndBytes = child
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
index 0b88137b32..5f20c17bfb 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
@@ -116,7 +116,8 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
bool has_mixed_join_condition,
bool is_existence_join,
const std::string & named_struct,
- bool is_null_aware_anti_join)
+ bool is_null_aware_anti_join,
+ bool has_null_key_values)
{
auto join_key_list = Poco::StringTokenizer(join_keys, ",");
Names key_names;
@@ -193,7 +194,8 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
ConstraintsDescription(),
key,
true,
- is_null_aware_anti_join);
+ is_null_aware_anti_join,
+ has_null_key_values);
}
void init(JNIEnv * env)
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
index d089d7420a..52db2e7e75 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h
@@ -39,7 +39,8 @@ std::shared_ptr<StorageJoinFromReadBuffer> buildJoin(
bool has_mixed_join_condition,
bool is_existence_join,
const std::string & named_struct,
- bool is_null_aware_anti_join);
+ bool is_null_aware_anti_join,
+ bool has_null_key_values);
void cleanBuildHashTable(const std::string & hash_table_id, jlong instance);
std::shared_ptr<StorageJoinFromReadBuffer> getJoin(const std::string &
hash_table_id);
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
index 90baa754f3..834ba97815 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -74,9 +74,11 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
const ConstraintsDescription & constraints,
const String & comment,
const bool overwrite_,
- bool is_null_aware_anti_join_)
- : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_),
overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_)
+ bool is_null_aware_anti_join_,
+ bool has_null_key_values_)
+ : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_),
overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_),
has_null_key_value(has_null_key_values_)
{
+ is_empty_hash_table = row_count < 1;
storage_metadata.setColumns(columns);
storage_metadata.setConstraints(constraints);
storage_metadata.setComment(comment);
@@ -106,36 +108,8 @@ void StorageJoinFromReadBuffer::buildJoin(Blocks & data,
const Block header, std
auto build_join = [&]
{
join = std::make_shared<HashJoin>(analyzed_join, header, overwrite,
row_count);
- // only when is_null_aware_anti_join is true, it needs to check
whether is null key value exists
- if (is_null_aware_anti_join)
- {
- is_empty_hash_table = data.empty();
- size_t total_size = 0;
- for (Block block : data)
- {
- for (size_t i = 0; i < block.columns(); ++i)
- {
- const auto & column = block.getByPosition(i);
- if (column.name == key_names.at(0))
- {
- if (const auto * nullable =
checkAndGetColumn<ColumnNullable>(column.column.get()))
- {
- const auto & null_map_data =
nullable->getNullMapData();
- // check whether there is null key value
- has_null_key_value =
!DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size());
- }
- }
- }
- total_size += block.rows();
- join->addBlockToJoin(std::move(block), true);
- }
- is_empty_hash_table = (total_size < 1);
- }
- else
- {
- for (Block block : data)
- join->addBlockToJoin(std::move(block), true);
- }
+ for (Block block : data)
+ join->addBlockToJoin(std::move(block), true);
};
/// Record memory usage in Total Memory Tracker
ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join);
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
index 8c9416b544..e000952a45 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
@@ -47,7 +47,8 @@ public:
const DB::ConstraintsDescription & constraints_,
const String & comment,
bool overwrite_,
- bool is_null_aware_anti_join_);
+ bool is_null_aware_anti_join_,
+ bool has_null_key_values_);
bool has_null_key_value = false;
bool is_empty_hash_table = false;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 15c721be34..79c777eddc 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -110,6 +110,9 @@ static jmethodID block_stripes_constructor;
static jclass split_result_class;
static jmethodID split_result_constructor;
+static jclass block_stats_class;
+static jmethodID block_stats_constructor;
+
JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
{
JNIEnv * env;
@@ -127,6 +130,9 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
split_result_class = local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/CHSplitResult;");
split_result_constructor = local_engine::GetMethodID(env,
split_result_class, "<init>", "(JJJJJJ[J[JJJJ)V");
+ block_stats_class = local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/BlockStats;");
+ block_stats_constructor = local_engine::GetMethodID(env,
block_stats_class, "<init>", "(JZ)V");
+
local_engine::ShuffleReader::input_stream_class
= local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/ShuffleInputStream;");
local_engine::NativeSplitter::iterator_class
@@ -184,6 +190,7 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void *
/*reserved*/)
env->DeleteGlobalRef(spark_row_info_class);
env->DeleteGlobalRef(block_stripes_class);
env->DeleteGlobalRef(split_result_class);
+ env->DeleteGlobalRef(block_stats_class);
env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class);
env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class);
env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class);
@@ -332,8 +339,8 @@
Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, job
else
{
const auto * nullable =
checkAndGetColumn<DB::ColumnNullable>(&*col.column);
- size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(),
nullable->getNullMapData().end(), 0);
- return num_nulls < block->rows();
+ const auto & null_map_data = nullable->getNullMapData();
+ return !DB::memoryIsZero(null_map_data.data(), 0,
null_map_data.size());
}
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
@@ -506,6 +513,35 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
+JNIEXPORT jobject
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env,
jobject obj, jlong block_address, jint column_position)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+ auto col = getColumnFromColumnVector(env, obj, block_address,
column_position);
+ if (!col.column->isNullable())
+ {
+ jobject block_stats = env->NewObject(
+ block_stats_class,
+ block_stats_constructor,
+ block->rows(),
+ false);
+ return block_stats;
+ }
+ else
+ {
+ const auto * nullable =
checkAndGetColumn<DB::ColumnNullable>(&*col.column);
+ const auto & null_map_data = nullable->getNullMapData();
+
+ jobject block_stats = env->NewObject(
+ block_stats_class,
+ block_stats_constructor,
+ block->rows(),
+ !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()));
+ return block_stats;
+ }
+ LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
+}
+
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed,
jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
@@ -1065,7 +1101,8 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
jboolean has_mixed_join_condition,
jboolean is_existence_join,
jbyteArray named_struct,
- jboolean is_null_aware_anti_join)
+ jboolean is_null_aware_anti_join,
+ jboolean has_null_key_values)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto hash_table_id = jstring2string(env, key);
@@ -1086,7 +1123,8 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
has_mixed_join_condition,
is_existence_join,
struct_string,
- is_null_aware_anti_join));
+ is_null_aware_anti_join,
+ has_null_key_values));
return obj->instance();
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 6d5083dbe2..b527a8ec1f 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1157,11 +1157,11 @@ class ClickHouseTestSettings extends
BackendTestSettings {
"Avoid changing merge join to broadcast join if too many empty
partitions on build plan")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+ .exclude("SPARK-32717: AQEOptimizer should respect excludedRules
configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with
AQE")
- .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys")
.exclude("SPARK-32753: Only copy tags to node with no tags")
.exclude("Logging plan changes for AQE")
.exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index d59a7d1ff9..6d3c3e865d 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -800,10 +800,25 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
- // because gluten use columnar format, which cannot execute to get
rowIterator, then get the key
- // null status
- ignore(
- GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys") {}
+ testGluten("SPARK-32717: AQEOptimizer should respect excludedRules
configuration") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+ // This test is a copy of test(SPARK-32573), in order to test the
configuration
+ // `spark.sql.adaptive.optimizer.excludedRules` works as expected.
+ "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" ->
"false"
+ ) {
+ val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM
testData3)")
+ val bhj = findTopLevelBroadcastHashJoin(plan)
+ assert(bhj.size == 1)
+ val join = findTopLevelBaseJoin(adaptivePlan)
+ // this differs from test(SPARK-32573) because the rule
+ // `EliminateUnnecessaryJoin` has been excluded.
+ assert(join.nonEmpty)
+ checkNumLocalShuffleReads(adaptivePlan)
+ }
+ }
// EmptyRelation case
ignore(
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index de979ac274..110dbffc73 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1178,11 +1178,11 @@ class ClickHouseTestSettings extends
BackendTestSettings {
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if
too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+ .exclude("SPARK-32717: AQEOptimizer should respect excludedRules
configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with
AQE")
- .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys")
.exclude("SPARK-32753: Only copy tags to node with no tags")
.exclude("Logging plan changes for AQE")
.exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 9ddaac1851..441f3a60a3 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -21,7 +21,6 @@ import
org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, Shuffl
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
-import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
@@ -799,10 +798,25 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
- // because gluten use columnar format, which cannot execute to get
rowIterator, then get the key
- // null status
- ignore(
- GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys") {}
+ testGluten("SPARK-32717: AQEOptimizer should respect excludedRules
configuration") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+ // This test is a copy of test(SPARK-32573), in order to test the
configuration
+ // `spark.sql.adaptive.optimizer.excludedRules` works as expected.
+ "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" ->
"false"
+ ) {
+ val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM
testData3)")
+ val bhj = findTopLevelBroadcastHashJoin(plan)
+ assert(bhj.size == 1)
+ val join = findTopLevelBaseJoin(adaptivePlan)
+ // this differs from test(SPARK-32573) because the rule
+ // `EliminateUnnecessaryJoin` has been excluded.
+ assert(join.nonEmpty)
+ checkNumLocalShuffleReads(adaptivePlan)
+ }
+ }
testGluten("SPARK-32753: Only copy tags to node with no tags") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 89a44c602e..d478adbb32 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1023,11 +1023,11 @@ class ClickHouseTestSettings extends
BackendTestSettings {
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if
too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+ .exclude("SPARK-32717: AQEOptimizer should respect excludedRules
configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with
AQE")
- .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys")
.exclude("SPARK-32753: Only copy tags to node with no tags")
.exclude("Logging plan changes for AQE")
.exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 388036c558..089f72cd11 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1023,11 +1023,11 @@ class ClickHouseTestSettings extends
BackendTestSettings {
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if
too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
+ .exclude("SPARK-32717: AQEOptimizer should respect excludedRules
configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with
AQE")
- .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys")
.exclude("SPARK-32753: Only copy tags to node with no tags")
.exclude("Logging plan changes for AQE")
.exclude("SPARK-33551: Do not use AQE shuffle read for repartition")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 49d47fa65b..2e5df7b859 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -797,10 +797,25 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
- // because gluten use columnar format, which cannot execute to get
rowIterator, then get the key
- // null status
- ignore(
- GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is
HashedRelationWithAllNullKeys") {}
+ testGluten("SPARK-32717: AQEOptimizer should respect excludedRules
configuration") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+ // This test is a copy of test(SPARK-32573), in order to test the
configuration
+ // `spark.sql.adaptive.optimizer.excludedRules` works as expected.
+ "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" ->
"false"
+ ) {
+ val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM
testData3)")
+ val bhj = findTopLevelBroadcastHashJoin(plan)
+ assert(bhj.size == 1)
+ val join = findTopLevelBaseJoin(adaptivePlan)
+ // this differs from test(SPARK-32573) because the rule
+ // `EliminateUnnecessaryJoin` has been excluded.
+ assert(join.nonEmpty)
+ checkNumLocalShuffleReads(adaptivePlan)
+ }
+ }
// EmptyRelation case
ignore(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]