This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new 73b0fb6ebf [VL][1.2] Port #6563 #6679 for build options and 
collectQueryExecutionFallbackSummary fix (#7919)
73b0fb6ebf is described below

commit 73b0fb6ebfb3c8fe20db15332a797786e7e03972
Author: Wei-Ting Chen <[email protected]>
AuthorDate: Mon Nov 18 10:01:27 2024 +0800

    [VL][1.2] Port #6563 #6679 for build options and 
collectQueryExecutionFallbackSummary fix (#7919)
---
 cpp/velox/tests/CMakeLists.txt                     |   4 +-
 cpp/velox/tests/MemoryManagerTest.cc               |   3 +-
 .../spark/sql/execution/GlutenImplicits.scala      | 231 +++++++++++----------
 3 files changed, 120 insertions(+), 118 deletions(-)

diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index f3d65f127f..dac83cd877 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -29,8 +29,8 @@ function(add_velox_test TEST_EXEC)
   target_include_directories(
     ${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src
                          ${VELOX_BUILD_PATH}/_deps/duckdb-src/src/include)
-  target_link_libraries(${TEST_EXEC} velox_benchmark_common GTest::gtest
-                        GTest::gtest_main)
+  target_link_libraries(${TEST_EXEC} velox GTest::gtest GTest::gtest_main
+                        google::glog)
   gtest_discover_tests(${TEST_EXEC} DISCOVERY_MODE PRE_TEST)
 endfunction()
 
diff --git a/cpp/velox/tests/MemoryManagerTest.cc 
b/cpp/velox/tests/MemoryManagerTest.cc
index 52f2fa8b66..d86bd46e23 100644
--- a/cpp/velox/tests/MemoryManagerTest.cc
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-#include "benchmarks/common/BenchmarkUtils.h"
 #include "compute/VeloxBackend.h"
 #include "config/VeloxConfig.h"
 #include "memory/VeloxMemoryManager.h"
@@ -50,7 +49,7 @@ class MemoryManagerTest : public ::testing::Test {
     std::unordered_map<std::string, std::string> conf = {
         {kMemoryReservationBlockSize, 
std::to_string(kMemoryReservationBlockSizeDefault)},
         {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
-    initVeloxBackend(conf);
+    gluten::VeloxBackend::create(conf);
   }
 
   void SetUp() override {
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index eb42f0a884..2e2af6517d 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
@@ -87,131 +87,134 @@ object GlutenImplicits {
     }
   }
 
-  implicit class DatasetTransformer[T](dateset: Dataset[T]) {
-    private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
-      val args = p.argString(Int.MaxValue)
-      val index = args.indexOf("isFinalPlan=")
-      assert(index >= 0)
-      args.substring(index + "isFinalPlan=".length).trim.toBoolean
-    }
+  private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+    val args = p.argString(Int.MaxValue)
+    val index = args.indexOf("isFinalPlan=")
+    assert(index >= 0)
+    args.substring(index + "isFinalPlan=".length).trim.toBoolean
+  }
 
-    private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
-      var numGlutenNodes = 0
-      val fallbackNodeToReason = new mutable.HashMap[String, String]
-
-      def collect(tmp: QueryPlan[_]): Unit = {
-        tmp.foreachUp {
-          case _: ExecutedCommandExec =>
-          case _: CommandResultExec =>
-          case _: V2CommandExec =>
-          case _: DataWritingCommandExec =>
-          case _: WholeStageCodegenExec =>
-          case _: WholeStageTransformer =>
-          case _: InputAdapter =>
-          case _: ColumnarInputAdapter =>
-          case _: InputIteratorTransformer =>
-          case _: ColumnarToRowTransition =>
-          case _: RowToColumnarTransition =>
-          case p: ReusedExchangeExec =>
-          case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
-            collect(p.executedPlan)
-          case p: AdaptiveSparkPlanExec =>
-            // if we are here that means we are inside table cache.
-            val (innerNumGlutenNodes, innerFallbackNodeToReason) =
-              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-                // re-plan manually to skip cached data
-                val newSparkPlan = QueryExecution.createSparkPlan(
-                  dateset.sparkSession,
-                  dateset.sparkSession.sessionState.planner,
-                  p.inputPlan.logicalLink.get)
-                val newExecutedPlan = QueryExecution.prepareExecutedPlan(
-                  dateset.sparkSession,
-                  newSparkPlan
-                )
-                processPlan(
-                  newExecutedPlan,
-                  new PlanStringConcat().append,
-                  Some(plan => collectFallbackNodes(plan)))
-              }
-            numGlutenNodes += innerNumGlutenNodes
-            fallbackNodeToReason.++=(innerFallbackNodeToReason)
-          case p: QueryStageExec => collect(p.plan)
-          case p: GlutenPlan =>
-            numGlutenNodes += 1
-            p.innerChildren.foreach(collect)
-          case i: InMemoryTableScanExec =>
-            if (PlanUtil.isGlutenTableCache(i)) {
-              numGlutenNodes += 1
-            } else {
-              addFallbackNodeWithReason(i, "Columnar table cache is disabled", 
fallbackNodeToReason)
+  private def collectFallbackNodes(spark: SparkSession, plan: QueryPlan[_]): 
FallbackInfo = {
+    var numGlutenNodes = 0
+    val fallbackNodeToReason = new mutable.HashMap[String, String]
+
+    def collect(tmp: QueryPlan[_]): Unit = {
+      tmp.foreachUp {
+        case _: ExecutedCommandExec =>
+        case _: CommandResultExec =>
+        case _: V2CommandExec =>
+        case _: DataWritingCommandExec =>
+        case _: WholeStageCodegenExec =>
+        case _: WholeStageTransformer =>
+        case _: InputAdapter =>
+        case _: ColumnarInputAdapter =>
+        case _: InputIteratorTransformer =>
+        case _: ColumnarToRowTransition =>
+        case _: RowToColumnarTransition =>
+        case p: ReusedExchangeExec =>
+        case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
+          collect(p.executedPlan)
+        case p: AdaptiveSparkPlanExec =>
+          // if we are here that means we are inside table cache.
+          val (innerNumGlutenNodes, innerFallbackNodeToReason) =
+            withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+              // re-plan manually to skip cached data
+              val newSparkPlan = QueryExecution.createSparkPlan(
+                spark,
+                spark.sessionState.planner,
+                p.inputPlan.logicalLink.get)
+              val newExecutedPlan = QueryExecution.prepareExecutedPlan(
+                spark,
+                newSparkPlan
+              )
+              processPlan(
+                newExecutedPlan,
+                new PlanStringConcat().append,
+                Some(plan => collectFallbackNodes(spark, plan)))
             }
-            collect(i.relation.cachedPlan)
-          case _: AQEShuffleReadExec => // Ignore
-          case p: SparkPlan =>
-            handleVanillaSparkPlan(p, fallbackNodeToReason)
-            p.innerChildren.foreach(collect)
-          case _ =>
-        }
+          numGlutenNodes += innerNumGlutenNodes
+          fallbackNodeToReason.++=(innerFallbackNodeToReason)
+        case p: QueryStageExec => collect(p.plan)
+        case p: GlutenPlan =>
+          numGlutenNodes += 1
+          p.innerChildren.foreach(collect)
+        case i: InMemoryTableScanExec =>
+          if (PlanUtil.isGlutenTableCache(i)) {
+            numGlutenNodes += 1
+          } else {
+            addFallbackNodeWithReason(i, "Columnar table cache is disabled", 
fallbackNodeToReason)
+          }
+          collect(i.relation.cachedPlan)
+        case _: AQEShuffleReadExec => // Ignore
+        case p: SparkPlan =>
+          handleVanillaSparkPlan(p, fallbackNodeToReason)
+          p.innerChildren.foreach(collect)
+        case _ =>
       }
-
-      collect(plan)
-      (numGlutenNodes, fallbackNodeToReason.toMap)
     }
 
-    private def collectQueryExecutionFallbackSummary(qe: QueryExecution): 
FallbackSummary = {
-      var totalNumGlutenNodes = 0
-      var totalNumFallbackNodes = 0
-      val totalPhysicalPlanDescription = new ArrayBuffer[String]()
-      val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
-
-      def handlePlanWithAQEAndTableCache(
-          plan: SparkPlan,
-          logicalPlan: LogicalPlan,
-          isMaterialized: Boolean): Unit = {
-        val concat = new PlanStringConcat()
-        val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
-        val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
-          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-            // AQE is not materialized, so the columnar rules are not applied.
-            // For this case, We apply columnar rules manually with disable 
AQE.
-            val qe = dateset.sparkSession.sessionState.executePlan(logicalPlan)
-            processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
-          }
-        } else {
-          processPlan(plan, concat.append, collectFallbackFunc)
-        }
-        totalNumGlutenNodes += numGlutenNodes
-        totalNumFallbackNodes += fallbackNodeToReason.size
-        totalPhysicalPlanDescription.append(concat.toString())
-        totalFallbackNodeToReason.append(fallbackNodeToReason)
-      }
+    collect(plan)
+    (numGlutenNodes, fallbackNodeToReason.toMap)
+  }
 
-      // For command-like query, e.g., `INSERT INTO TABLE ...`
-      qe.commandExecuted.foreach {
-        case r: CommandResult =>
-          handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, 
r.commandLogicalPlan, true)
-        case _ => // ignore
+  // collect fallback sumaary from query execution, make this method public as 
a util method
+  def collectQueryExecutionFallbackSummary(
+      spark: SparkSession,
+      qe: QueryExecution): FallbackSummary = {
+    var totalNumGlutenNodes = 0
+    var totalNumFallbackNodes = 0
+    val totalPhysicalPlanDescription = new ArrayBuffer[String]()
+    val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
+
+    def handlePlanWithAQEAndTableCache(
+        plan: SparkPlan,
+        logicalPlan: LogicalPlan,
+        isMaterialized: Boolean): Unit = {
+      val concat = new PlanStringConcat()
+      val collectFallbackFunc = Some(plan => collectFallbackNodes(spark, plan))
+      val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+          // AQE is not materialized, so the columnar rules are not applied.
+          // For this case, We apply columnar rules manually with disable AQE.
+          val qe = spark.sessionState.executePlan(logicalPlan)
+          processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
+        }
+      } else {
+        processPlan(plan, concat.append, collectFallbackFunc)
       }
+      totalNumGlutenNodes += numGlutenNodes
+      totalNumFallbackNodes += fallbackNodeToReason.size
+      totalPhysicalPlanDescription.append(concat.toString())
+      totalFallbackNodeToReason.append(fallbackNodeToReason)
+    }
 
-      // For query, e.g., `SELECT * FROM ...`
-      if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
-        val isMaterialized = qe.executedPlan.find {
-          case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
-          case _ => false
-        }.isDefined
-        handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, 
isMaterialized)
-      }
+    // For command-like query, e.g., `INSERT INTO TABLE ...`
+    qe.commandExecuted.foreach {
+      case r: CommandResult =>
+        handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, 
r.commandLogicalPlan, true)
+      case _ => // ignore
+    }
 
-      FallbackSummary(
-        totalNumGlutenNodes,
-        totalNumFallbackNodes,
-        totalPhysicalPlanDescription.toSeq,
-        totalFallbackNodeToReason.toSeq
-      )
+    // For query, e.g., `SELECT * FROM ...`
+    if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
+      val isMaterialized = qe.executedPlan.find {
+        case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
+        case _ => false
+      }.isDefined
+      handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, 
isMaterialized)
     }
 
+    FallbackSummary(
+      totalNumGlutenNodes,
+      totalNumFallbackNodes,
+      totalPhysicalPlanDescription.toSeq,
+      totalFallbackNodeToReason.toSeq
+    )
+  }
+
+  implicit class DatasetTransformer[T](dateset: Dataset[T]) {
     def fallbackSummary(): FallbackSummary = {
-      collectQueryExecutionFallbackSummary(dateset.queryExecution)
+      collectQueryExecutionFallbackSummary(dateset.sparkSession, 
dateset.queryExecution)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to