This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a40b2433 [CORE] Only return columns of partitions that require read 
for iceberg (#5624)
0a40b2433 is described below

commit 0a40b2433f8314115fd3c713cbb044e684b57b6b
Author: Zouxxyy <[email protected]>
AuthorDate: Tue May 7 10:26:56 2024 +0800

    [CORE] Only return columns of partitions that require read for iceberg 
(#5624)
---
 .../gluten/execution/IcebergScanTransformer.scala  |  4 +--
 .../spark/source/GlutenIcebergSourceUtil.scala     | 30 +++++++++++++++++-----
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 9bb33678a..6e079bf7e 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -48,7 +48,7 @@ case class IcebergScanTransformer(
 
   override def filterExprs(): Seq[Expression] = 
pushdownFilters.getOrElse(Seq.empty)
 
-  override def getPartitionSchema: StructType = 
GlutenIcebergSourceUtil.getPartitionSchema(scan)
+  override def getPartitionSchema: StructType = 
GlutenIcebergSourceUtil.getReadPartitionSchema(scan)
 
   override def getDataSchema: StructType = new StructType()
 
@@ -63,7 +63,7 @@ case class IcebergScanTransformer(
       filteredPartitions,
       outputPartitioning)
     groupedPartitions.zipWithIndex.map {
-      case (p, index) => GlutenIcebergSourceUtil.genSplitInfo(p, index)
+      case (p, index) => GlutenIcebergSourceUtil.genSplitInfo(p, index, 
getPartitionSchema)
     }
   }
 
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 2b4f54aef..6b67e7636 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -34,7 +34,10 @@ import scala.collection.JavaConverters._
 
 object GlutenIcebergSourceUtil {
 
-  def genSplitInfo(inputPartition: InputPartition, index: Int): SplitInfo = 
inputPartition match {
+  def genSplitInfo(
+      inputPartition: InputPartition,
+      index: Int,
+      readPartitionSchema: StructType): SplitInfo = inputPartition match {
     case partition: SparkInputPartition =>
       val paths = new JArrayList[String]()
       val starts = new JArrayList[JLong]()
@@ -50,8 +53,8 @@ object GlutenIcebergSourceUtil {
           paths.add(filePath)
           starts.add(task.start())
           lengths.add(task.length())
-          partitionColumns.add(getPartitionColumns(task))
-          deleteFilesList.add(task.deletes());
+          partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
+          deleteFilesList.add(task.deletes())
           val currentFileFormat = convertFileFormat(task.file().format())
           if (fileFormat == ReadFileFormat.UnknownFormat) {
             fileFormat = currentFileFormat
@@ -94,7 +97,7 @@ object GlutenIcebergSourceUtil {
       throw new GlutenNotSupportException("Only support iceberg 
SparkBatchQueryScan.")
   }
 
-  def getPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
+  def getReadPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
     case scan: SparkBatchQueryScan =>
       val tasks = scan.tasks().asScala
       asFileScanTask(tasks.toList).foreach {
@@ -102,7 +105,16 @@ object GlutenIcebergSourceUtil {
           val spec = task.spec()
           if (spec.isPartitioned) {
             var partitionSchema = new StructType()
-            val partitionFields = spec.partitionType().fields().asScala
+            val readFields = scan.readSchema().fields.map(_.name).toSet
+            // Iceberg will generate some non-table fields as partition 
fields, such as x_bucket,
+            // which will not appear in readFields, they also cannot be 
filtered.
+            val tableFields = 
spec.schema().columns().asScala.map(_.name()).toSet
+            val partitionFields =
+              spec
+                .partitionType()
+                .fields()
+                .asScala
+                .filter(f => !tableFields.contains(f.name) || 
readFields.contains(f.name()))
             partitionFields.foreach {
               field =>
                 TypeUtil.validatePartitionColumnType(field.`type`().typeId())
@@ -130,12 +142,16 @@ object GlutenIcebergSourceUtil {
     }
   }
 
-  private def getPartitionColumns(task: FileScanTask): JHashMap[String, 
String] = {
+  private def getPartitionColumns(
+      task: FileScanTask,
+      readPartitionSchema: StructType): JHashMap[String, String] = {
     val partitionColumns = new JHashMap[String, String]()
+    val readPartitionFields = readPartitionSchema.fields.map(_.name).toSet
     val spec = task.spec()
     val partition = task.partition()
     if (spec.isPartitioned) {
-      val partitionFields = spec.partitionType().fields().asScala
+      val partitionFields =
+        spec.partitionType().fields().asScala.filter(f => 
readPartitionFields.contains(f.name()))
       partitionFields.zipWithIndex.foreach {
         case (field, index) =>
           val partitionValue = partition.get(index, 
field.`type`().typeId().javaClass())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to