This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f2fecae9d5 [GLUTEN-8393][VL] Fix smj result mismatch issue (#8394)
f2fecae9d5 is described below
commit f2fecae9d5f9546d517d0340db52b33cda8ecaad
Author: JiaKe <[email protected]>
AuthorDate: Fri Jan 3 12:58:58 2025 +0800
[GLUTEN-8393][VL] Fix smj result mismatch issue (#8394)
---
.github/workflows/velox_backend.yml | 64 ++++++++++++++++++++++
.../gluten/execution/MiscOperatorSuite.scala | 14 +++--
.../gluten/execution/VeloxMetricsSuite.scala | 4 +-
.../VeloxOrcDataTypeValidationSuite.scala | 4 +-
.../VeloxParquetDataTypeValidationSuite.scala | 4 +-
.../apache/gluten/execution/VeloxTPCHSuite.scala | 5 ++
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 12 ++--
.../substrait/SubstraitToVeloxPlanValidator.cc | 1 +
.../execution/SortMergeJoinExecTransformer.scala | 5 --
9 files changed, 93 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/velox_backend.yml
b/.github/workflows/velox_backend.yml
index 55cb42176f..ada2ea3f23 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -973,6 +973,70 @@ jobs:
name: test-report-spark35-slow-ras
path: '**/surefire-reports/TEST-*.xml'
+ run-spark-test-spark35-smj:
+ needs: build-native-lib-centos-7
+ runs-on: ubuntu-20.04
+ container: apache/gluten:centos-8
+ steps:
+ - uses: actions/checkout@v2
+ - name: Download All Artifacts
+ uses: actions/download-artifact@v3
+ with:
+ name: velox-native-lib-centos-7-${{github.sha}}
+ path: ./cpp/build/releases
+ - name: Download Arrow Jars
+ uses: actions/download-artifact@v3
+ with:
+ name: arrow-jars-centos-7-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
+ - name: Prepare
+ run: |
+ dnf module -y install python39 && \
+ alternatives --set python3 /usr/bin/python3.9 && \
+ pip3 install setuptools && \
+ pip3 install pyspark==3.5.2 cython && \
+ pip3 install pandas pyarrow
+ - name: Build and Run unit test for Spark 3.5.2 (other tests)
+ run: |
+ cd $GITHUB_WORKSPACE/
+ export SPARK_SCALA_VERSION=2.12
+ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ -Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \
+ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
+ - name: Upload test report
+ uses: actions/upload-artifact@v4
+ with:
+ name: test-report-spark35-smj
+ path: '**/surefire-reports/TEST-*.xml'
+
+ run-spark-test-spark35-slow-smj:
+ needs: build-native-lib-centos-7
+ runs-on: ubuntu-20.04
+ container: apache/gluten:centos-8
+ steps:
+ - uses: actions/checkout@v2
+ - name: Download All Artifacts
+ uses: actions/download-artifact@v3
+ with:
+ name: velox-native-lib-centos-7-${{github.sha}}
+ path: ./cpp/build/releases
+ - name: Download Arrow Jars
+ uses: actions/download-artifact@v3
+ with:
+ name: arrow-jars-centos-7-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
+ - name: Build and Run unit test for Spark 3.5.2 (slow tests)
+ run: |
+ cd $GITHUB_WORKSPACE/
+ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ -Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \
+ -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+ - name: Upload test report
+ uses: actions/upload-artifact@v4
+ with:
+ name: test-report-spark35-slow-smj
+ path: '**/surefire-reports/TEST-*.xml'
+
run-cpp-test-udf-test:
runs-on: ubuntu-20.04
container: apache/gluten:centos-8
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 989def88e7..42e3e0bcc7 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -1275,12 +1275,14 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
|select cast(id as int) as c1, cast(id as string) c2 from
range(100) order by c1 desc;
|""".stripMargin)
- runQueryAndCompare(
- """
- |select * from t1 cross join t2 on t1.c1 = t2.c1;
- |""".stripMargin
- ) {
- checkGlutenOperatorMatch[ShuffledHashJoinExecTransformer]
+ withSQLConf("spark.gluten.sql.columnar.forceShuffledHashJoin" -> "true") {
+ runQueryAndCompare(
+ """
+ |select * from t1 cross join t2 on t1.c1 = t2.c1;
+ |""".stripMargin
+ ) {
+ checkGlutenOperatorMatch[ShuffledHashJoinExecTransformer]
+ }
}
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index a7ad8514f9..5bb7ed6333 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -101,7 +101,9 @@ class VeloxMetricsSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
test("test shuffle hash join metrics") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
// without preproject
runQueryAndCompare(
"SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 =
metrics_t2.c1"
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
index 6ac59ba4fa..e47524ce92 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
@@ -287,7 +287,9 @@ class VeloxOrcDataTypeValidationSuite extends
VeloxWholeStageTransformerSuite {
}
// Validation: ShuffledHashJoin.
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+ withSQLConf(
+ "spark.gluten.sql.columnar.forceShuffledHashJoin" -> "true",
+ "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
runQueryAndCompare(
"select type1.date from type1," +
" type2 where type1.date = type2.date") {
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
index cb5614f396..f1279229a9 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
@@ -286,7 +286,9 @@ class VeloxParquetDataTypeValidationSuite extends
VeloxWholeStageTransformerSuit
}
// Validation: ShuffledHashJoin.
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+ withSQLConf(
+ "spark.gluten.sql.columnar.forceShuffledHashJoin" -> "true",
+ "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
runQueryAndCompare(
"select type1.date from type1," +
" type2 where type1.date = type2.date") {
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index f69c6b8422..375c343b59 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -116,6 +116,11 @@ abstract class VeloxTPCHSuite extends
VeloxTPCHTableSupport {
}
}
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "true")
+ }
+
test("TPC-H q1") {
runTPCHQuery(1, tpchQueries, queriesResults, compareResult = false,
noFallBack = true) {
checkGoldenFile(_, 1)
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index e95b4b74af..dab9837936 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1435,18 +1435,18 @@ void SubstraitToVeloxPlanConverter::extractJoinKeys(
const ::substrait::Expression& joinExpression,
std::vector<const ::substrait::Expression::FieldReference*>& leftExprs,
std::vector<const ::substrait::Expression::FieldReference*>& rightExprs) {
- std::vector<const ::substrait::Expression*> expressions;
- expressions.push_back(&joinExpression);
+ std::stack<const ::substrait::Expression*> expressions;
+ expressions.push(&joinExpression);
while (!expressions.empty()) {
- auto visited = expressions.back();
- expressions.pop_back();
+ auto visited = expressions.top();
+ expressions.pop();
if (visited->rex_type_case() ==
::substrait::Expression::RexTypeCase::kScalarFunction) {
const auto& funcName = SubstraitParser::getNameBeforeDelimiter(
SubstraitParser::findVeloxFunction(functionMap_,
visited->scalar_function().function_reference()));
const auto& args = visited->scalar_function().arguments();
if (funcName == "and") {
- expressions.push_back(&args[0].value());
- expressions.push_back(&args[1].value());
+ expressions.push(&args[1].value());
+ expressions.push(&args[0].value());
} else if (funcName == "eq" || funcName == "equalto" || funcName == "decimal_equalto") {
VELOX_CHECK(std::all_of(args.cbegin(), args.cend(), [](const
::substrait::FunctionArgument& arg) {
return arg.value().has_selection();
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 996b3bdce0..25e87699fe 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -980,6 +980,7 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::JoinRel& joinRel
SubstraitParser::configSetInOptimization(joinRel.advanced_extension(),
"isSMJ=")) {
switch (joinRel.type()) {
case ::substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
+ case ::substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
case ::substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
index 5c57e5b62a..3bca596940 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
@@ -250,11 +250,6 @@ case class SortMergeJoinExecTransformer(
projectList) {
override protected def doValidateInternal(): ValidationResult = {
- // Firstly, need to check if the Substrait plan for this operator can be successfully generated.
- if (substraitJoinType == JoinRel.JoinType.JOIN_TYPE_OUTER) {
- return ValidationResult
- .failed(s"Found unsupported join type of $joinType for velox smj: $substraitJoinType")
- }
super.doValidateInternal()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]