This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 26ff58d3b [GLUTEN-5944][CH] Fallback to run delta vacuum command 
(#5945)
26ff58d3b is described below

commit 26ff58d3b85485ca0a7b9d6bab2d82ae5aeff49b
Author: Zhichao Zhang <[email protected]>
AuthorDate: Sat Jun 1 14:47:55 2024 +0800

    [GLUTEN-5944][CH] Fallback to run delta vacuum command (#5945)
    
    Fallback to run delta vacuum command:
    When AQE is on, the Gluten CH backend + Delta currently fails when running the
delta vacuum command, so fall back to running it (with Gluten disabled) first.
    
    Close #5944.
---
 .../spark/sql/delta/commands/VacuumCommand.scala   | 19 +++++++++++++++
 .../spark/sql/delta/commands/VacuumCommand.scala   | 19 +++++++++++++++
 .../GlutenClickHouseDeltaParquetWriteSuite.scala   | 12 ----------
 .../GlutenClickHouseMergeTreeOptimizeSuite.scala   | 27 ----------------------
 .../GlutenClickHouseTableAfterRestart.scala        |  4 ----
 5 files changed, 38 insertions(+), 43 deletions(-)

diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 3a390f64d..c5527933b 100644
--- 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.delta.commands
 
+import org.apache.gluten.utils.QueryPlanSelector
+
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
@@ -141,6 +143,13 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
       val relativizeIgnoreError =
         
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
 
+      // --- modified start
+      val originalEnabledGluten =
+        
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+      // gluten can not support vacuum command
+      
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      // --- modified end
+
       val validFiles = snapshot.stateDS
         .mapPartitions {
           actions =>
@@ -358,6 +367,16 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
         spark.createDataset(Seq(basePath)).toDF("path")
       } finally {
         allFilesAndDirs.unpersist()
+
+        // --- modified start
+        if (originalEnabledGluten != null) {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
+        } else {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+        }
+        // --- modified end
       }
     }
   }
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 5be548caf..9f82feeee 100644
--- 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -33,6 +33,7 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.gluten.utils.QueryPlanSelector
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -157,6 +158,14 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
       val relativizeIgnoreError =
         
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
       val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
+
+      // --- modified start
+      val originalEnabledGluten =
+        
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+      // gluten can not support vacuum command
+      
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      // --- modified end
+
       val validFiles = snapshot.stateDS
         .mapPartitions { actions =>
           val reservoirBase = new Path(basePath)
@@ -349,6 +358,16 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
         spark.createDataset(Seq(basePath)).toDF("path")
       } finally {
         allFilesAndDirs.unpersist()
+
+        // --- modified start
+        if (originalEnabledGluten != null) {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
+        } else {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+        }
+        // --- modified end
       }
     }
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index a097fc6cd..8fab604de 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.GlutenConfig
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.delta.actions.AddFile
@@ -1311,7 +1309,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val ret = spark.sql("select count(*) from 
lineitem_delta_parquet_optimize_p2").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new 
File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23)
     spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -1319,7 +1316,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
     } else {
       assert(countFiles(new 
File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7)
     }
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret2 = spark.sql("select count(*) from 
lineitem_delta_parquet_optimize_p2").collect()
     assert(ret2.apply(0).get(0) == 600572)
@@ -1343,7 +1339,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
       val ret = spark.sql("select count(*) from 
lineitem_delta_parquet_optimize_p4").collect()
       assert(ret.apply(0).get(0) == 600572)
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       assert(countFiles(new 
File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149)
       spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS")
       if (sparkVersion.equals("3.2")) {
@@ -1351,7 +1346,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
       } else {
         assert(countFiles(new 
File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25)
       }
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret2 = spark.sql("select count(*) from 
lineitem_delta_parquet_optimize_p4").collect()
       assert(ret2.apply(0).get(0) == 600572)
@@ -1377,9 +1371,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
       val clickhouseTable = DeltaTable.forPath(spark, dataPath)
       clickhouseTable.optimize().executeCompaction()
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       clickhouseTable.vacuum(0.0)
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
       if (sparkVersion.equals("3.2")) {
         assert(countFiles(new File(dataPath)) == 27)
       } else {
@@ -1397,9 +1389,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
       val clickhouseTable = DeltaTable.forPath(spark, dataPath)
       clickhouseTable.optimize().executeCompaction()
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       clickhouseTable.vacuum(0.0)
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
       if (sparkVersion.equals("3.2")) {
         assert(countFiles(new File(dataPath)) == 6)
       } else {
@@ -1414,9 +1404,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val clickhouseTable = DeltaTable.forPath(spark, dataPath)
     clickhouseTable.optimize().executeCompaction()
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     clickhouseTable.vacuum(0.0)
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
     if (sparkVersion.equals("3.2")) {
       assert(countFiles(new File(dataPath)) == 5)
     } else {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index c94a3bf50..650bbcc7b 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.GlutenConfig
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -124,7 +122,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 
22728)
     spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -134,8 +131,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) 
== 22730)
     }
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
-
     val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p").collect()
     assert(ret2.apply(0).get(0) == 600572)
   }
@@ -167,7 +162,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p2").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 372)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -182,7 +176,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     } else {
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 226)
     }
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p2").collect()
     assert(ret2.apply(0).get(0) == 600572)
@@ -206,7 +199,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p3").collect()
       assert(ret.apply(0).get(0) == 600572)
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 516)
       spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
       if (sparkVersion.equals("3.2")) {
@@ -220,7 +212,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       } else {
         assert(countFiles(new 
File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282)
       }
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p3").collect()
       assert(ret2.apply(0).get(0) == 600572)
@@ -245,7 +236,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p4").collect()
       assert(ret.apply(0).get(0) == 600572)
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 516)
       spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
       if (sparkVersion.equals("3.2")) {
@@ -259,7 +249,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       } else {
         assert(countFiles(new 
File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282)
       }
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p4").collect()
       assert(ret2.apply(0).get(0) == 600572)
@@ -283,7 +272,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
 
       spark.sql("optimize lineitem_mergetree_optimize_p5")
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
       spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
       if (sparkVersion.equals("3.2")) {
@@ -293,7 +281,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
         // this case will create a checkpoint
         assert(countFiles(new 
File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105)
       }
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p5").collect()
       assert(ret.apply(0).get(0) == 600572)
@@ -309,7 +296,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
 
       spark.sql("optimize lineitem_mergetree_optimize_p5")
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
       spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
       if (sparkVersion.equals("3.2")) {
@@ -318,7 +304,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
         // For Spark 3.3 + Delta 2.3, vacuum command will create two commit 
files in deltalog dir.
         assert(countFiles(new 
File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104)
       }
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p5").collect()
       assert(ret.apply(0).get(0) == 600572)
@@ -327,7 +312,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     // now merge all parts (testing merging from merged parts)
     spark.sql("optimize lineitem_mergetree_optimize_p5")
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
     spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -336,7 +320,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       // For Spark 3.3 + Delta 2.3, vacuum command will create two commit 
files in deltalog dir.
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) 
== 93)
     }
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p5").collect()
     assert(ret.apply(0).get(0) == 600572)
@@ -362,7 +345,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p6").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) 
== {
       if (sparkVersion.equals("3.2")) 499 else 528
     })
@@ -371,7 +353,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) 
== {
       if (sparkVersion.equals("3.2")) 315 else 327
     })
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p6").collect()
     assert(ret2.apply(0).get(0) == 600572)
@@ -394,9 +375,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
                    |""".stripMargin)
 
       spark.sql("optimize lineitem_mergetree_index")
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       spark.sql("vacuum lineitem_mergetree_index")
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val df = spark
         .sql(s"""
@@ -440,10 +419,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
       clickhouseTable.optimize().executeCompaction()
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       clickhouseTable.vacuum(0.0)
       clickhouseTable.vacuum(0.0)
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
       if (sparkVersion.equals("3.2")) {
         assert(countFiles(new File(dataPath)) == 99)
       } else {
@@ -465,10 +442,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
       clickhouseTable.optimize().executeCompaction()
 
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
       clickhouseTable.vacuum(0.0)
       clickhouseTable.vacuum(0.0)
-      spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
       if (sparkVersion.equals("3.2")) {
         assert(countFiles(new File(dataPath)) == 93)
       } else {
@@ -483,10 +458,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
     clickhouseTable.optimize().executeCompaction()
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     clickhouseTable.vacuum(0.0)
     clickhouseTable.vacuum(0.0)
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
     if (sparkVersion.equals("3.2")) {
       assert(countFiles(new File(dataPath)) == 77)
     } else {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index 9e55df0fa..baf79436c 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.GlutenConfig
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
@@ -250,9 +248,7 @@ class GlutenClickHouseTableAfterRestart
 
     restartSpark()
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     spark.sql("vacuum table_restart_vacuum")
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     assert(spark.sql("select count(*) from 
table_restart_vacuum").collect().apply(0).get(0) == 4)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to