This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ba2880c  [KYUUBI #1055] Refine zorder plan
ba2880c is described below

commit ba2880cd666809c6bba15386733252a25c743938
Author: ulysses-you <[email protected]>
AuthorDate: Thu Sep 9 10:15:49 2021 +0800

    [KYUUBI #1055] Refine zorder plan
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    - Change origin `OptimizeZorderCommand` to `OptimizeZorderStatement` which 
holds the parsed plan
    - Add `OptimizeZorderCommand` which delegate an another command(e.g. 
`InsertIntoHive`) to execute; the reason we should add a new command (not reuse 
`InserIntoStatement`) is we may need to implement read and overwrite same table 
in future. And also it's tricky to add `InserIntoStatement` during analyze.
    - Change `ZorderBeforeWrite` to `ResolveZorder`, so we can resolve 
`OptimizeZorderStatement` to `OptimizeZorderCommand`
    
    So the code should look cleaner
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1055 from ulysses-you/zorder-insert.
    
    Closes #1055
    
    d252d35e [ulysses-you] simplify
    223f68ed [ulysses-you] refine zorder plan
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 ...ion.scala => KyuubiSQLExtensionException.scala} | 12 +++-
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |  4 +-
 .../kyuubi/sql/zorder/OptimizeZorderCommand.scala  | 58 +++++++++++++------
 ...ception.scala => OptimizeZorderStatement.scala} | 13 ++++-
 .../apache/kyuubi/sql/zorder/ResolveZorder.scala   | 63 ++++++++++++++++++++
 .../org/apache/kyuubi/sql/zorder/Zorder.scala      |  6 +-
 .../kyuubi/sql/zorder/ZorderBeforeWrite.scala      | 67 ----------------------
 .../kyuubi/sql/zorder/ZorderBytesUtils.scala       |  4 +-
 .../kyuubi/sql/zorder/ZorderSqlAstBuilder.scala    | 44 +++++++++-----
 .../apache/spark/sql/KyuubiExtensionSuite.scala    | 19 +++++-
 10 files changed, 181 insertions(+), 109 deletions(-)

diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLExtensionException.scala
similarity index 77%
copy from 
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
copy to 
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLExtensionException.scala
index e8eef35..88c5a98 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLExtensionException.scala
@@ -15,6 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.sql.zorder
+package org.apache.kyuubi.sql
 
-class ZorderException(message: String, cause: Throwable = null) extends 
Exception(message, cause)
+import java.sql.SQLException
+
+class KyuubiSQLExtensionException(reason: String, cause: Throwable)
+  extends SQLException(reason, cause) {
+
+  def this(reason: String) = {
+    this(reason, null)
+  }
+}
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 550fc27..34015f5 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.SparkSessionExtensions
 
 import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.zorder.ZorderBeforeWrite
+import org.apache.kyuubi.sql.zorder.ResolveZorder
 import org.apache.kyuubi.sql.zorder.ZorderSparkSqlExtensionsParser
 
 // scalastyle:off line.size.limit
@@ -34,7 +34,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions 
=> Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
     // inject zorder parser and related rules
     extensions.injectParser{ case (_, parser) => new 
ZorderSparkSqlExtensionsParser(parser) }
-    extensions.injectResolutionRule(ZorderBeforeWrite)
+    extensions.injectResolutionRule(ResolveZorder)
 
     extensions.injectPostHocResolutionRule(KyuubiSqlClassification)
     extensions.injectPostHocResolutionRule(RepartitionBeforeWrite)
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
index 3be3bdb..763a8ab 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
@@ -17,27 +17,51 @@
 
 package org.apache.kyuubi.sql.zorder
 
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Expression, NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Sort, 
UnaryNode}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 
-case class OptimizeZorderCommand(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
+/**
+ * A runnable command for zorder, we delegate to real command to execute
+ */
+case class OptimizeZorderCommand(
+    catalogTable: CatalogTable,
+    query: LogicalPlan) extends DataWritingCommand {
+  override def outputColumnNames: Seq[String] = query.output.map(_.name)
 
-object OptimizeZorderCommand {
+  private def isHiveTable: Boolean = {
+    catalogTable.provider.isEmpty ||
+      (catalogTable.provider.isDefined && 
"hive".equalsIgnoreCase(catalogTable.provider.get))
+  }
+
+  private def getWritingCommand(session: SparkSession): DataWritingCommand = {
+    // TODO: Support convert hive relation to datasource relation, can see
+    //  [[org.apache.spark.sql.hive.RelationConversions]]
+    InsertIntoHiveTable(
+      catalogTable,
+      catalogTable.partitionColumnNames.map(p => (p, None)).toMap,
+      query,
+      overwrite = true,
+      ifPartitionNotExists = false,
+      outputColumnNames
+    )
+  }
 
-  def apply(tableIdent: Seq[String],
-            whereExp: Option[Expression],
-            sortArr: Seq[UnresolvedAttribute]): OptimizeZorderCommand = {
-    val table = UnresolvedRelation(tableIdent)
-    val child = whereExp match {
-      case Some(x) => Filter(x, table)
-      case None => table
+  override def run(session: SparkSession, child: SparkPlan): Seq[Row] = {
+    // TODO: Support datasource relation
+    // TODO: Support read and insert overwrite the same table for some table 
format
+    if (!isHiveTable) {
+      throw new KyuubiSQLExtensionException("only support hive table")
     }
 
-    val sortOrder = SortOrder(Zorder(sortArr), Ascending, NullsLast, Seq.empty)
-    val zorderSort = Sort(Seq(sortOrder), true, child)
-    OptimizeZorderCommand(zorderSort)
+    val command = getWritingCommand(session)
+    command.run(session, child)
+    DataWritingCommand.propogateMetrics(session.sparkContext, command, metrics)
+    Seq.empty
   }
 }
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
similarity index 66%
rename from 
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
rename to 
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
index e8eef35..07cb3ac 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
@@ -17,4 +17,15 @@
 
 package org.apache.kyuubi.sql.zorder
 
-class ZorderException(message: String, cause: Throwable = null) extends 
Exception(message, cause)
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+/**
+ * A zorder statement that contains we parsed from SQL.
+ * We should convert this plan to certain command at Analyzer.
+ */
+case class OptimizeZorderStatement(
+    tableIdentifier: Seq[String],
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
new file mode 100644
index 0000000..e0b14b4
--- /dev/null
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.zorder
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
+/**
+ * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
+ */
+case class ResolveZorder(session: SparkSession) extends Rule[LogicalPlan] {
+  private def checkQueryAllowed(query: LogicalPlan): Unit = query foreach {
+    case Filter(condition, SubqueryAlias(_, tableRelation: HiveTableRelation)) 
=>
+      if (tableRelation.partitionCols.isEmpty) {
+        throw new KyuubiSQLExtensionException("Filters are only supported for 
partitioned table")
+      }
+
+      val partitionKeyIds = AttributeSet(tableRelation.partitionCols)
+      if (condition.references.isEmpty || 
!condition.references.subsetOf(partitionKeyIds)) {
+        throw new KyuubiSQLExtensionException("Only partition column filters 
are allowed")
+      }
+
+    case _ =>
+  }
+
+  private def getTableIdentifier(tableIdent: Seq[String]): TableIdentifier = 
tableIdent match {
+    case Seq(tbl) => TableIdentifier.apply(tbl)
+    case Seq(db, tbl) => TableIdentifier.apply(tbl, Some(db))
+    case _ => throw new KyuubiSQLExtensionException(
+      "only support session catalog table, please use db.table instead")
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case OptimizeZorderStatement(tableIdent, query) if query.resolved =>
+      checkQueryAllowed(query)
+      val tableIdentifier = getTableIdentifier(tableIdent)
+      val catalogTable = 
session.sessionState.catalog.getTableMetadata(tableIdentifier)
+      OptimizeZorderCommand(catalogTable, query)
+
+    case _ => plan
+  }
+}
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
index 3d78eb0..feb846a 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
@@ -22,6 +22,8 @@ import 
org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, 
DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, 
ShortType, StringType, TimestampType}
 
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
 case class Zorder(children: Seq[Expression]) extends Expression with 
CodegenFallback {
   private lazy val defaultNullValues = {
     children.map {
@@ -50,10 +52,10 @@ case class Zorder(children: Seq[Expression]) extends 
Expression with CodegenFall
           case d: DecimalType =>
             Long.MaxValue
           case other: Any =>
-            throw new ZorderException("Unsupported z-order type: " + 
other.getClass)
+            throw new KyuubiSQLExtensionException("Unsupported z-order type: " 
+ other.getClass)
         }
       case other: Any =>
-        throw new ZorderException("Unknown z-order column: " + other)
+        throw new KyuubiSQLExtensionException("Unknown z-order column: " + 
other)
     }
   }
 
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBeforeWrite.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBeforeWrite.scala
deleted file mode 100644
index b059a56..0000000
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBeforeWrite.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, 
InsertIntoStatement, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-case class ZorderBeforeWrite(session: SparkSession) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan match {
-      case o @ OptimizeZorderCommand(child) =>
-        var relation: Option[HiveTableRelation] = None
-        var partitionSpec: Map[String, Option[String]] = Map.empty
-        val newChild = child.resolveOperatorsUp {
-          case f @ Filter(condition,
-            SubqueryAlias(_, tableRelation: HiveTableRelation)) if f.resolved 
=>
-            if (!tableRelation.isPartitioned) {
-              throw new ZorderException("Filters are only supported for 
partitioned table")
-            }
-
-            val partitionKeyIds = AttributeSet(tableRelation.partitionCols)
-            if (condition.references.isEmpty || 
!condition.references.subsetOf(partitionKeyIds)) {
-              throw new ZorderException("Only partition column filters are 
allowed")
-            }
-
-            val partitions = tableRelation.partitionCols.map { p =>
-              p.name -> None
-            }
-            partitionSpec = Map(partitions: _*)
-            f
-          case r: HiveTableRelation =>
-            relation = Option(r)
-            r
-        }
-
-        if (o.resolved) {
-          relation match {
-            case Some(table) =>
-              InsertIntoStatement(table, partitionSpec, Nil, newChild, true, 
false)
-            case None => plan
-          }
-        } else {
-          plan
-        }
-
-      case _ => plan
-    }
-  }
-}
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
index 64f40aa..36efdc7 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
@@ -22,6 +22,8 @@ import java.nio.charset.Charset
 
 import org.apache.spark.sql.types.Decimal
 
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
 object ZorderBytesUtils {
   def interleaveMultiByteArray(arrays: Array[Array[Byte]]): Array[Byte] = {
     var totalLength = 0
@@ -87,7 +89,7 @@ object ZorderBytesUtils {
       case dec: Decimal =>
         ZorderBytesUtils.longToByte(dec.toLong)
       case other: Any =>
-        throw new ZorderException("Unsupported z-order type: " + 
other.getClass)
+        throw new KyuubiSQLExtensionException("Unsupported z-order type: " + 
other.getClass)
     }
   }
 
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
index 022f330..179ccb3 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
@@ -26,11 +26,11 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import org.antlr.v4.runtime.ParserRuleContext
 import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{And, EqualNullSafe, EqualTo, 
Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Not, Or}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.expressions.{And, Ascending, 
EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThan, 
LessThanOrEqual, Literal, Not, NullsLast, Or, SortOrder}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, 
stringWithoutUnescape, withOrigin}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project, Sort}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
stringToDate, stringToTimestamp}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.hive.HiveAnalysis.conf
@@ -49,24 +49,40 @@ class ZorderSqlAstBuilder extends 
ZorderSqlExtensionsBaseVisitor[AnyRef] {
 
   protected def multiPart(ctx: ParserRuleContext): Seq[String] = 
typedVisit(ctx)
 
-  protected def zorder(ctx: ParserRuleContext): Seq[UnresolvedAttribute] = 
typedVisit(ctx)
-
   override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan 
= {
     visit(ctx.statement()).asInstanceOf[LogicalPlan]
   }
 
   override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
 
-  override def visitOptimizeZorder(ctx: OptimizeZorderContext):
-    OptimizeZorderCommand = withOrigin(ctx) {
-      val tableIdent = multiPart(ctx.multipartIdentifier())
-      val whereItem = if (ctx.whereClause() == null) {
-        None
-      } else {
-        Option(expression(ctx.whereClause().booleanExpression()))
-      }
+  override def visitOptimizeZorder(
+      ctx: OptimizeZorderContext): OptimizeZorderStatement = withOrigin(ctx) {
+    val tableIdent = multiPart(ctx.multipartIdentifier())
+    val table = UnresolvedRelation(tableIdent)
+
+    val whereClause = if (ctx.whereClause() == null) {
+      None
+    } else {
+      Option(expression(ctx.whereClause().booleanExpression()))
+    }
+
+    val tableWithFilter = whereClause match {
+      case Some(expr) => Filter(expr, table)
+      case None => table
+    }
+
+    val zorderCols = ctx.zorderClause().order.asScala
+      .map(visitMultipartIdentifier)
+      .map(UnresolvedAttribute(_))
+      .toSeq
+
+    val query =
+      Sort(
+        SortOrder(Zorder(zorderCols), Ascending, NullsLast, Seq.empty) :: Nil,
+        true,
+        Project(Seq(UnresolvedStar(None)), tableWithFilter))
 
-      OptimizeZorderCommand(tableIdent, whereItem, zorder(ctx.zorderClause()))
+    OptimizeZorderStatement(tableIdent, query)
   }
 
   override def visitQuery(ctx: QueryContext): Expression = withOrigin(ctx) {
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index cbbd43e..2f64587 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.test.SQLTestUtils
 
 import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
-import org.apache.kyuubi.sql.zorder.ZorderException
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
 
 class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with 
AdaptiveSparkPlanHelper {
 
@@ -908,7 +908,9 @@ class KyuubiExtensionSuite extends QueryTest with 
SQLTestUtils with AdaptiveSpar
           "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
           "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
 
-        val e = intercept[ZorderException](sql("OPTIMIZE up WHERE c1 > 1 
ZORDER BY c1, c2"))
+        val e = intercept[KyuubiSQLExtensionException] {
+          sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
+        }
         assert(e.getMessage == "Filters are only supported for partitioned 
table")
 
         sql("OPTIMIZE up ZORDER BY c1, c2")
@@ -999,7 +1001,7 @@ class KyuubiExtensionSuite extends QueryTest with 
SQLTestUtils with AdaptiveSpar
           "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
           "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
 
-        val e = intercept[ZorderException](
+        val e = intercept[KyuubiSQLExtensionException](
           sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
         )
         assert(e.getMessage == "Only partition column filters are allowed")
@@ -1026,4 +1028,15 @@ class KyuubiExtensionSuite extends QueryTest with 
SQLTestUtils with AdaptiveSpar
       }
     }
   }
+
+  test("optimize zorder with datasource table") {
+    // TODO remove this if we support datasource table
+    withTable("t") {
+      sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
+      val msg = intercept[KyuubiSQLExtensionException] {
+        sql("OPTIMIZE t ZORDER BY c1, c2")
+      }.getMessage
+      assert(msg.contains("only support hive table"))
+    }
+  }
 }

Reply via email to