This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ba2880c [KYUUBI #1055] Refine zorder plan
ba2880c is described below
commit ba2880cd666809c6bba15386733252a25c743938
Author: ulysses-you <[email protected]>
AuthorDate: Thu Sep 9 10:15:49 2021 +0800
[KYUUBI #1055] Refine zorder plan
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
- Change origin `OptimizeZorderCommand` to `OptimizeZorderStatement` which
holds the parsed plan
- Add `OptimizeZorderCommand` which delegates to another command (e.g.
`InsertIntoHiveTable`) to execute; the reason we should add a new command (not reuse
`InsertIntoStatement`) is that we may need to implement reading and overwriting the same table
in the future. It is also tricky to add `InsertIntoStatement` during analysis.
- Change `ZorderBeforeWrite` to `ResolveZorder`, so we can resolve
`OptimizeZorderStatement` to `OptimizeZorderCommand`.
Overall, the code should look cleaner.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1055 from ulysses-you/zorder-insert.
Closes #1055
d252d35e [ulysses-you] simplify
223f68ed [ulysses-you] refine zorder plan
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
...ion.scala => KyuubiSQLExtensionException.scala} | 12 +++-
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 4 +-
.../kyuubi/sql/zorder/OptimizeZorderCommand.scala | 58 +++++++++++++------
...ception.scala => OptimizeZorderStatement.scala} | 13 ++++-
.../apache/kyuubi/sql/zorder/ResolveZorder.scala | 63 ++++++++++++++++++++
.../org/apache/kyuubi/sql/zorder/Zorder.scala | 6 +-
.../kyuubi/sql/zorder/ZorderBeforeWrite.scala | 67 ----------------------
.../kyuubi/sql/zorder/ZorderBytesUtils.scala | 4 +-
.../kyuubi/sql/zorder/ZorderSqlAstBuilder.scala | 44 +++++++++-----
.../apache/spark/sql/KyuubiExtensionSuite.scala | 19 +++++-
10 files changed, 181 insertions(+), 109 deletions(-)
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLExtensionException.scala
similarity index 77%
copy from
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
copy to
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLExtensionException.scala
index e8eef35..88c5a98 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLExtensionException.scala
@@ -15,6 +15,14 @@
* limitations under the License.
*/
-package org.apache.kyuubi.sql.zorder
+package org.apache.kyuubi.sql
-class ZorderException(message: String, cause: Throwable = null) extends
Exception(message, cause)
+import java.sql.SQLException
+
+class KyuubiSQLExtensionException(reason: String, cause: Throwable)
+ extends SQLException(reason, cause) {
+
+ def this(reason: String) = {
+ this(reason, null)
+ }
+}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 550fc27..34015f5 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.zorder.ZorderBeforeWrite
+import org.apache.kyuubi.sql.zorder.ResolveZorder
import org.apache.kyuubi.sql.zorder.ZorderSparkSqlExtensionsParser
// scalastyle:off line.size.limit
@@ -34,7 +34,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions
=> Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
// inject zorder parser and related rules
extensions.injectParser{ case (_, parser) => new
ZorderSparkSqlExtensionsParser(parser) }
- extensions.injectResolutionRule(ZorderBeforeWrite)
+ extensions.injectResolutionRule(ResolveZorder)
extensions.injectPostHocResolutionRule(KyuubiSqlClassification)
extensions.injectPostHocResolutionRule(RepartitionBeforeWrite)
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
index 3be3bdb..763a8ab 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
@@ -17,27 +17,51 @@
package org.apache.kyuubi.sql.zorder
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute,
UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
Expression, NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Sort,
UnaryNode}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-case class OptimizeZorderCommand(child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
+/**
+ * A runnable command for zorder, we delegate to real command to execute
+ */
+case class OptimizeZorderCommand(
+ catalogTable: CatalogTable,
+ query: LogicalPlan) extends DataWritingCommand {
+ override def outputColumnNames: Seq[String] = query.output.map(_.name)
-object OptimizeZorderCommand {
+ private def isHiveTable: Boolean = {
+ catalogTable.provider.isEmpty ||
+ (catalogTable.provider.isDefined &&
"hive".equalsIgnoreCase(catalogTable.provider.get))
+ }
+
+ private def getWritingCommand(session: SparkSession): DataWritingCommand = {
+ // TODO: Support convert hive relation to datasource relation, can see
+ // [[org.apache.spark.sql.hive.RelationConversions]]
+ InsertIntoHiveTable(
+ catalogTable,
+ catalogTable.partitionColumnNames.map(p => (p, None)).toMap,
+ query,
+ overwrite = true,
+ ifPartitionNotExists = false,
+ outputColumnNames
+ )
+ }
- def apply(tableIdent: Seq[String],
- whereExp: Option[Expression],
- sortArr: Seq[UnresolvedAttribute]): OptimizeZorderCommand = {
- val table = UnresolvedRelation(tableIdent)
- val child = whereExp match {
- case Some(x) => Filter(x, table)
- case None => table
+ override def run(session: SparkSession, child: SparkPlan): Seq[Row] = {
+ // TODO: Support datasource relation
+ // TODO: Support read and insert overwrite the same table for some table
format
+ if (!isHiveTable) {
+ throw new KyuubiSQLExtensionException("only support hive table")
}
- val sortOrder = SortOrder(Zorder(sortArr), Ascending, NullsLast, Seq.empty)
- val zorderSort = Sort(Seq(sortOrder), true, child)
- OptimizeZorderCommand(zorderSort)
+ val command = getWritingCommand(session)
+ command.run(session, child)
+ DataWritingCommand.propogateMetrics(session.sparkContext, command, metrics)
+ Seq.empty
}
}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
similarity index 66%
rename from
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
rename to
dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
index e8eef35..07cb3ac 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderException.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
@@ -17,4 +17,15 @@
package org.apache.kyuubi.sql.zorder
-class ZorderException(message: String, cause: Throwable = null) extends
Exception(message, cause)
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+/**
+ * A zorder statement that contains we parsed from SQL.
+ * We should convert this plan to certain command at Analyzer.
+ */
+case class OptimizeZorderStatement(
+ tableIdentifier: Seq[String],
+ child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
new file mode 100644
index 0000000..e0b14b4
--- /dev/null
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.zorder
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan,
SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
+/**
+ * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
+ */
+case class ResolveZorder(session: SparkSession) extends Rule[LogicalPlan] {
+ private def checkQueryAllowed(query: LogicalPlan): Unit = query foreach {
+ case Filter(condition, SubqueryAlias(_, tableRelation: HiveTableRelation))
=>
+ if (tableRelation.partitionCols.isEmpty) {
+ throw new KyuubiSQLExtensionException("Filters are only supported for
partitioned table")
+ }
+
+ val partitionKeyIds = AttributeSet(tableRelation.partitionCols)
+ if (condition.references.isEmpty ||
!condition.references.subsetOf(partitionKeyIds)) {
+ throw new KyuubiSQLExtensionException("Only partition column filters
are allowed")
+ }
+
+ case _ =>
+ }
+
+ private def getTableIdentifier(tableIdent: Seq[String]): TableIdentifier =
tableIdent match {
+ case Seq(tbl) => TableIdentifier.apply(tbl)
+ case Seq(db, tbl) => TableIdentifier.apply(tbl, Some(db))
+ case _ => throw new KyuubiSQLExtensionException(
+ "only support session catalog table, please use db.table instead")
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+ case OptimizeZorderStatement(tableIdent, query) if query.resolved =>
+ checkQueryAllowed(query)
+ val tableIdentifier = getTableIdentifier(tableIdent)
+ val catalogTable =
session.sessionState.catalog.getTableMetadata(tableIdentifier)
+ OptimizeZorderCommand(catalogTable, query)
+
+ case _ => plan
+ }
+}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
index 3d78eb0..feb846a 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
@@ -22,6 +22,8 @@ import
org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType,
DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType,
ShortType, StringType, TimestampType}
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
case class Zorder(children: Seq[Expression]) extends Expression with
CodegenFallback {
private lazy val defaultNullValues = {
children.map {
@@ -50,10 +52,10 @@ case class Zorder(children: Seq[Expression]) extends
Expression with CodegenFall
case d: DecimalType =>
Long.MaxValue
case other: Any =>
- throw new ZorderException("Unsupported z-order type: " +
other.getClass)
+ throw new KyuubiSQLExtensionException("Unsupported z-order type: "
+ other.getClass)
}
case other: Any =>
- throw new ZorderException("Unknown z-order column: " + other)
+ throw new KyuubiSQLExtensionException("Unknown z-order column: " +
other)
}
}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBeforeWrite.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBeforeWrite.scala
deleted file mode 100644
index b059a56..0000000
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBeforeWrite.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
-import org.apache.spark.sql.catalyst.plans.logical.{Filter,
InsertIntoStatement, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-case class ZorderBeforeWrite(session: SparkSession) extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan match {
- case o @ OptimizeZorderCommand(child) =>
- var relation: Option[HiveTableRelation] = None
- var partitionSpec: Map[String, Option[String]] = Map.empty
- val newChild = child.resolveOperatorsUp {
- case f @ Filter(condition,
- SubqueryAlias(_, tableRelation: HiveTableRelation)) if f.resolved
=>
- if (!tableRelation.isPartitioned) {
- throw new ZorderException("Filters are only supported for
partitioned table")
- }
-
- val partitionKeyIds = AttributeSet(tableRelation.partitionCols)
- if (condition.references.isEmpty ||
!condition.references.subsetOf(partitionKeyIds)) {
- throw new ZorderException("Only partition column filters are
allowed")
- }
-
- val partitions = tableRelation.partitionCols.map { p =>
- p.name -> None
- }
- partitionSpec = Map(partitions: _*)
- f
- case r: HiveTableRelation =>
- relation = Option(r)
- r
- }
-
- if (o.resolved) {
- relation match {
- case Some(table) =>
- InsertIntoStatement(table, partitionSpec, Nil, newChild, true,
false)
- case None => plan
- }
- } else {
- plan
- }
-
- case _ => plan
- }
- }
-}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
index 64f40aa..36efdc7 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBytesUtils.scala
@@ -22,6 +22,8 @@ import java.nio.charset.Charset
import org.apache.spark.sql.types.Decimal
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
object ZorderBytesUtils {
def interleaveMultiByteArray(arrays: Array[Array[Byte]]): Array[Byte] = {
var totalLength = 0
@@ -87,7 +89,7 @@ object ZorderBytesUtils {
case dec: Decimal =>
ZorderBytesUtils.longToByte(dec.toLong)
case other: Any =>
- throw new ZorderException("Unsupported z-order type: " +
other.getClass)
+ throw new KyuubiSQLExtensionException("Unsupported z-order type: " +
other.getClass)
}
}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
index 022f330..179ccb3 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
@@ -26,11 +26,11 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{And, EqualNullSafe, EqualTo,
Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual,
Literal, Not, Or}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute,
UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.expressions.{And, Ascending,
EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThan,
LessThanOrEqual, Literal, Not, NullsLast, Or, SortOrder}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.parser.ParserUtils.{string,
stringWithoutUnescape, withOrigin}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan,
Project, Sort}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId,
stringToDate, stringToTimestamp}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.hive.HiveAnalysis.conf
@@ -49,24 +49,40 @@ class ZorderSqlAstBuilder extends
ZorderSqlExtensionsBaseVisitor[AnyRef] {
protected def multiPart(ctx: ParserRuleContext): Seq[String] =
typedVisit(ctx)
- protected def zorder(ctx: ParserRuleContext): Seq[UnresolvedAttribute] =
typedVisit(ctx)
-
override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan
= {
visit(ctx.statement()).asInstanceOf[LogicalPlan]
}
override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
- override def visitOptimizeZorder(ctx: OptimizeZorderContext):
- OptimizeZorderCommand = withOrigin(ctx) {
- val tableIdent = multiPart(ctx.multipartIdentifier())
- val whereItem = if (ctx.whereClause() == null) {
- None
- } else {
- Option(expression(ctx.whereClause().booleanExpression()))
- }
+ override def visitOptimizeZorder(
+ ctx: OptimizeZorderContext): OptimizeZorderStatement = withOrigin(ctx) {
+ val tableIdent = multiPart(ctx.multipartIdentifier())
+ val table = UnresolvedRelation(tableIdent)
+
+ val whereClause = if (ctx.whereClause() == null) {
+ None
+ } else {
+ Option(expression(ctx.whereClause().booleanExpression()))
+ }
+
+ val tableWithFilter = whereClause match {
+ case Some(expr) => Filter(expr, table)
+ case None => table
+ }
+
+ val zorderCols = ctx.zorderClause().order.asScala
+ .map(visitMultipartIdentifier)
+ .map(UnresolvedAttribute(_))
+ .toSeq
+
+ val query =
+ Sort(
+ SortOrder(Zorder(zorderCols), Ascending, NullsLast, Seq.empty) :: Nil,
+ true,
+ Project(Seq(UnresolvedStar(None)), tableWithFilter))
- OptimizeZorderCommand(tableIdent, whereItem, zorder(ctx.zorderClause()))
+ OptimizeZorderStatement(tableIdent, query)
}
override def visitQuery(ctx: QueryContext): Expression = withOrigin(ctx) {
diff --git
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index cbbd43e..2f64587 100644
---
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
-import org.apache.kyuubi.sql.zorder.ZorderException
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with
AdaptiveSparkPlanHelper {
@@ -908,7 +908,9 @@ class KyuubiExtensionSuite extends QueryTest with
SQLTestUtils with AdaptiveSpar
"(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
"(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
- val e = intercept[ZorderException](sql("OPTIMIZE up WHERE c1 > 1
ZORDER BY c1, c2"))
+ val e = intercept[KyuubiSQLExtensionException] {
+ sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
+ }
assert(e.getMessage == "Filters are only supported for partitioned
table")
sql("OPTIMIZE up ZORDER BY c1, c2")
@@ -999,7 +1001,7 @@ class KyuubiExtensionSuite extends QueryTest with
SQLTestUtils with AdaptiveSpar
"(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
"(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
- val e = intercept[ZorderException](
+ val e = intercept[KyuubiSQLExtensionException](
sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
)
assert(e.getMessage == "Only partition column filters are allowed")
@@ -1026,4 +1028,15 @@ class KyuubiExtensionSuite extends QueryTest with
SQLTestUtils with AdaptiveSpar
}
}
}
+
+ test("optimize zorder with datasource table") {
+ // TODO remove this if we support datasource table
+ withTable("t") {
+ sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
+ val msg = intercept[KyuubiSQLExtensionException] {
+ sql("OPTIMIZE t ZORDER BY c1, c2")
+ }.getMessage
+ assert(msg.contains("only support hive table"))
+ }
+ }
}