This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b3edec3ac [GLUTEN-5341] Support iceberg bucketjoin for Spark3.5 (#5378)
b3edec3ac is described below

commit b3edec3ace29cce66577c28d45a281065dba9a86
Author: Yan Ma <[email protected]>
AuthorDate: Thu Apr 25 08:33:02 2024 +0800

    [GLUTEN-5341] Support iceberg bucketjoin for Spark3.5 (#5378)
    
    This PR provides bucket join support for Spark 3.5 to fix the corresponding
    UT failures.
---
 .../gluten/execution/VeloxIcebergSuite.scala       | 11 ++---
 .../gluten/sql/shims/spark35/Spark35Shims.scala    | 51 ++++++++++++++++++----
 .../datasources/v2/BatchScanExecShim.scala         | 13 +++++-
 3 files changed, 59 insertions(+), 16 deletions(-)

diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 9ffcc80cf..77a1c790b 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -58,8 +58,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     }
   }
 
-  // Ignored due to failures, see 
https://github.com/apache/incubator-gluten/issues/5362
-  ignore("iceberg bucketed join", Some("3.4")) {
+  testWithSpecifiedSparkVersion("iceberg bucketed join", Some("3.4")) {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
@@ -139,8 +138,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     }
   }
 
-  // Ignored due to failures, see 
https://github.com/apache/incubator-gluten/issues/5362
-  ignore("iceberg bucketed join with partition", Some("3.4")) {
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition", 
Some("3.4")) {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
@@ -220,8 +218,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     }
   }
 
-  // Ignored due to failures, see 
https://github.com/apache/incubator-gluten/issues/5362
-  ignore("iceberg bucketed join with partition filter", Some("3.4")) {
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition filter", 
Some("3.4")) {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
@@ -302,7 +299,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     }
   }
 
-  test("iceberg: time travel") {
+  testWithSpecifiedSparkVersion("iceberg: time travel") {
     withTable("iceberg_tm") {
       spark.sql(s"""
                    |create table iceberg_tm (id int, name string) using iceberg
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 1be269b6c..f78d4ca6b 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -31,12 +31,13 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, 
RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY, 
TypedImperativeAggregate}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
+import org.apache.spark.sql.catalyst.util.{InternalRowComparableWrapper, 
TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -382,12 +383,46 @@ class Spark35Shims extends SparkShims {
         None
     }
   }
-
-  override def getKeyGroupedPartitioning(batchScan: BatchScanExec): 
Option[Seq[Expression]] = null
-
-  override def getCommonPartitionValues(batchScan: BatchScanExec): 
Option[Seq[(InternalRow, Int)]] =
-    null
-
+  override def getKeyGroupedPartitioning(batchScan: BatchScanExec): 
Option[Seq[Expression]] = {
+    batchScan.keyGroupedPartitioning
+  }
+
+  override def getCommonPartitionValues(
+      batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = {
+    batchScan.spjParams.commonPartitionValues
+  }
+
+  override def orderPartitions(
+      scan: Scan,
+      keyGroupedPartitioning: Option[Seq[Expression]],
+      filteredPartitions: Seq[Seq[InputPartition]],
+      outputPartitioning: Partitioning): Seq[InputPartition] = {
+    scan match {
+      case _ if keyGroupedPartitioning.isDefined =>
+        var newPartitions = filteredPartitions
+        outputPartitioning match {
+          case p: KeyGroupedPartitioning =>
+            val partitionMapping = newPartitions
+              .map(
+                s =>
+                  InternalRowComparableWrapper(
+                    s.head.asInstanceOf[HasPartitionKey],
+                    p.expressions) -> s)
+              .toMap
+            newPartitions = p.partitionValues.map {
+              partValue =>
+                // Use empty partition for those partition values that are not 
present
+                partitionMapping.getOrElse(
+                  InternalRowComparableWrapper(partValue, p.expressions),
+                  Seq.empty)
+            }
+          case _ =>
+        }
+        newPartitions.flatten
+      case _ =>
+        filteredPartitions.flatten
+    }
+  }
   override def supportsRowBased(plan: SparkPlan): Boolean = 
plan.supportsRowBased
 
   override def withTryEvalMode(expr: Expression): Boolean = {
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index ec12fd33a..bb3806097 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -41,7 +41,18 @@ abstract class BatchScanExecShim(
     val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
     val applyPartialClustering: Boolean = false,
     val replicatePartitions: Boolean = false)
-  extends AbstractBatchScanExec(output, scan, runtimeFilters, table = table) {
+  extends AbstractBatchScanExec(
+    output,
+    scan,
+    runtimeFilters,
+    ordering,
+    table,
+    StoragePartitionJoinParams(
+      keyGroupedPartitioning,
+      commonPartitionValues,
+      applyPartialClustering,
+      replicatePartitions)
+  ) {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to