This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b3edec3ac [GLUTEN-5341] Support iceberg bucketjoin for Spark3.5 (#5378)
b3edec3ac is described below
commit b3edec3ace29cce66577c28d45a281065dba9a86
Author: Yan Ma <[email protected]>
AuthorDate: Thu Apr 25 08:33:02 2024 +0800
[GLUTEN-5341] Support iceberg bucketjoin for Spark3.5 (#5378)
This PR provides bucket join support for Spark 3.5 to fix the corresponding UT
failures.
---
.../gluten/execution/VeloxIcebergSuite.scala | 11 ++---
.../gluten/sql/shims/spark35/Spark35Shims.scala | 51 ++++++++++++++++++----
.../datasources/v2/BatchScanExecShim.scala | 13 +++++-
3 files changed, 59 insertions(+), 16 deletions(-)
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 9ffcc80cf..77a1c790b 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -58,8 +58,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
}
- // Ignored due to failures, see
https://github.com/apache/incubator-gluten/issues/5362
- ignore("iceberg bucketed join", Some("3.4")) {
+ testWithSpecifiedSparkVersion("iceberg bucketed join", Some("3.4")) {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
@@ -139,8 +138,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
}
- // Ignored due to failures, see
https://github.com/apache/incubator-gluten/issues/5362
- ignore("iceberg bucketed join with partition", Some("3.4")) {
+ testWithSpecifiedSparkVersion("iceberg bucketed join with partition",
Some("3.4")) {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
@@ -220,8 +218,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
}
- // Ignored due to failures, see
https://github.com/apache/incubator-gluten/issues/5362
- ignore("iceberg bucketed join with partition filter", Some("3.4")) {
+ testWithSpecifiedSparkVersion("iceberg bucketed join with partition filter",
Some("3.4")) {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
@@ -302,7 +299,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
}
- test("iceberg: time travel") {
+ testWithSpecifiedSparkVersion("iceberg: time travel") {
withTable("iceberg_tm") {
spark.sql(s"""
|create table iceberg_tm (id int, name string) using iceberg
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 1be269b6c..f78d4ca6b 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -31,12 +31,13 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate,
RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY,
TypedImperativeAggregate}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
+import org.apache.spark.sql.catalyst.util.{InternalRowComparableWrapper,
TimestampFormatter}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -382,12 +383,46 @@ class Spark35Shims extends SparkShims {
None
}
}
-
- override def getKeyGroupedPartitioning(batchScan: BatchScanExec):
Option[Seq[Expression]] = null
-
- override def getCommonPartitionValues(batchScan: BatchScanExec):
Option[Seq[(InternalRow, Int)]] =
- null
-
+ override def getKeyGroupedPartitioning(batchScan: BatchScanExec):
Option[Seq[Expression]] = {
+ batchScan.keyGroupedPartitioning
+ }
+
+ override def getCommonPartitionValues(
+ batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = {
+ batchScan.spjParams.commonPartitionValues
+ }
+
+ override def orderPartitions(
+ scan: Scan,
+ keyGroupedPartitioning: Option[Seq[Expression]],
+ filteredPartitions: Seq[Seq[InputPartition]],
+ outputPartitioning: Partitioning): Seq[InputPartition] = {
+ scan match {
+ case _ if keyGroupedPartitioning.isDefined =>
+ var newPartitions = filteredPartitions
+ outputPartitioning match {
+ case p: KeyGroupedPartitioning =>
+ val partitionMapping = newPartitions
+ .map(
+ s =>
+ InternalRowComparableWrapper(
+ s.head.asInstanceOf[HasPartitionKey],
+ p.expressions) -> s)
+ .toMap
+ newPartitions = p.partitionValues.map {
+ partValue =>
+ // Use empty partition for those partition values that are not
present
+ partitionMapping.getOrElse(
+ InternalRowComparableWrapper(partValue, p.expressions),
+ Seq.empty)
+ }
+ case _ =>
+ }
+ newPartitions.flatten
+ case _ =>
+ filteredPartitions.flatten
+ }
+ }
override def supportsRowBased(plan: SparkPlan): Boolean =
plan.supportsRowBased
override def withTryEvalMode(expr: Expression): Boolean = {
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index ec12fd33a..bb3806097 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -41,7 +41,18 @@ abstract class BatchScanExecShim(
val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
val applyPartialClustering: Boolean = false,
val replicatePartitions: Boolean = false)
- extends AbstractBatchScanExec(output, scan, runtimeFilters, table = table) {
+ extends AbstractBatchScanExec(
+ output,
+ scan,
+ runtimeFilters,
+ ordering,
+ table,
+ StoragePartitionJoinParams(
+ keyGroupedPartitioning,
+ commonPartitionValues,
+ applyPartialClustering,
+ replicatePartitions)
+ ) {
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]