This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 22d9709c2921 [SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal
22d9709c2921 is described below
commit 22d9709c2921f358bd5c70036d886d9bb0ebb4f5
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Oct 8 14:14:11 2025 +0200
[SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal
### What changes were proposed in this pull request?
This PR fixes a bug in `plannedWrite` that occurs when the `query` outputs a literal for a partition column, e.g.
```
CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k);
INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i;
```
The evaluation of `FileFormatWriter.orderingMatched` fails because the `SortOrder(Literal)` is eliminated by `EliminateSorts`. The [idea](https://github.com/apache/spark/pull/52474#discussion_r2387635927) is to expose and keep "constant order" expressions from `child.outputOrdering`.
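As a rough sketch of that idea (illustrative only, using the `Constant` direction and the `satisfies` change introduced by this patch; the attribute below is hypothetical):
```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Constant, SortOrder}
import org.apache.spark.sql.types.StringType

// Hypothetical partition column, standing in for `k` from the repro above.
val k = AttributeReference("k", StringType)()

// A "constant order" on k satisfies a required order on k in any direction,
// so exposing it in outputOrdering does not constrain the real sort keys.
SortOrder(k, Constant).satisfies(k.asc)  // true
SortOrder(k, Constant).satisfies(k.desc) // true
```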
### Why are the changes needed?
`V1Writes` overrides the custom sort order when the query's output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contained literals in partition columns, this check produced a false-negative result, so the custom sort order did not take effect. SPARK-53707 fixed the issue accidentally (and partially) by adding a `Project` over the query in `V1Writes`.
Before SPARK-53707:
```
Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false
+- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282]
+- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet
```
After SPARK-53707:
```
Project [i#284, j#285, 0 AS k#290]
+- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false
+- Project [i#284, j#285]
+- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet
```
This PR fixes the issue thoroughly, with a new UT added.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT is added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52474 from pan3793/SPARK-53738.
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Peter Toth <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../apache/spark/sql/catalyst/dsl/package.scala | 1 +
.../spark/sql/catalyst/expressions/SortOrder.scala | 49 +++++++++++++++++-----
.../sql/catalyst/expressions/aggregate/Mode.scala | 4 +-
.../expressions/aggregate/percentiles.scala | 4 +-
.../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +-
.../plans/AliasAwareOutputExpression.scala | 6 ++-
.../plans/logical/basicLogicalOperators.scala | 3 +-
.../sql/catalyst/expressions/OrderingSuite.scala | 3 +-
.../org/apache/spark/sql/execution/SortExec.scala | 9 ++--
.../spark/sql/execution/SortPrefixUtils.scala | 8 ++++
.../sql/execution/columnar/InMemoryRelation.scala | 19 ++++++---
.../execution/datasources/DataSourceStrategy.scala | 3 ++
.../spark/sql/execution/datasources/V1Writes.scala | 20 +++++++--
.../window/WindowEvaluatorFactoryBase.scala | 4 +-
.../datasources/V1WriteCommandSuite.scala | 46 +++++++++++++++++++-
.../command/V1WriteHiveCommandSuite.scala | 32 ++++++++++++++
16 files changed, 179 insertions(+), 34 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 1d7cf5455e57..e9e897cfb78e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -151,6 +151,7 @@ package object dsl extends SQLConfHelper {
def asc: SortOrder = SortOrder(expr, Ascending)
def asc_nullsLast: SortOrder = SortOrder(expr, Ascending, NullsLast, Seq.empty)
+ def const: SortOrder = SortOrder(expr, Constant)
def desc: SortOrder = SortOrder(expr, Descending)
def desc_nullsFirst: SortOrder = SortOrder(expr, Descending, NullsFirst, Seq.empty)
def as(alias: String): NamedExpression = Alias(expr, alias)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 166866c90b87..4632e064daa5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -45,6 +45,11 @@ case object Descending extends SortDirection {
override def defaultNullOrdering: NullOrdering = NullsLast
}
+case object Constant extends SortDirection {
+ override def sql: String = "CONST"
+ override def defaultNullOrdering: NullOrdering = NullsFirst
+}
+
case object NullsFirst extends NullOrdering {
override def sql: String = "NULLS FIRST"
}
@@ -69,8 +74,13 @@ case class SortOrder(
override def children: Seq[Expression] = child +: sameOrderExpressions
- override def checkInputDataTypes(): TypeCheckResult =
- TypeUtils.checkForOrderingExpr(dataType, prettyName)
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (direction == Constant) {
+ TypeCheckResult.TypeCheckSuccess
+ } else {
+ TypeUtils.checkForOrderingExpr(dataType, prettyName)
+ }
+ }
override def dataType: DataType = child.dataType
override def nullable: Boolean = child.nullable
@@ -81,8 +91,8 @@ case class SortOrder(
def isAscending: Boolean = direction == Ascending
def satisfies(required: SortOrder): Boolean = {
- children.exists(required.child.semanticEquals) &&
- direction == required.direction && nullOrdering == required.nullOrdering
+ children.exists(required.child.semanticEquals) && (direction == Constant ||
+ direction == required.direction && nullOrdering == required.nullOrdering)
}
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): SortOrder =
@@ -101,21 +111,38 @@ object SortOrder {
* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
*
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
- * or of A's prefix. Here are examples of ordering A satisfying ordering B:
+ * or of A's prefix, except for SortOrder in B that satisfies any constant SortOrder in A.
+ *
+ * Here are examples of ordering A satisfying ordering B:
* <ul>
* <li>ordering A is [x, y] and ordering B is [x]</li>
+ * <li>ordering A is [z(const), x, y] and ordering B is [x, z]</li>
* <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
* <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
* </ul>
*/
- def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
- if (ordering2.isEmpty) {
- true
- } else if (ordering2.length > ordering1.length) {
+ def orderingSatisfies(
+ providedOrdering: Seq[SortOrder], requiredOrdering: Seq[SortOrder]): Boolean = {
+ if (requiredOrdering.isEmpty) {
+ return true
+ }
+
+ val (constantProvidedOrdering, nonConstantProvidedOrdering) = providedOrdering.partition {
+ case SortOrder(_, Constant, _, _) => true
+ case SortOrder(child, _, _, _) => child.foldable
+ }
+
+ val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder =>
+ constantProvidedOrdering.exists { providedOrder =>
+ providedOrder.satisfies(requiredOrder)
+ }
+ }
+
+ if (effectiveRequiredOrdering.length > nonConstantProvidedOrdering.length) {
false
} else {
- ordering2.zip(ordering1).forall {
- case (o2, o1) => o1.satisfies(o2)
+ effectiveRequiredOrdering.zip(nonConstantProvidedOrdering).forall {
+ case (required, provided) => provided.satisfies(required)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index 7a4f04bf04f7..b61dae585061 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Constant, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLExpr
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.types.PhysicalDataType
@@ -199,6 +199,8 @@ case class Mode(
this.copy(child = child, reverseOpt = Some(true))
case SortOrder(child, Descending, _, _) =>
this.copy(child = child, reverseOpt = Some(false))
+ case SortOrder(child, Constant, _, _) =>
+ this.copy(child = child)
}
case _ => this
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 6dfa1b499df2..942c06f60d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -382,7 +382,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
- case SortOrder(child, Ascending, _, _) => this.copy(left = child)
+ case SortOrder(child, Ascending | Constant, _, _) => this.copy(left = child)
case SortOrder(child, Descending, _, _) => this.copy(left = child, reverse = true)
}
}
@@ -440,7 +440,7 @@ case class PercentileDisc(
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
- case SortOrder(expr, Ascending, _, _) => this.copy(child = expr)
+ case SortOrder(expr, Ascending | Constant, _, _) => this.copy(child = expr)
case SortOrder(expr, Descending, _, _) => this.copy(child = expr, reverse = true)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index fc65c24afcb8..b5941cd5eddc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1919,7 +1919,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
object EliminateSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(_.containsPattern(SORT)) {
case s @ Sort(orders, _, child, _) if orders.isEmpty || orders.exists(_.child.foldable) =>
- val newOrders = orders.filterNot(_.child.foldable)
+ val newOrders = orders.filterNot(o => o.direction != Constant && o.child.foldable)
if (newOrders.isEmpty) {
child
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index e1a9e8b5ea81..efbd7b0c8b81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans
import scala.collection.mutable
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Constant, Empty2Null, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf
/**
@@ -128,6 +128,8 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
}
}
}
- newOrdering.takeWhile(_.isDefined).flatten.toSeq
+ newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.collect {
+ case a @ Alias(child, _) if child.foldable => SortOrder(a.toAttribute, Constant)
+ }
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ad6939422b97..5a7548f13f1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -912,7 +912,8 @@ case class Sort(
override def maxRowsPerPartition: Option[Long] = {
if (global) maxRows else child.maxRowsPerPartition
}
- override def outputOrdering: Seq[SortOrder] = order
+ override def outputOrdering: Seq[SortOrder] =
+ order ++ child.outputOrdering.filter(_.direction == Constant)
final override val nodePatterns: Seq[TreePattern] = Seq(SORT)
override protected def withNewChildInternal(newChild: LogicalPlan): Sort = copy(child = newChild)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
index 06c8b5ccef65..5facdaeb1aca 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
@@ -43,9 +43,10 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
val sortOrder = direction match {
case Ascending => BoundReference(0, dataType, nullable = true).asc
case Descending => BoundReference(0, dataType, nullable = true).desc
+ case Constant => BoundReference(0, dataType, nullable = true).const
}
val expectedCompareResult = direction match {
- case Ascending => signum(expected)
+ case Ascending | Constant => signum(expected)
case Descending => -1 * signum(expected)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 11fde41aae9e..708e054664c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -46,7 +46,8 @@ case class SortExec(
override def output: Seq[Attribute] = child.output
- override def outputOrdering: Seq[SortOrder] = sortOrder
+ override def outputOrdering: Seq[SortOrder] =
+ sortOrder ++ child.outputOrdering.filter(_.direction == Constant)
// sort performed is local within a given partition so will retain
// child operator's partitioning
@@ -73,15 +74,17 @@ case class SortExec(
* should make it public.
*/
def createSorter(): UnsafeExternalRowSorter = {
+ val effectiveSortOrder = sortOrder.filterNot(_.direction == Constant)
+
rowSorter = new ThreadLocal[UnsafeExternalRowSorter]()
val ordering = RowOrdering.create(sortOrder, output)
// The comparator for comparing prefix
- val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
+ val boundSortExpression = BindReferences.bindReference(effectiveSortOrder.head, output)
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
- val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
+ val canUseRadixSort = enableRadixSort && effectiveSortOrder.length == 1 &&
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
// The generator for prefix
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 7332bbcb1845..a50f2ba0d04f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -63,6 +63,8 @@ object SortPrefixUtils {
PrefixComparators.STRING_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.STRING_DESC
+ case Constant =>
+ NoOpPrefixComparator
}
}
@@ -76,6 +78,8 @@ object SortPrefixUtils {
PrefixComparators.BINARY_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.BINARY_DESC
+ case Constant =>
+ NoOpPrefixComparator
}
}
@@ -89,6 +93,8 @@ object SortPrefixUtils {
PrefixComparators.LONG_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.LONG_DESC
+ case Constant =>
+ NoOpPrefixComparator
}
}
@@ -102,6 +108,8 @@ object SortPrefixUtils {
PrefixComparators.DOUBLE_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.DOUBLE_DESC
+ case Constant =>
+ NoOpPrefixComparator
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index eabbc7fc74f5..bf7491625fa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -439,9 +439,7 @@ case class InMemoryRelation(
override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
override def doCanonicalize(): logical.LogicalPlan =
- copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
- cacheBuilder,
- outputOrdering)
+ withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))
@transient val partitionStatistics = new PartitionStatistics(output)
@@ -469,8 +467,13 @@ case class InMemoryRelation(
}
}
- def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
- InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
+ def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+ val map = AttributeMap(output.zip(newOutput))
+ val newOutputOrdering = outputOrdering
+ .map(_.transform { case a: Attribute => map(a) })
+ .asInstanceOf[Seq[SortOrder]]
+ InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
+ }
override def newInstance(): this.type = {
InMemoryRelation(
@@ -487,6 +490,12 @@ case class InMemoryRelation(
cloned
}
+ override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
+ val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
+ copied.statsOfPlanToCache = this.statsOfPlanToCache
+ copied
+ }
+
override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}],
${cacheBuilder.storageLevel}"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2e47f08ac115..95bba45c3b1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.PREDICATES
import org.apache.spark.rdd.RDD
@@ -827,6 +828,8 @@ object DataSourceStrategy
val directionV2 = directionV1 match {
case Ascending => SortDirection.ASCENDING
case Descending => SortDirection.DESCENDING
+ case Constant =>
+ throw SparkException.internalError(s"Unexpected catalyst sort direction $Constant")
}
val nullOrderingV2 = nullOrderingV1 match {
case NullsFirst => NullOrdering.NULLS_FIRST
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 280fe1068d81..53e2d3f74bb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Constant, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
@@ -199,13 +199,27 @@ object V1WritesUtils {
expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
}
+ // SortOrder sequence A (outputOrdering) satisfies SortOrder sequence B (requiredOrdering)
+ // if and only if B is an equivalent of A or of A's prefix, except for SortOrder in B that
+ // satisfies any constant SortOrder in A.
def isOrderingMatched(
requiredOrdering: Seq[Expression],
outputOrdering: Seq[SortOrder]): Boolean = {
- if (requiredOrdering.length > outputOrdering.length) {
+ val (constantOutputOrdering, nonConstantOutputOrdering) = outputOrdering.partition {
+ case SortOrder(_, Constant, _, _) => true
+ case SortOrder(child, _, _, _) => child.foldable
+ }
+
+ val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder =>
+ constantOutputOrdering.exists { outputOrder =>
+ outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
+ }
+ }
+
+ if (effectiveRequiredOrdering.length > nonConstantOutputOrdering.length) {
false
} else {
- requiredOrdering.zip(outputOrdering).forall {
+ effectiveRequiredOrdering.zip(nonConstantOutputOrdering).forall {
case (requiredOrder, outputOrder) =>
outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index c2dedda832e2..3ce11acfc4d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Add, AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow, DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimestampAddInterval, TimestampA [...]
+import org.apache.spark.sql.catalyst.expressions.{Add, AggregateWindowFunction, Ascending, Attribute, BoundReference, Constant, CurrentRow, DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimestampAddInterval, [...]
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
@@ -95,7 +95,7 @@ trait WindowEvaluatorFactoryBase {
// Flip the sign of the offset when processing the order is descending
val boundOffset = sortExpr.direction match {
case Descending => UnaryMinus(offset)
- case Ascending => offset
+ case Ascending | Constant => offset
}
// Create the projection which returns the current 'value' modified by adding the offset.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 80d771428d90..c16b02906724 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -63,10 +63,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
hasLogicalSort: Boolean,
orderingMatched: Boolean,
hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
- var optimizedPlan: LogicalPlan = null
+ executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
+ }
+
+ /**
+ * Execute a write query and check ordering of the plan, then do custom validation
+ */
+ protected def executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort: Boolean,
+ orderingMatched: Boolean,
+ hasEmpty2Null: Boolean = false)(query: => Unit)(
+ customValidate: LogicalPlan => Unit): Unit = {
+ @volatile var optimizedPlan: LogicalPlan = null
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+ val conf = qe.sparkSession.sessionState.conf
qe.optimizedPlan match {
case w: V1WriteCommand =>
if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -87,7 +100,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
// Check whether the output ordering is matched before FileFormatWriter executes rdd.
assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
- s"Expect: $orderingMatched, Actual:
${FileFormatWriter.outputOrderingMatched}")
+ s"Expect orderingMatched: $orderingMatched, " +
+ s"Actual: ${FileFormatWriter.outputOrderingMatched}")
sparkContext.listenerBus.waitUntilEmpty()
@@ -103,6 +117,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
assert(empty2nullExpr == hasEmpty2Null,
s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr.
Plan:\n$optimizedPlan")
+ customValidate(optimizedPlan)
+
spark.listenerManager.unregister(listener)
}
}
@@ -391,4 +407,30 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
}
}
}
+
+ test("v1 write with sort by literal column preserve custom order") {
+ withPlannedWrite { _ =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
+ |PARTITIONED BY (k)
+ |""".stripMargin)
+ executeAndCheckOrderingAndCustomValidate(hasLogicalSort = true, orderingMatched = true) {
+ sql(
+ """
+ |INSERT OVERWRITE t
+ |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+ |""".stripMargin)
+ } { optimizedPlan =>
+ assert {
+ optimizedPlan.outputOrdering.exists {
+ case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+ case _ => false
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index e0e056be5987..55bb2c60dcca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
import org.apache.spark.sql.hive.HiveUtils._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -126,4 +127,35 @@ class V1WriteHiveCommandSuite
}
}
}
+
+ test("v1 write to hive table with sort by literal column preserve custom
order") {
+ withCovnertMetastore { _ =>
+ withPlannedWrite { _ =>
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
+ |PARTITIONED BY (k)
+ |""".stripMargin)
+ executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort = true, orderingMatched = true) {
+ sql(
+ """
+ |INSERT OVERWRITE t
+ |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+ |""".stripMargin)
+ } { optimizedPlan =>
+ assert {
+ optimizedPlan.outputOrdering.exists {
+ case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+ case _ => false
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]