This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 0f6d7643a [KYUUBI #6554] Delete redundant code related to zorder
0f6d7643a is described below
commit 0f6d7643aea0a82ebb2a7179b9422edb2c3d13d8
Author: huangxiaoping <[email protected]>
AuthorDate: Tue Jul 23 12:14:55 2024 +0800
[KYUUBI #6554] Delete redundant code related to zorder
# :mag: Description
## Issue References
This pull request fixes #6554
## Describe Your Solution
- Delete the file
`/kyuubi/extensions/spark/kyuubi-extension-spark-3-x/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala`
- Rename `InsertZorderBeforeWriting33.scala` to `InsertZorderBeforeWriting.scala`
- Rename `InsertZorderHelper33`, `InsertZorderBeforeWritingDatasource33`,
`InsertZorderBeforeWritingHive33`, and `ZorderSuiteSpark33` to
`InsertZorderHelper`, `InsertZorderBeforeWritingDatasource`,
`InsertZorderBeforeWritingHive`, and `ZorderSuiteSpark` (see the wiring
sketch after this list)
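For context, here is a minimal sketch of how the renamed rules are wired into
Spark, mirroring the `KyuubiSparkSQLCommonExtension` hunks in the diff below.
`ZorderOnlyExtension` is a hypothetical class name used only for illustration,
and the sketch assumes the kyuubi-extension-spark jar (which provides the rule
classes) is on the classpath:

```scala
import org.apache.spark.sql.SparkSessionExtensions

import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive}

// Hypothetical extension that registers only the zorder rules; the real
// KyuubiSparkSQLCommonExtension registers these along with several other rules.
class ZorderOnlyExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The "33" suffix is gone: the same unsuffixed rule names are now used
    // across the spark-3-3, spark-3-4, and spark-3-5 modules.
    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
  }
}
```

Like the real extension class, it would be enabled through `spark.sql.extensions`.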
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan :test_tube:
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6555 from huangxiaopingRD/6554.
Closes #6554
26de4fa09 [huangxiaoping] [KYUUBI #6554] Delete redundant code related to
zorder
Authored-by: huangxiaoping <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala | 6 +-
...ing33.scala => InsertZorderBeforeWriting.scala} | 14 +-
.../sql/zorder/InsertZorderBeforeWritingBase.scala | 188 ---------------------
.../scala/org/apache/spark/sql/ZorderSuite.scala | 6 +-
.../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala | 6 +-
.../sql/zorder/InsertZorderBeforeWriting.scala | 14 +-
.../sql/zorder/InsertZorderBeforeWritingBase.scala | 155 -----------------
.../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala | 6 +-
.../sql/zorder/InsertZorderBeforeWriting.scala | 14 +-
.../sql/zorder/InsertZorderBeforeWritingBase.scala | 155 -----------------
10 files changed, 39 insertions(+), 525 deletions(-)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index c001ffc6c..3dda669a8 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33,
InsertZorderBeforeWritingHive33, ResolveZorder}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource,
InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or
Repartition/Rebalance)
-
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
similarity index 96%
rename from
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
rename to
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4ae2057dc..4b2494bc8 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -28,7 +28,11 @@ import
org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, Inse
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
-trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+trait ZorderBuilder {
+ def buildZorder(children: Seq[Expression]): ZorderBase
+}
+
+trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
@@ -140,8 +144,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with
ZorderBuilder {
}
}
-case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
- extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+ extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved &&
@@ -172,8 +176,8 @@ case class InsertZorderBeforeWritingDatasource33(session:
SparkSession)
}
}
-case class InsertZorderBeforeWritingHive33(session: SparkSession)
- extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+ extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved &&
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
deleted file mode 100644
index a99b6cca7..000000000
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import java.util.Locale
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression,
NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project,
Repartition, RepartitionByExpression, Sort}
-import org.apache.spark.sql.catalyst.rules.Rule
-import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
-import
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand,
InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has
zorder properties
- */
-abstract class InsertZorderBeforeWritingDatasourceBase
- extends InsertZorderHelper {
- override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
- case insert: InsertIntoHadoopFsRelationCommand
- if insert.query.resolved && insert.bucketSpec.isEmpty &&
insert.catalogTable.isDefined &&
- isZorderEnabled(insert.catalogTable.get.properties) =>
- val newQuery = insertZorder(insert.catalogTable.get, insert.query)
- if (newQuery.eq(insert.query)) {
- insert
- } else {
- insert.copy(query = newQuery)
- }
-
- case ctas: CreateDataSourceTableAsSelectCommand
- if ctas.query.resolved && ctas.table.bucketSpec.isEmpty &&
- isZorderEnabled(ctas.table.properties) =>
- val newQuery = insertZorder(ctas.table, ctas.query)
- if (newQuery.eq(ctas.query)) {
- ctas
- } else {
- ctas.copy(query = newQuery)
- }
-
- case _ => plan
- }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder
properties
- */
-abstract class InsertZorderBeforeWritingHiveBase
- extends InsertZorderHelper {
- override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
- case insert: InsertIntoHiveTable
- if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
- isZorderEnabled(insert.table.properties) =>
- val newQuery = insertZorder(insert.table, insert.query)
- if (newQuery.eq(insert.query)) {
- insert
- } else {
- insert.copy(query = newQuery)
- }
-
- case ctas: CreateHiveTableAsSelectCommand
- if ctas.query.resolved && ctas.tableDesc.bucketSpec.isEmpty &&
- isZorderEnabled(ctas.tableDesc.properties) =>
- val newQuery = insertZorder(ctas.tableDesc, ctas.query)
- if (newQuery.eq(ctas.query)) {
- ctas
- } else {
- ctas.copy(query = newQuery)
- }
-
- case octas: OptimizedCreateHiveTableAsSelectCommand
- if octas.query.resolved && octas.tableDesc.bucketSpec.isEmpty &&
- isZorderEnabled(octas.tableDesc.properties) =>
- val newQuery = insertZorder(octas.tableDesc, octas.query)
- if (newQuery.eq(octas.query)) {
- octas
- } else {
- octas.copy(query = newQuery)
- }
-
- case _ => plan
- }
-}
-
-trait ZorderBuilder {
- def buildZorder(children: Seq[Expression]): ZorderBase
-}
-
-trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
- private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
- private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
-
- def isZorderEnabled(props: Map[String, String]): Boolean = {
- props.contains(KYUUBI_ZORDER_ENABLED) &&
- "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
- props.contains(KYUUBI_ZORDER_COLS)
- }
-
- def getZorderColumns(props: Map[String, String]): Seq[String] = {
- val cols = props.get(KYUUBI_ZORDER_COLS)
- assert(cols.isDefined)
- cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
- }
-
- def canInsertZorder(query: LogicalPlan): Boolean = query match {
- case Project(_, child) => canInsertZorder(child)
- // TODO: actually, we can force zorder even if existed some shuffle
- case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
- case _ => true
- }
-
- def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan
= {
- if (!canInsertZorder(plan)) {
- return plan
- }
- val cols = getZorderColumns(catalogTable.properties)
- val attrs = plan.output.map(attr => (attr.name, attr)).toMap
- if (cols.exists(!attrs.contains(_))) {
- logWarning(s"target table does not contain all zorder cols:
${cols.mkString(",")}, " +
- s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
- plan
- } else {
- val bound = cols.map(attrs(_))
- val orderExpr =
- if (bound.length == 1) {
- bound.head
- } else {
- buildZorder(bound)
- }
- // TODO: We can do rebalance partitions before local sort of zorder
after SPARK 3.3
- // see https://github.com/apache/spark/pull/34542
- Sort(
- SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
- conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
- plan)
- }
- }
-
- def applyInternal(plan: LogicalPlan): LogicalPlan
-
- final override def apply(plan: LogicalPlan): LogicalPlan = {
- if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
- applyInternal(plan)
- } else {
- plan
- }
- }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has
zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
- extends InsertZorderBeforeWritingDatasourceBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase =
Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder
properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
- extends InsertZorderBeforeWritingHiveBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase =
Zorder(children)
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
index a08366f1d..d18e30359 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.sql.{KyuubiSQLConf, SparkKyuubiSparkSQLParser}
import org.apache.kyuubi.sql.zorder.Zorder
-trait ZorderSuiteSpark33 extends ZorderSuiteBase {
+trait ZorderSuiteSpark extends ZorderSuiteBase {
test("Add rebalance before zorder") {
Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering,
zorder) =>
@@ -115,10 +115,10 @@ trait ParserSuite { self: ZorderSuiteBase =>
class ZorderWithCodegenEnabledSuite
extends ZorderWithCodegenEnabledSuiteBase
- with ZorderSuiteSpark33
+ with ZorderSuiteSpark
with ParserSuite {}
class ZorderWithCodegenDisabledSuite
extends ZorderWithCodegenDisabledSuiteBase
- with ZorderSuiteSpark33
+ with ZorderSuiteSpark
with ParserSuite {}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index f39ad3cc3..c4ddcef2b 100644
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33,
InsertZorderBeforeWritingHive33, ResolveZorder}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource,
InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or
Repartition/Rebalance)
-
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 73ed5e253..003ba6b68 100644
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -27,7 +27,11 @@ import
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
-trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+trait ZorderBuilder {
+ def buildZorder(children: Seq[Expression]): ZorderBase
+}
+
+trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
@@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with
ZorderBuilder {
}
}
-case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
- extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+ extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved &&
@@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session:
SparkSession)
}
}
-case class InsertZorderBeforeWritingHive33(session: SparkSession)
- extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+ extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved &&
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
deleted file mode 100644
index 2c59d148e..000000000
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import java.util.Locale
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression,
NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has
zorder properties
- */
-abstract class InsertZorderBeforeWritingDatasourceBase
- extends InsertZorderHelper {
- override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
- case insert: InsertIntoHadoopFsRelationCommand
- if insert.query.resolved && insert.bucketSpec.isEmpty &&
insert.catalogTable.isDefined &&
- isZorderEnabled(insert.catalogTable.get.properties) =>
- val newQuery = insertZorder(insert.catalogTable.get, insert.query)
- if (newQuery.eq(insert.query)) {
- insert
- } else {
- insert.copy(query = newQuery)
- }
- case _ => plan
- }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder
properties
- */
-abstract class InsertZorderBeforeWritingHiveBase
- extends InsertZorderHelper {
- override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
- case insert: InsertIntoHiveTable
- if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
- isZorderEnabled(insert.table.properties) =>
- val newQuery = insertZorder(insert.table, insert.query)
- if (newQuery.eq(insert.query)) {
- insert
- } else {
- insert.copy(query = newQuery)
- }
- case _ => plan
- }
-}
-
-trait ZorderBuilder {
- def buildZorder(children: Seq[Expression]): ZorderBase
-}
-
-trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
- private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
- private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
-
- def isZorderEnabled(props: Map[String, String]): Boolean = {
- props.contains(KYUUBI_ZORDER_ENABLED) &&
- "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
- props.contains(KYUUBI_ZORDER_COLS)
- }
-
- def getZorderColumns(props: Map[String, String]): Seq[String] = {
- val cols = props.get(KYUUBI_ZORDER_COLS)
- assert(cols.isDefined)
- cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
- }
-
- def canInsertZorder(query: LogicalPlan): Boolean = query match {
- case Project(_, child) => canInsertZorder(child)
- // TODO: actually, we can force zorder even if existed some shuffle
- case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
- case _ => true
- }
-
- def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan
= {
- if (!canInsertZorder(plan)) {
- return plan
- }
- val cols = getZorderColumns(catalogTable.properties)
- val attrs = plan.output.map(attr => (attr.name, attr)).toMap
- if (cols.exists(!attrs.contains(_))) {
- logWarning(s"target table does not contain all zorder cols:
${cols.mkString(",")}, " +
- s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
- plan
- } else {
- val bound = cols.map(attrs(_))
- val orderExpr =
- if (bound.length == 1) {
- bound.head
- } else {
- buildZorder(bound)
- }
- // TODO: We can do rebalance partitions before local sort of zorder
after SPARK 3.3
- // see https://github.com/apache/spark/pull/34542
- Sort(
- SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
- conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
- plan)
- }
- }
-
- def applyInternal(plan: LogicalPlan): LogicalPlan
-
- final override def apply(plan: LogicalPlan): LogicalPlan = {
- if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
- applyInternal(plan)
- } else {
- plan
- }
- }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has
zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
- extends InsertZorderBeforeWritingDatasourceBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase =
Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder
properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
- extends InsertZorderBeforeWritingHiveBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase =
Zorder(children)
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index ad95ac429..450a2c35e 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33,
InsertZorderBeforeWritingHive33, ResolveZorder}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource,
InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or
Repartition/Rebalance)
-
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 73ed5e253..003ba6b68 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -27,7 +27,11 @@ import
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
-trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+trait ZorderBuilder {
+ def buildZorder(children: Seq[Expression]): ZorderBase
+}
+
+trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
@@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with
ZorderBuilder {
}
}
-case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
- extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+ extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved &&
@@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session:
SparkSession)
}
}
-case class InsertZorderBeforeWritingHive33(session: SparkSession)
- extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+ extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved &&
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
deleted file mode 100644
index 2c59d148e..000000000
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import java.util.Locale
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression,
NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has
zorder properties
- */
-abstract class InsertZorderBeforeWritingDatasourceBase
- extends InsertZorderHelper {
- override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
- case insert: InsertIntoHadoopFsRelationCommand
- if insert.query.resolved && insert.bucketSpec.isEmpty &&
insert.catalogTable.isDefined &&
- isZorderEnabled(insert.catalogTable.get.properties) =>
- val newQuery = insertZorder(insert.catalogTable.get, insert.query)
- if (newQuery.eq(insert.query)) {
- insert
- } else {
- insert.copy(query = newQuery)
- }
- case _ => plan
- }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder
properties
- */
-abstract class InsertZorderBeforeWritingHiveBase
- extends InsertZorderHelper {
- override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
- case insert: InsertIntoHiveTable
- if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
- isZorderEnabled(insert.table.properties) =>
- val newQuery = insertZorder(insert.table, insert.query)
- if (newQuery.eq(insert.query)) {
- insert
- } else {
- insert.copy(query = newQuery)
- }
- case _ => plan
- }
-}
-
-trait ZorderBuilder {
- def buildZorder(children: Seq[Expression]): ZorderBase
-}
-
-trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
- private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
- private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
-
- def isZorderEnabled(props: Map[String, String]): Boolean = {
- props.contains(KYUUBI_ZORDER_ENABLED) &&
- "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
- props.contains(KYUUBI_ZORDER_COLS)
- }
-
- def getZorderColumns(props: Map[String, String]): Seq[String] = {
- val cols = props.get(KYUUBI_ZORDER_COLS)
- assert(cols.isDefined)
- cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
- }
-
- def canInsertZorder(query: LogicalPlan): Boolean = query match {
- case Project(_, child) => canInsertZorder(child)
- // TODO: actually, we can force zorder even if existed some shuffle
- case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
- case _ => true
- }
-
- def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan
= {
- if (!canInsertZorder(plan)) {
- return plan
- }
- val cols = getZorderColumns(catalogTable.properties)
- val attrs = plan.output.map(attr => (attr.name, attr)).toMap
- if (cols.exists(!attrs.contains(_))) {
- logWarning(s"target table does not contain all zorder cols:
${cols.mkString(",")}, " +
- s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
- plan
- } else {
- val bound = cols.map(attrs(_))
- val orderExpr =
- if (bound.length == 1) {
- bound.head
- } else {
- buildZorder(bound)
- }
- // TODO: We can do rebalance partitions before local sort of zorder
after SPARK 3.3
- // see https://github.com/apache/spark/pull/34542
- Sort(
- SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
- conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
- plan)
- }
- }
-
- def applyInternal(plan: LogicalPlan): LogicalPlan
-
- final override def apply(plan: LogicalPlan): LogicalPlan = {
- if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
- applyInternal(plan)
- } else {
- plan
- }
- }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has
zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
- extends InsertZorderBeforeWritingDatasourceBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase =
Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder
properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
- extends InsertZorderBeforeWritingHiveBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase =
Zorder(children)
-}