This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6f053f5a8d5c [SPARK-50990][SQL] Refactor `UpCast` resolution out of the `Analyzer`
6f053f5a8d5c is described below
commit 6f053f5a8d5c6c740747ca848919352c1f05dca0
Author: Vladimir Golubev <[email protected]>
AuthorDate: Sun Jan 26 09:58:00 2025 +0300
[SPARK-50990][SQL] Refactor `UpCast` resolution out of the `Analyzer`
### What changes were proposed in this pull request?
Refactor `UpCast` resolution out of the `Analyzer`.
### Why are the changes needed?
To reuse this code in single-pass `Resolver`.
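For illustration, the extracted object can be called directly from expression-resolution code, with no rule machinery involved. A minimal sketch, assuming a hypothetical single-pass resolver hook (`SinglePassResolverSketch` and `resolveExpression` are assumptions; only `UpCastResolution.resolve` is the API introduced by this patch):

```scala
import org.apache.spark.sql.catalyst.analysis.UpCastResolution
import org.apache.spark.sql.catalyst.expressions.{Expression, UpCast}

// Hypothetical single-pass resolver shell; name and shape are assumptions,
// not the actual single-pass Resolver.
object SinglePassResolverSketch {
  def resolveExpression(expression: Expression): Expression = expression match {
    // In a single bottom-up pass the children are already resolved here,
    // so the UpCast can be replaced in place instead of by a fixed-point rule.
    case upCast: UpCast => UpCastResolution.resolve(upCast)
    case other => other
  }
}
```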
### Does this PR introduce _any_ user-facing change?
No, just a refactoring.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Copilot.nvim.
Closes #49669 from vladimirg-db/vladimirg-db/refactor-upcast-resolution-out.
Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit d0b1b0b22e8d9c05c7eac6b11746ddfcdb6c5a83)
Signed-off-by: Max Gekk <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 37 ++----------
.../sql/catalyst/analysis/UpCastResolution.scala | 69 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 33 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 686048cc9b5e..5477857cd468 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3727,45 +3727,16 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    * Replace the [[UpCast]] expression by [[Cast]], and throw exceptions if the cast may truncate.
    */
   object ResolveUpCast extends Rule[LogicalPlan] {
-    private def fail(from: Expression, to: DataType, walkedTypePath: Seq[String]) = {
-      val fromStr = from match {
-        case l: LambdaVariable => "array element"
-        case e => e.sql
-      }
-      throw QueryCompilationErrors.upCastFailureError(fromStr, from, to, walkedTypePath)
-    }
-
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsPattern(UP_CAST), ruleId) {
       case p if !p.childrenResolved => p
       case p if p.resolved => p

       case p => p.transformExpressionsWithPruning(_.containsPattern(UP_CAST), ruleId) {
-        case u @ UpCast(child, _, _) if !child.resolved => u
-
-        case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-          throw SparkException.internalError(
-            s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target")
-
-        case UpCast(child, target, walkedTypePath) if target == DecimalType
-          && child.dataType.isInstanceOf[DecimalType] =>
-          assert(walkedTypePath.nonEmpty,
-            "object DecimalType should only be used inside ExpressionEncoder")
-
-          // SPARK-31750: if we want to upcast to the general decimal type, and the `child` is
-          // already decimal type, we can remove the `Upcast` and accept any precision/scale.
-          // This can happen for cases like `spark.read.parquet("/tmp/file").as[BigDecimal]`.
-          child
-
-        case UpCast(child, target: AtomicType, _)
-          if conf.getConf(SQLConf.LEGACY_LOOSE_UPCAST) &&
-            child.dataType == StringType =>
-          Cast(child, target.asNullable)
-
-        case u @ UpCast(child, _, walkedTypePath) if !Cast.canUpCast(child.dataType, u.dataType) =>
-          fail(child, u.dataType, walkedTypePath)
-
-        case u @ UpCast(child, _, _) => Cast(child, u.dataType)
+        case unresolvedUpCast @ UpCast(child, _, _) if !child.resolved =>
+          unresolvedUpCast
+        case unresolvedUpCast: UpCast =>
+          UpCastResolution.resolve(unresolvedUpCast)
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpCastResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpCastResolution.scala
new file mode 100644
index 000000000000..df51a330a050
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpCastResolution.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, UpCast}
+import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, DataType, DecimalType, StringType}
+
+object UpCastResolution extends SQLConfHelper {
+  def resolve(unresolvedUpCast: UpCast): Expression = unresolvedUpCast match {
+    case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
+      throw SparkException.internalError(
+        s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target"
+      )
+
+    case UpCast(child, target, walkedTypePath)
+        if target == DecimalType
+        && child.dataType.isInstanceOf[DecimalType] =>
+      assert(
+        walkedTypePath.nonEmpty,
+        "object DecimalType should only be used inside ExpressionEncoder"
+      )
+
+      // SPARK-31750: if we want to upcast to the general decimal type, and the `child` is
+      // already decimal type, we can remove the `Upcast` and accept any precision/scale.
+      // This can happen for cases like `spark.read.parquet("/tmp/file").as[BigDecimal]`.
+      child
+
+    case UpCast(child, target: AtomicType, _)
+        if conf.getConf(SQLConf.LEGACY_LOOSE_UPCAST) &&
+        child.dataType == StringType =>
+      Cast(child, target.asNullable)
+
+    case unresolvedUpCast @ UpCast(child, _, walkedTypePath)
+        if !Cast.canUpCast(child.dataType, unresolvedUpCast.dataType) =>
+      fail(child, unresolvedUpCast.dataType, walkedTypePath)
+
+    case unresolvedUpCast @ UpCast(child, _, _) =>
+      Cast(child, unresolvedUpCast.dataType)
+  }
+
+  private def fail(from: Expression, to: DataType, walkedTypePath: Seq[String]) = {
+    val fromStr = from match {
+      case l: LambdaVariable => "array element"
+      case e => e.sql
+    }
+
+    throw QueryCompilationErrors.upCastFailureError(fromStr, from, to, walkedTypePath)
+  }
+}
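
As a usage illustration of the SPARK-31750 branch above: resolving an `UpCast` to the general `DecimalType` object simply returns a child that already carries a concrete decimal type. A minimal sketch; the literal and `walkedTypePath` values are illustrative:

```scala
import org.apache.spark.sql.catalyst.analysis.UpCastResolution
import org.apache.spark.sql.catalyst.expressions.{Literal, UpCast}
import org.apache.spark.sql.types.DecimalType

// The child already has a concrete DecimalType(3, 2), so the UpCast to the
// general DecimalType object is dropped and any precision/scale is accepted.
val child = Literal(BigDecimal("3.14"))
val resolved = UpCastResolution.resolve(
  UpCast(child, DecimalType, walkedTypePath = Seq("root")))
assert(resolved eq child)
```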
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]