This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 26ff58d3b [GLUTEN-5944][CH] Fallback to run delta vacuum command
(#5945)
26ff58d3b is described below
commit 26ff58d3b85485ca0a7b9d6bab2d82ae5aeff49b
Author: Zhichao Zhang <[email protected]>
AuthorDate: Sat Jun 1 14:47:55 2024 +0800
[GLUTEN-5944][CH] Fallback to run delta vacuum command (#5945)
Fallback to run delta vacuum command:
When AQE is on, running the delta vacuum command currently fails with the
Gluten CH backend + Delta, so fall back to vanilla Spark to run it.
Close #5944.
---
.../spark/sql/delta/commands/VacuumCommand.scala | 19 +++++++++++++++
.../spark/sql/delta/commands/VacuumCommand.scala | 19 +++++++++++++++
.../GlutenClickHouseDeltaParquetWriteSuite.scala | 12 ----------
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 27 ----------------------
.../GlutenClickHouseTableAfterRestart.scala | 4 ----
5 files changed, 38 insertions(+), 43 deletions(-)
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 3a390f64d..c5527933b 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.delta.commands
+import org.apache.gluten.utils.QueryPlanSelector
+
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
@@ -141,6 +143,13 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
+ // --- modified start
+ val originalEnabledGluten =
+
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+ // gluten can not support vacuum command
+
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+ // --- modified end
+
val validFiles = snapshot.stateDS
.mapPartitions {
actions =>
@@ -358,6 +367,16 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()
+
+ // --- modified start
+ if (originalEnabledGluten != null) {
+ spark.sparkContext.setLocalProperty(
+ QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
+ } else {
+ spark.sparkContext.setLocalProperty(
+ QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+ }
+ // --- modified end
}
}
}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 5be548caf..9f82feeee 100644
---
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -33,6 +33,7 @@ import
com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -157,6 +158,14 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
+
+ // --- modified start
+ val originalEnabledGluten =
+
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+ // gluten can not support vacuum command
+
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+ // --- modified end
+
val validFiles = snapshot.stateDS
.mapPartitions { actions =>
val reservoirBase = new Path(basePath)
@@ -349,6 +358,16 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()
+
+ // --- modified start
+ if (originalEnabledGluten != null) {
+ spark.sparkContext.setLocalProperty(
+ QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
originalEnabledGluten)
+ } else {
+ spark.sparkContext.setLocalProperty(
+ QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+ }
+ // --- modified end
}
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index a097fc6cd..8fab604de 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.GlutenConfig
-
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.actions.AddFile
@@ -1311,7 +1309,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from
lineitem_delta_parquet_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new
File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -1319,7 +1316,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
} else {
assert(countFiles(new
File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_delta_parquet_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1343,7 +1339,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from
lineitem_delta_parquet_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new
File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -1351,7 +1346,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
} else {
assert(countFiles(new
File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_delta_parquet_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1377,9 +1371,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 27)
} else {
@@ -1397,9 +1389,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 6)
} else {
@@ -1414,9 +1404,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 5)
} else {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index c94a3bf50..650bbcc7b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.GlutenConfig
-
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -124,7 +122,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) ==
22728)
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -134,8 +131,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p"))
== 22730)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
-
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
}
@@ -167,7 +162,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 372)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -182,7 +176,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 226)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -206,7 +199,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -220,7 +212,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new
File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -245,7 +236,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -259,7 +249,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new
File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -283,7 +272,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p5")
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -293,7 +281,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// this case will create a checkpoint
assert(countFiles(new
File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -309,7 +296,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("optimize lineitem_mergetree_optimize_p5")
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -318,7 +304,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit
files in deltalog dir.
assert(countFiles(new
File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -327,7 +312,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// now merge all parts (testing merging from merged parts)
spark.sql("optimize lineitem_mergetree_optimize_p5")
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -336,7 +320,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit
files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5"))
== 93)
}
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -362,7 +345,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from
lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6"))
== {
if (sparkVersion.equals("3.2")) 499 else 528
})
@@ -371,7 +353,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6"))
== {
if (sparkVersion.equals("3.2")) 315 else 327
})
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -394,9 +375,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)
spark.sql("optimize lineitem_mergetree_index")
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum lineitem_mergetree_index")
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val df = spark
.sql(s"""
@@ -440,10 +419,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 99)
} else {
@@ -465,10 +442,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 93)
} else {
@@ -483,10 +458,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 77)
} else {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index 9e55df0fa..baf79436c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.GlutenConfig
-
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
@@ -250,9 +248,7 @@ class GlutenClickHouseTableAfterRestart
restartSpark()
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum table_restart_vacuum")
- spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
assert(spark.sql("select count(*) from
table_restart_vacuum").collect().apply(0).get(0) == 4)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]