This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f2fecae9d5 [GLUTEN-8393][VL] Fix smj result mismatch issue  (#8394)
f2fecae9d5 is described below

commit f2fecae9d5f9546d517d0340db52b33cda8ecaad
Author: JiaKe <[email protected]>
AuthorDate: Fri Jan 3 12:58:58 2025 +0800

    [GLUTEN-8393][VL] Fix smj result mismatch issue  (#8394)
---
 .github/workflows/velox_backend.yml                | 64 ++++++++++++++++++++++
 .../gluten/execution/MiscOperatorSuite.scala       | 14 +++--
 .../gluten/execution/VeloxMetricsSuite.scala       |  4 +-
 .../VeloxOrcDataTypeValidationSuite.scala          |  4 +-
 .../VeloxParquetDataTypeValidationSuite.scala      |  4 +-
 .../apache/gluten/execution/VeloxTPCHSuite.scala   |  5 ++
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 12 ++--
 .../substrait/SubstraitToVeloxPlanValidator.cc     |  1 +
 .../execution/SortMergeJoinExecTransformer.scala   |  5 --
 9 files changed, 93 insertions(+), 20 deletions(-)

diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml
index 55cb42176f..ada2ea3f23 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -973,6 +973,70 @@ jobs:
           name: test-report-spark35-slow-ras
           path: '**/surefire-reports/TEST-*.xml'
 
+  run-spark-test-spark35-smj:
+    needs: build-native-lib-centos-7
+    runs-on: ubuntu-20.04
+    container: apache/gluten:centos-8
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v3
+        with:
+          name: velox-native-lib-centos-7-${{github.sha}}
+          path: ./cpp/build/releases
+      - name: Download Arrow Jars
+        uses: actions/download-artifact@v3
+        with:
+          name: arrow-jars-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
+      - name: Prepare
+        run: |
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install setuptools && \
+          pip3 install pyspark==3.5.2 cython && \
+          pip3 install pandas pyarrow
+      - name: Build and Run unit test for Spark 3.5.2 (other tests)
+        run: |
+          cd $GITHUB_WORKSPACE/
+          export SPARK_SCALA_VERSION=2.12
+          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ -Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \
+          -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
+      - name: Upload test report
+        uses: actions/upload-artifact@v4
+        with:
+          name: test-report-spark35-smj
+          path: '**/surefire-reports/TEST-*.xml'
+
+  run-spark-test-spark35-slow-smj:
+    needs: build-native-lib-centos-7
+    runs-on: ubuntu-20.04
+    container: apache/gluten:centos-8
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v3
+        with:
+          name: velox-native-lib-centos-7-${{github.sha}}
+          path: ./cpp/build/releases
+      - name: Download Arrow Jars
+        uses: actions/download-artifact@v3
+        with:
+          name: arrow-jars-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
+      - name: Build and Run unit test for Spark 3.5.2 (slow tests)
+        run: |
+          cd $GITHUB_WORKSPACE/
+          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ -Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \
+          -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+      - name: Upload test report
+        uses: actions/upload-artifact@v4
+        with:
+          name: test-report-spark35-slow-smj
+          path: '**/surefire-reports/TEST-*.xml'
+
   run-cpp-test-udf-test:
     runs-on: ubuntu-20.04
     container: apache/gluten:centos-8
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 989def88e7..42e3e0bcc7 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -1275,12 +1275,14 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
             |select cast(id as int) as c1, cast(id as string) c2 from range(100) order by c1 desc;
             |""".stripMargin)
 
-      runQueryAndCompare(
-        """
-          |select * from t1 cross join t2 on t1.c1 = t2.c1;
-          |""".stripMargin
-      ) {
-        checkGlutenOperatorMatch[ShuffledHashJoinExecTransformer]
+      withSQLConf("spark.gluten.sql.columnar.forceShuffledHashJoin" -> "true") {
+        runQueryAndCompare(
+          """
+            |select * from t1 cross join t2 on t1.c1 = t2.c1;
+            |""".stripMargin
+        ) {
+          checkGlutenOperatorMatch[ShuffledHashJoinExecTransformer]
+        }
       }
 
       withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index a7ad8514f9..5bb7ed6333 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -101,7 +101,9 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
   }
 
   test("test shuffle hash join metrics") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       // without preproject
       runQueryAndCompare(
         "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 = metrics_t2.c1"
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
index 6ac59ba4fa..e47524ce92 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcDataTypeValidationSuite.scala
@@ -287,7 +287,9 @@ class VeloxOrcDataTypeValidationSuite extends VeloxWholeStageTransformerSuite {
     }
 
     // Validation: ShuffledHashJoin.
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+    withSQLConf(
+      "spark.gluten.sql.columnar.forceShuffledHashJoin" -> "true",
+      "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
       runQueryAndCompare(
         "select type1.date from type1," +
           " type2 where type1.date = type2.date") {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
index cb5614f396..f1279229a9 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
@@ -286,7 +286,9 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
     }
 
     // Validation: ShuffledHashJoin.
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+    withSQLConf(
+      "spark.gluten.sql.columnar.forceShuffledHashJoin" -> "true",
+      "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
       runQueryAndCompare(
         "select type1.date from type1," +
           " type2 where type1.date = type2.date") {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index f69c6b8422..375c343b59 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -116,6 +116,11 @@ abstract class VeloxTPCHSuite extends VeloxTPCHTableSupport {
     }
   }
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "true")
+  }
+
   test("TPC-H q1") {
     runTPCHQuery(1, tpchQueries, queriesResults, compareResult = false, noFallBack = true) {
       checkGoldenFile(_, 1)
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index e95b4b74af..dab9837936 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1435,18 +1435,18 @@ void SubstraitToVeloxPlanConverter::extractJoinKeys(
     const ::substrait::Expression& joinExpression,
     std::vector<const ::substrait::Expression::FieldReference*>& leftExprs,
     std::vector<const ::substrait::Expression::FieldReference*>& rightExprs) {
-  std::vector<const ::substrait::Expression*> expressions;
-  expressions.push_back(&joinExpression);
+  std::stack<const ::substrait::Expression*> expressions;
+  expressions.push(&joinExpression);
   while (!expressions.empty()) {
-    auto visited = expressions.back();
-    expressions.pop_back();
+    auto visited = expressions.top();
+    expressions.pop();
     if (visited->rex_type_case() == ::substrait::Expression::RexTypeCase::kScalarFunction) {
       const auto& funcName = SubstraitParser::getNameBeforeDelimiter(
           SubstraitParser::findVeloxFunction(functionMap_, visited->scalar_function().function_reference()));
       const auto& args = visited->scalar_function().arguments();
       if (funcName == "and") {
-        expressions.push_back(&args[0].value());
-        expressions.push_back(&args[1].value());
+        expressions.push(&args[1].value());
+        expressions.push(&args[0].value());
       } else if (funcName == "eq" || funcName == "equalto" || funcName == "decimal_equalto") {
         VELOX_CHECK(std::all_of(args.cbegin(), args.cend(), [](const ::substrait::FunctionArgument& arg) {
           return arg.value().has_selection();
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 996b3bdce0..25e87699fe 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -980,6 +980,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::JoinRel& joinRel
       SubstraitParser::configSetInOptimization(joinRel.advanced_extension(), "isSMJ=")) {
     switch (joinRel.type()) {
       case ::substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
+      case ::substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
       case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
       case ::substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
       case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
index 5c57e5b62a..3bca596940 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
@@ -250,11 +250,6 @@ case class SortMergeJoinExecTransformer(
     projectList) {
 
   override protected def doValidateInternal(): ValidationResult = {
-    // Firstly, need to check if the Substrait plan for this operator can be successfully generated.
-    if (substraitJoinType == JoinRel.JoinType.JOIN_TYPE_OUTER) {
-      return ValidationResult
-        .failed(s"Found unsupported join type of $joinType for velox smj: $substraitJoinType")
-    }
     super.doValidateInternal()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to