This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f6d7643a [KYUUBI #6554] Delete redundant code related to zorder
0f6d7643a is described below

commit 0f6d7643aea0a82ebb2a7179b9422edb2c3d13d8
Author: huangxiaoping <[email protected]>
AuthorDate: Tue Jul 23 12:14:55 2024 +0800

    [KYUUBI #6554] Delete redundant code related to zorder
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6554
    
    ## Describe Your Solution 🔧
    
    - Delete 
`/kyuubi/extensions/spark/kyuubi-extension-spark-3-x/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala`
 file
    - Rename `InsertZorderBeforeWriting33.scala` to 
`InsertZorderBeforeWriting.scala`
    - Rename `InsertZorderHelper33,  InsertZorderBeforeWritingDatasource33,  
InsertZorderBeforeWritingHive33, ZorderSuiteSpark33` to `InsertZorderHelper,  
InsertZorderBeforeWritingDatasource,  InsertZorderBeforeWritingHive, 
ZorderSuiteSpark`
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6555 from huangxiaopingRD/6554.
    
    Closes #6554
    
    26de4fa09 [huangxiaoping] [KYUUBI #6554] Delete redundant code related to 
zorder
    
    Authored-by: huangxiaoping <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala |   6 +-
 ...ing33.scala => InsertZorderBeforeWriting.scala} |  14 +-
 .../sql/zorder/InsertZorderBeforeWritingBase.scala | 188 ---------------------
 .../scala/org/apache/spark/sql/ZorderSuite.scala   |   6 +-
 .../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala |   6 +-
 .../sql/zorder/InsertZorderBeforeWriting.scala     |  14 +-
 .../sql/zorder/InsertZorderBeforeWritingBase.scala | 155 -----------------
 .../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala |   6 +-
 .../sql/zorder/InsertZorderBeforeWriting.scala     |  14 +-
 .../sql/zorder/InsertZorderBeforeWritingBase.scala | 155 -----------------
 10 files changed, 39 insertions(+), 525 deletions(-)

diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index c001ffc6c..3dda669a8 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSessionExtensions
 
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, 
InsertZorderBeforeWritingHive33, ResolveZorder}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, 
InsertZorderBeforeWritingHive, ResolveZorder}
 
 class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
     // should be applied before
     // RepartitionBeforeWriting and RebalanceBeforeWriting
     // because we can only apply one of them (i.e. Global Sort or 
Repartition/Rebalance)
-    
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
     extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
similarity index 96%
rename from 
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
rename to 
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4ae2057dc..4b2494bc8 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -28,7 +28,11 @@ import 
org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, Inse
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 
-trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+trait ZorderBuilder {
+  def buildZorder(children: Seq[Expression]): ZorderBase
+}
+
+trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
   private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
   private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
 
@@ -140,8 +144,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with 
ZorderBuilder {
   }
 }
 
-case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
-  extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+  extends InsertZorderHelper {
   override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
     case insert: InsertIntoHadoopFsRelationCommand
         if insert.query.resolved &&
@@ -172,8 +176,8 @@ case class InsertZorderBeforeWritingDatasource33(session: 
SparkSession)
   }
 }
 
-case class InsertZorderBeforeWritingHive33(session: SparkSession)
-  extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+  extends InsertZorderHelper {
   override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
     case insert: InsertIntoHiveTable
         if insert.query.resolved &&
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
deleted file mode 100644
index a99b6cca7..000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import java.util.Locale
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, 
NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Repartition, RepartitionByExpression, Sort}
-import org.apache.spark.sql.catalyst.rules.Rule
-import 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
-import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, 
InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has 
zorder properties
- */
-abstract class InsertZorderBeforeWritingDatasourceBase
-  extends InsertZorderHelper {
-  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
-    case insert: InsertIntoHadoopFsRelationCommand
-        if insert.query.resolved && insert.bucketSpec.isEmpty && 
insert.catalogTable.isDefined &&
-          isZorderEnabled(insert.catalogTable.get.properties) =>
-      val newQuery = insertZorder(insert.catalogTable.get, insert.query)
-      if (newQuery.eq(insert.query)) {
-        insert
-      } else {
-        insert.copy(query = newQuery)
-      }
-
-    case ctas: CreateDataSourceTableAsSelectCommand
-        if ctas.query.resolved && ctas.table.bucketSpec.isEmpty &&
-          isZorderEnabled(ctas.table.properties) =>
-      val newQuery = insertZorder(ctas.table, ctas.query)
-      if (newQuery.eq(ctas.query)) {
-        ctas
-      } else {
-        ctas.copy(query = newQuery)
-      }
-
-    case _ => plan
-  }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder 
properties
- */
-abstract class InsertZorderBeforeWritingHiveBase
-  extends InsertZorderHelper {
-  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
-    case insert: InsertIntoHiveTable
-        if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
-          isZorderEnabled(insert.table.properties) =>
-      val newQuery = insertZorder(insert.table, insert.query)
-      if (newQuery.eq(insert.query)) {
-        insert
-      } else {
-        insert.copy(query = newQuery)
-      }
-
-    case ctas: CreateHiveTableAsSelectCommand
-        if ctas.query.resolved && ctas.tableDesc.bucketSpec.isEmpty &&
-          isZorderEnabled(ctas.tableDesc.properties) =>
-      val newQuery = insertZorder(ctas.tableDesc, ctas.query)
-      if (newQuery.eq(ctas.query)) {
-        ctas
-      } else {
-        ctas.copy(query = newQuery)
-      }
-
-    case octas: OptimizedCreateHiveTableAsSelectCommand
-        if octas.query.resolved && octas.tableDesc.bucketSpec.isEmpty &&
-          isZorderEnabled(octas.tableDesc.properties) =>
-      val newQuery = insertZorder(octas.tableDesc, octas.query)
-      if (newQuery.eq(octas.query)) {
-        octas
-      } else {
-        octas.copy(query = newQuery)
-      }
-
-    case _ => plan
-  }
-}
-
-trait ZorderBuilder {
-  def buildZorder(children: Seq[Expression]): ZorderBase
-}
-
-trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
-  private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
-  private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
-
-  def isZorderEnabled(props: Map[String, String]): Boolean = {
-    props.contains(KYUUBI_ZORDER_ENABLED) &&
-    "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
-    props.contains(KYUUBI_ZORDER_COLS)
-  }
-
-  def getZorderColumns(props: Map[String, String]): Seq[String] = {
-    val cols = props.get(KYUUBI_ZORDER_COLS)
-    assert(cols.isDefined)
-    cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
-  }
-
-  def canInsertZorder(query: LogicalPlan): Boolean = query match {
-    case Project(_, child) => canInsertZorder(child)
-    // TODO: actually, we can force zorder even if existed some shuffle
-    case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
-    case _ => true
-  }
-
-  def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan 
= {
-    if (!canInsertZorder(plan)) {
-      return plan
-    }
-    val cols = getZorderColumns(catalogTable.properties)
-    val attrs = plan.output.map(attr => (attr.name, attr)).toMap
-    if (cols.exists(!attrs.contains(_))) {
-      logWarning(s"target table does not contain all zorder cols: 
${cols.mkString(",")}, " +
-        s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
-      plan
-    } else {
-      val bound = cols.map(attrs(_))
-      val orderExpr =
-        if (bound.length == 1) {
-          bound.head
-        } else {
-          buildZorder(bound)
-        }
-      // TODO: We can do rebalance partitions before local sort of zorder 
after SPARK 3.3
-      //   see https://github.com/apache/spark/pull/34542
-      Sort(
-        SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
-        conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
-        plan)
-    }
-  }
-
-  def applyInternal(plan: LogicalPlan): LogicalPlan
-
-  final override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
-      applyInternal(plan)
-    } else {
-      plan
-    }
-  }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has 
zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
-  extends InsertZorderBeforeWritingDatasourceBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = 
Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder 
properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
-  extends InsertZorderBeforeWritingHiveBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = 
Zorder(children)
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
index a08366f1d..d18e30359 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.kyuubi.sql.{KyuubiSQLConf, SparkKyuubiSparkSQLParser}
 import org.apache.kyuubi.sql.zorder.Zorder
 
-trait ZorderSuiteSpark33 extends ZorderSuiteBase {
+trait ZorderSuiteSpark extends ZorderSuiteBase {
 
   test("Add rebalance before zorder") {
     Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering, 
zorder) =>
@@ -115,10 +115,10 @@ trait ParserSuite { self: ZorderSuiteBase =>
 
 class ZorderWithCodegenEnabledSuite
   extends ZorderWithCodegenEnabledSuiteBase
-  with ZorderSuiteSpark33
+  with ZorderSuiteSpark
   with ParserSuite {}
 
 class ZorderWithCodegenDisabledSuite
   extends ZorderWithCodegenDisabledSuiteBase
-  with ZorderSuiteSpark33
+  with ZorderSuiteSpark
   with ParserSuite {}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index f39ad3cc3..c4ddcef2b 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSessionExtensions
 
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, 
InsertZorderBeforeWritingHive33, ResolveZorder}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, 
InsertZorderBeforeWritingHive, ResolveZorder}
 
 class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
     // should be applied before
     // RepartitionBeforeWriting and RebalanceBeforeWriting
     // because we can only apply one of them (i.e. Global Sort or 
Repartition/Rebalance)
-    
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
     extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 73ed5e253..003ba6b68 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -27,7 +27,11 @@ import 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 
-trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+trait ZorderBuilder {
+  def buildZorder(children: Seq[Expression]): ZorderBase
+}
+
+trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
   private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
   private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
 
@@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with 
ZorderBuilder {
   }
 }
 
-case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
-  extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+  extends InsertZorderHelper {
   override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
     case insert: InsertIntoHadoopFsRelationCommand
         if insert.query.resolved &&
@@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session: 
SparkSession)
   }
 }
 
-case class InsertZorderBeforeWritingHive33(session: SparkSession)
-  extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+  extends InsertZorderHelper {
   override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
     case insert: InsertIntoHiveTable
         if insert.query.resolved &&
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
deleted file mode 100644
index 2c59d148e..000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import java.util.Locale
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, 
NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has 
zorder properties
- */
-abstract class InsertZorderBeforeWritingDatasourceBase
-  extends InsertZorderHelper {
-  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
-    case insert: InsertIntoHadoopFsRelationCommand
-        if insert.query.resolved && insert.bucketSpec.isEmpty && 
insert.catalogTable.isDefined &&
-          isZorderEnabled(insert.catalogTable.get.properties) =>
-      val newQuery = insertZorder(insert.catalogTable.get, insert.query)
-      if (newQuery.eq(insert.query)) {
-        insert
-      } else {
-        insert.copy(query = newQuery)
-      }
-    case _ => plan
-  }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder 
properties
- */
-abstract class InsertZorderBeforeWritingHiveBase
-  extends InsertZorderHelper {
-  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
-    case insert: InsertIntoHiveTable
-        if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
-          isZorderEnabled(insert.table.properties) =>
-      val newQuery = insertZorder(insert.table, insert.query)
-      if (newQuery.eq(insert.query)) {
-        insert
-      } else {
-        insert.copy(query = newQuery)
-      }
-    case _ => plan
-  }
-}
-
-trait ZorderBuilder {
-  def buildZorder(children: Seq[Expression]): ZorderBase
-}
-
-trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
-  private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
-  private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
-
-  def isZorderEnabled(props: Map[String, String]): Boolean = {
-    props.contains(KYUUBI_ZORDER_ENABLED) &&
-    "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
-    props.contains(KYUUBI_ZORDER_COLS)
-  }
-
-  def getZorderColumns(props: Map[String, String]): Seq[String] = {
-    val cols = props.get(KYUUBI_ZORDER_COLS)
-    assert(cols.isDefined)
-    cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
-  }
-
-  def canInsertZorder(query: LogicalPlan): Boolean = query match {
-    case Project(_, child) => canInsertZorder(child)
-    // TODO: actually, we can force zorder even if existed some shuffle
-    case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
-    case _ => true
-  }
-
-  def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan 
= {
-    if (!canInsertZorder(plan)) {
-      return plan
-    }
-    val cols = getZorderColumns(catalogTable.properties)
-    val attrs = plan.output.map(attr => (attr.name, attr)).toMap
-    if (cols.exists(!attrs.contains(_))) {
-      logWarning(s"target table does not contain all zorder cols: 
${cols.mkString(",")}, " +
-        s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
-      plan
-    } else {
-      val bound = cols.map(attrs(_))
-      val orderExpr =
-        if (bound.length == 1) {
-          bound.head
-        } else {
-          buildZorder(bound)
-        }
-      // TODO: We can do rebalance partitions before local sort of zorder 
after SPARK 3.3
-      //   see https://github.com/apache/spark/pull/34542
-      Sort(
-        SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
-        conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
-        plan)
-    }
-  }
-
-  def applyInternal(plan: LogicalPlan): LogicalPlan
-
-  final override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
-      applyInternal(plan)
-    } else {
-      plan
-    }
-  }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has 
zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
-  extends InsertZorderBeforeWritingDatasourceBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = 
Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder 
properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
-  extends InsertZorderBeforeWritingHiveBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = 
Zorder(children)
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index ad95ac429..450a2c35e 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSessionExtensions
 
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, 
InsertZorderBeforeWritingHive33, ResolveZorder}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, 
InsertZorderBeforeWritingHive, ResolveZorder}
 
 class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
     // should be applied before
     // RepartitionBeforeWriting and RebalanceBeforeWriting
     // because we can only apply one of them (i.e. Global Sort or 
Repartition/Rebalance)
-    
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
     extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 73ed5e253..003ba6b68 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -27,7 +27,11 @@ import 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 
-trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+trait ZorderBuilder {
+  def buildZorder(children: Seq[Expression]): ZorderBase
+}
+
+trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
   private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
   private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
 
@@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with 
ZorderBuilder {
   }
 }
 
-case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
-  extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+  extends InsertZorderHelper {
   override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
     case insert: InsertIntoHadoopFsRelationCommand
         if insert.query.resolved &&
@@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session: 
SparkSession)
   }
 }
 
-case class InsertZorderBeforeWritingHive33(session: SparkSession)
-  extends InsertZorderHelper33 {
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+  extends InsertZorderHelper {
   override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
     case insert: InsertIntoHiveTable
         if insert.query.resolved &&
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
deleted file mode 100644
index 2c59d148e..000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import java.util.Locale
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has zorder properties
- */
-abstract class InsertZorderBeforeWritingDatasourceBase
-  extends InsertZorderHelper {
-  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
-    case insert: InsertIntoHadoopFsRelationCommand
-        if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
-          isZorderEnabled(insert.catalogTable.get.properties) =>
-      val newQuery = insertZorder(insert.catalogTable.get, insert.query)
-      if (newQuery.eq(insert.query)) {
-        insert
-      } else {
-        insert.copy(query = newQuery)
-      }
-    case _ => plan
-  }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder properties
- */
-abstract class InsertZorderBeforeWritingHiveBase
-  extends InsertZorderHelper {
-  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
-    case insert: InsertIntoHiveTable
-        if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
-          isZorderEnabled(insert.table.properties) =>
-      val newQuery = insertZorder(insert.table, insert.query)
-      if (newQuery.eq(insert.query)) {
-        insert
-      } else {
-        insert.copy(query = newQuery)
-      }
-    case _ => plan
-  }
-}
-
-trait ZorderBuilder {
-  def buildZorder(children: Seq[Expression]): ZorderBase
-}
-
-trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
-  private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
-  private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
-
-  def isZorderEnabled(props: Map[String, String]): Boolean = {
-    props.contains(KYUUBI_ZORDER_ENABLED) &&
-    "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
-    props.contains(KYUUBI_ZORDER_COLS)
-  }
-
-  def getZorderColumns(props: Map[String, String]): Seq[String] = {
-    val cols = props.get(KYUUBI_ZORDER_COLS)
-    assert(cols.isDefined)
-    cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
-  }
-
-  def canInsertZorder(query: LogicalPlan): Boolean = query match {
-    case Project(_, child) => canInsertZorder(child)
-    // TODO: actually, we can force zorder even if existed some shuffle
-    case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
-    case _ => true
-  }
-
-  def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = {
-    if (!canInsertZorder(plan)) {
-      return plan
-    }
-    val cols = getZorderColumns(catalogTable.properties)
-    val attrs = plan.output.map(attr => (attr.name, attr)).toMap
-    if (cols.exists(!attrs.contains(_))) {
-      logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
-        s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
-      plan
-    } else {
-      val bound = cols.map(attrs(_))
-      val orderExpr =
-        if (bound.length == 1) {
-          bound.head
-        } else {
-          buildZorder(bound)
-        }
-      // TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3
-      //   see https://github.com/apache/spark/pull/34542
-      Sort(
-        SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
-        conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
-        plan)
-    }
-  }
-
-  def applyInternal(plan: LogicalPlan): LogicalPlan
-
-  final override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
-      applyInternal(plan)
-    } else {
-      plan
-    }
-  }
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
-  extends InsertZorderBeforeWritingDatasourceBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
-  extends InsertZorderBeforeWritingHiveBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}


Reply via email to