This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 08044ec3ce3 [SPARK-40508][SQL] Treat unknown partitioning as UnknownPartitioning
08044ec3ce3 is described below
commit 08044ec3ce35f263ae5356816aa20601456590c7
Author: Ted Yu <[email protected]>
AuthorDate: Wed Sep 21 09:41:22 2022 -0700
[SPARK-40508][SQL] Treat unknown partitioning as UnknownPartitioning
### What changes were proposed in this pull request?
When running a Spark application against Spark 3.3, I see the following:
```
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning type: CustomPartitioning
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
```
The `CustomPartitioning` works fine with Spark 3.2.1.
This PR proposes to relax the check and treat any unrecognized partitioning implementation the same way as `UnknownPartitioning`.
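For illustration only, a connector-side partitioning like the one named in the stack trace might look roughly like the sketch below. The class is hypothetical (it is not the reporter's code) and assumes the Spark 3.3 DSv2 `Partitioning` interface:
```scala
// Hypothetical sketch, not the reporter's actual class: a connector-defined
// partitioning that implements the DSv2 Partitioning interface but is neither
// KeyGroupedPartitioning nor UnknownPartitioning, so V2ScanPartitioning
// rejected it with IllegalArgumentException before this patch.
import org.apache.spark.sql.connector.read.partitioning.Partitioning

class CustomPartitioning(partitions: Int) extends Partitioning {
  override def numPartitions(): Int = partitions
}
```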
### Why are the changes needed?
The 3.3.0 release doesn't seem to warrant such a behavioral change from the 3.2.1 release.
### Does this PR introduce _any_ user-facing change?
This would allow users' custom partitioning implementations to continue working with 3.3.x releases.
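For example (a rough sketch, reusing the hypothetical `CustomPartitioning` above), a scan that reports such a partitioning via `SupportsReportPartitioning` now plans with a warning and unknown-partitioning semantics instead of failing:
```scala
// Hypothetical scan, for illustration only. With this patch the custom
// partitioning it reports is ignored with a warning (treated as unknown)
// rather than causing an IllegalArgumentException during planning.
import org.apache.spark.sql.connector.read.{Scan, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.Partitioning
import org.apache.spark.sql.types.StructType

class CustomScan(schema: StructType) extends Scan with SupportsReportPartitioning {
  override def readSchema(): StructType = schema
  override def outputPartitioning(): Partitioning = new CustomPartitioning(8)
}
```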
### How was this patch tested?
Existing test suite.
Closes #37952 from tedyu/v2-part-unk.
Authored-by: Ted Yu <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../execution/datasources/v2/V2ScanPartitioningAndOrdering.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
index 5c8c7cf420d..b03dda11168 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.v2
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,7 +30,7 @@ import org.apache.spark.util.collection.Utils.sequenceToOption
* and ordering reported by data sources to their catalyst counterparts. Then, annotates the plan
* with the partitioning and ordering result.
*/
-object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelper {
+object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelper with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
val scanRules = Seq[LogicalPlan => LogicalPlan] (partitioning, ordering)
@@ -54,8 +55,10 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelpe
}
}
case _: UnknownPartitioning => None
-      case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
-        "type: " + p.getClass.getSimpleName)
+      case p =>
+        logWarning(s"Spark ignores the partitioning ${p.getClass.getSimpleName}." +
+          " Please use KeyGroupedPartitioning for better performance")
+        None
}
d.copy(keyGroupedPartitioning = catalystPartitioning)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]