This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b5b454fc7 [GLUTEN-4917][VL] Enable iceberg/delta in new CI (#5230)
b5b454fc7 is described below

commit b5b454fc7e2f399b317405753055752679b8adc9
Author: Yan Ma <[email protected]>
AuthorDate: Wed Apr 3 20:43:25 2024 +0800

    [GLUTEN-4917][VL] Enable iceberg/delta in new CI (#5230)
---
 .github/workflows/velox_docker.yml                        | 12 ++++++------
 .../apache/gluten/execution/WholeStageTransformer.scala   |  2 +-
 .../gluten/execution/WholeStageTransformerSuite.scala     | 15 +++++++++++++++
 .../org/apache/gluten/execution/VeloxDeltaSuite.scala     |  1 +
 .../org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala |  1 +
 .../apache/gluten/execution/IcebergScanTransformer.scala  |  4 ++--
 .../org/apache/gluten/execution/VeloxIcebergSuite.scala   | 15 ---------------
 .../apache/gluten/execution/VeloxTPCHIcebergSuite.scala   |  5 ++++-
 8 files changed, 30 insertions(+), 25 deletions(-)

diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml
index ddee9b496..76dd956e8 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -364,8 +364,8 @@ jobs:
           export SPARK_SCALA_VERSION=2.12
           export MAVEN_HOME=/usr/lib/maven 
           export PATH=${PATH}:${MAVEN_HOME}/bin
-          mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
-          mvn -ntp test -Pspark-3.2 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+          mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+          mvn -ntp test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
       - name: Upload golden files
         if: failure()
         uses: actions/upload-artifact@v4
@@ -455,8 +455,8 @@ jobs:
           export SPARK_SCALA_VERSION=2.12 && \
           export MAVEN_HOME=/usr/lib/maven 
           export PATH=${PATH}:${MAVEN_HOME}/bin
-          mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
-          mvn -ntp test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+          mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+          mvn -ntp test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
       - name: Upload golden files
         if: failure()
         uses: actions/upload-artifact@v4
@@ -542,8 +542,8 @@ jobs:
           export SPARK_SCALA_VERSION=2.12 && \
           export MAVEN_HOME=/usr/lib/maven 
           export PATH=${PATH}:${MAVEN_HOME}/bin
-          mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss  -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
-          mvn -ntp test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+          mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+          mvn -ntp test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
       - name: Upload golden files
         if: failure()
         uses: actions/upload-artifact@v4
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 277750b1a..b809ac4bf 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -376,7 +376,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     // transpose =>
     // scan1 | scan2
     //  p11  |  p21    => substraitContext.setSplitInfo([p11, p21])
-    //  p12  |  p22    => substraitContext.setSplitInfo([p11, p22])
+    //  p12  |  p22    => substraitContext.setSplitInfo([p12, p22])
     //  p13  |  p23    ...
     //  p14  |  p24
     //      ...
diff --git a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
index 67c12a6f7..a45a8ad7d 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
@@ -100,6 +100,21 @@ abstract class WholeStageTransformerSuite
       .set("spark.gluten.ui.enabled", "false")
   }
 
+  protected def isSparkVersionAtleast(version: String): Boolean = {
+    val currentVersion = spark.version
+    val currentVersionSplit = currentVersion.split("\\.")
+    val versionSplit = version.split("\\.")
+    currentVersionSplit.zip(versionSplit).foreach {
+      case (current, required) =>
+        if (current.toInt > required.toInt) {
+          return true
+        } else if (current.toInt < required.toInt) {
+          return false
+        }
+    }
+    true
+  }
+
   protected def checkFallbackOperators(df: DataFrame, num: Int): Unit = {
     // Decrease one VeloxColumnarToRowExec for the top level node
     assert(
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
index fc483e771..64afd486d 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
@@ -82,6 +82,7 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
   }
 
   test("delta: time travel") {
+    assume(isSparkVersionAtleast("3.3"))
     withTable("delta_tm") {
       spark.sql(s"""
                    |create table delta_tm (id int, name string) using delta
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
index 60ea0fbcc..12b033333 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxTPCHDeltaSuite.scala
@@ -36,6 +36,7 @@ class VeloxTPCHDeltaSuite extends VeloxTPCHSuite {
 
   override protected def sparkConf: SparkConf = {
     super.sparkConf
+      .set("spark.executor.memory", "4g")
       .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
   }
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 303a9d79b..9bb33678a 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
 
@@ -56,7 +56,7 @@ case class IcebergScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)
 
-  override def getSplitInfos: Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
     val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
       scan,
       keyGroupedPartitioning,
diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index ab1c0737e..46c77d5b8 100644
--- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -43,21 +43,6 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
       .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox")
   }
 
-  private def isSparkVersionAtleast(version: String): Boolean = {
-    val currentVersion = spark.version
-    val currentVersionSplit = currentVersion.split("\\.")
-    val versionSplit = version.split("\\.")
-    currentVersionSplit.zip(versionSplit).foreach {
-      case (current, required) =>
-        if (current.toInt > required.toInt) {
-          return true
-        } else if (current.toInt < required.toInt) {
-          return false
-        }
-    }
-    true
-  }
-
   test("iceberg transformer exists") {
     withTable("iceberg_tb") {
       spark.sql("""
diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
index f4eead2a1..44badf159 100644
--- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
@@ -38,6 +38,7 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
 
   override protected def sparkConf: SparkConf = {
     super.sparkConf
+      .set("spark.executor.memory", "4g")
       .set(
         "spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
@@ -99,7 +100,9 @@ class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
         val tablePath = new File(resourcePath, table.name).getAbsolutePath
         val tableDF = spark.read.format(fileFormat).load(tablePath)
 
-        tableDF.write
+        tableDF
+          .repartition(50)
+          .write
           .format("iceberg")
           .partitionBy(table.partitionColumns: _*)
           .option(SparkWriteOptions.FANOUT_ENABLED, "true")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to