This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b5b454fc7 [GLUTEN-4917][VL] Enable iceberg/delta in new CI (#5230)
b5b454fc7 is described below
commit b5b454fc7e2f399b317405753055752679b8adc9
Author: Yan Ma <[email protected]>
AuthorDate: Wed Apr 3 20:43:25 2024 +0800
[GLUTEN-4917][VL] Enable iceberg/delta in new CI (#5230)
---
.github/workflows/velox_docker.yml | 12 ++++++------
.../apache/gluten/execution/WholeStageTransformer.scala | 2 +-
.../gluten/execution/WholeStageTransformerSuite.scala | 15 +++++++++++++++
.../org/apache/gluten/execution/VeloxDeltaSuite.scala | 1 +
.../org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala | 1 +
.../apache/gluten/execution/IcebergScanTransformer.scala | 4 ++--
.../org/apache/gluten/execution/VeloxIcebergSuite.scala | 15 ---------------
.../apache/gluten/execution/VeloxTPCHIcebergSuite.scala | 5 ++++-
8 files changed, 30 insertions(+), 25 deletions(-)
diff --git a/.github/workflows/velox_docker.yml
b/.github/workflows/velox_docker.yml
index ddee9b496..76dd956e8 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -364,8 +364,8 @@ jobs:
export SPARK_SCALA_VERSION=2.12
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
- mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/"
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
&& \
- mvn -ntp test -Pspark-3.2 -Pbackends-velox -DtagsToExclude=None
-DtagsToInclude=org.apache.gluten.tags.UDFTest
+ mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss
-Piceberg -Pdelta
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/"
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
&& \
+ mvn -ntp test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta
-DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
@@ -455,8 +455,8 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
- mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/"
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
&& \
- mvn -ntp test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None
-DtagsToInclude=org.apache.gluten.tags.UDFTest
+ mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg
-Pdelta -Pspark-ut
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/"
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
&& \
+ mvn -ntp test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta
-DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
@@ -542,8 +542,8 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
- mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss
-Pspark-ut
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/"
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
&& \
- mvn -ntp test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None
-DtagsToInclude=org.apache.gluten.tags.UDFTest
+ mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg
-Pdelta -Pspark-ut
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/"
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
&& \
+ mvn -ntp test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta
-DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 277750b1a..b809ac4bf 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -376,7 +376,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
// transpose =>
// scan1 | scan2
// p11 | p21 => substraitContext.setSplitInfo([p11, p21])
- // p12 | p22 => substraitContext.setSplitInfo([p11, p22])
+ // p12 | p22 => substraitContext.setSplitInfo([p12, p22])
// p13 | p23 ...
// p14 | p24
// ...
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
index 67c12a6f7..a45a8ad7d 100644
---
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
+++
b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
@@ -100,6 +100,21 @@ abstract class WholeStageTransformerSuite
.set("spark.gluten.ui.enabled", "false")
}
+ protected def isSparkVersionAtleast(version: String): Boolean = {
+ val currentVersion = spark.version
+ val currentVersionSplit = currentVersion.split("\\.")
+ val versionSplit = version.split("\\.")
+ currentVersionSplit.zip(versionSplit).foreach {
+ case (current, required) =>
+ if (current.toInt > required.toInt) {
+ return true
+ } else if (current.toInt < required.toInt) {
+ return false
+ }
+ }
+ true
+ }
+
protected def checkFallbackOperators(df: DataFrame, num: Int): Unit = {
// Decrease one VeloxColumnarToRowExec for the top level node
assert(
diff --git
a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
index fc483e771..64afd486d 100644
---
a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
+++
b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
@@ -82,6 +82,7 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
}
test("delta: time travel") {
+ assume(isSparkVersionAtleast("3.3"))
withTable("delta_tm") {
spark.sql(s"""
|create table delta_tm (id int, name string) using delta
diff --git
a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
index 60ea0fbcc..12b033333 100644
---
a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
+++
b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
@@ -36,6 +36,7 @@ class VeloxTPCHDeltaSuite extends VeloxTPCHSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
+ .set("spark.executor.memory", "4g")
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
}
diff --git
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 303a9d79b..9bb33678a 100644
---
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
DynamicPruningExpression, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
@@ -56,7 +56,7 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat =
GlutenIcebergSourceUtil.getFileFormat(scan)
- override def getSplitInfos: Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index ab1c0737e..46c77d5b8 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -43,21 +43,6 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
.set("spark.sql.catalog.spark_catalog.warehouse",
s"file://$rootPath/tpch-data-iceberg-velox")
}
- private def isSparkVersionAtleast(version: String): Boolean = {
- val currentVersion = spark.version
- val currentVersionSplit = currentVersion.split("\\.")
- val versionSplit = version.split("\\.")
- currentVersionSplit.zip(versionSplit).foreach {
- case (current, required) =>
- if (current.toInt > required.toInt) {
- return true
- } else if (current.toInt < required.toInt) {
- return false
- }
- }
- true
- }
-
test("iceberg transformer exists") {
withTable("iceberg_tb") {
spark.sql("""
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
index f4eead2a1..44badf159 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
@@ -38,6 +38,7 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
+ .set("spark.executor.memory", "4g")
.set(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
@@ -99,7 +100,9 @@ class VeloxPartitionedTableTPCHIcebergSuite extends
VeloxTPCHIcebergSuite {
val tablePath = new File(resourcePath, table.name).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
- tableDF.write
+ tableDF
+ .repartition(50)
+ .write
.format("iceberg")
.partitionBy(table.partitionColumns: _*)
.option(SparkWriteOptions.FANOUT_ENABLED, "true")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]