This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4f723fb57ec [HUDI-6963] Fix class conflict of CreateIndex from
Spark3.3 (#9895)
4f723fb57ec is described below
commit 4f723fb57ec3d859893d8eee80e1d2b6ceb05d18
Author: Rex(Hui) An <[email protected]>
AuthorDate: Sat Oct 28 02:14:57 2023 +0800
[HUDI-6963] Fix class conflict of CreateIndex from Spark3.3 (#9895)
CreateIndex was added in
[HUDI-4165](https://github.com/apache/hudi/pull/5761/files), and Spark 3.3 also
includes it via [SPARK-36895](https://github.com/apache/spark/pull/34148).
Since `CreateIndex` uses the same package
`org.apache.spark.sql.catalyst.plans.logical` in both Hudi and Spark 3.3 but the
parameters differ, it could introduce class conflict issues if we use it.
This commit still keeps the same package path as Spark, but makes the following changes:
1. Use the same parameters as Spark, so there should be no class conflict.
2. Only support index-related commands from **Spark 3.2** onward, since Spark 2
doesn't have `org.apache.spark.sql.catalyst.analysis.FieldName`, which
`CreateIndex` requires.
3. Resolve columns for CreateIndex during the analysis stage.
---
.../spark/sql/HoodieCatalystPlansUtils.scala | 20 ++-
.../hudi/spark/sql/parser/HoodieSqlCommon.g4 | 68 ----------
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 37 ++++--
.../spark/sql/hudi/command/IndexCommands.scala | 26 +---
.../sql/parser/HoodieSqlCommonAstBuilder.scala | 142 +--------------------
.../sql/hudi/command/index/TestIndexSyntax.scala | 97 +++++++-------
.../hudi/command/index/TestSecondaryIndex.scala | 108 ++++++++--------
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 13 ++
.../spark/sql/HoodieSpark30CatalystPlanUtils.scala | 13 +-
.../spark/sql/HoodieSpark31CatalystPlanUtils.scala | 13 +-
.../src/main/antlr4/imports/SqlBase.g4 | 32 +++++
.../apache/hudi/spark/sql/parser/HoodieSqlBase.g4 | 7 +
.../spark/sql/HoodieSpark32CatalystPlanUtils.scala | 38 +++++-
.../HoodieSpark3_2ExtendedSqlAstBuilder.scala | 139 ++++++++++++++++++++
.../parser/HoodieSpark3_2ExtendedSqlParser.scala | 6 +-
.../spark/sql/catalyst/plans/logical/Index.scala | 46 ++-----
.../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 38 +++++-
.../src/main/antlr4/imports/SqlBase.g4 | 32 +++++
.../apache/hudi/spark/sql/parser/HoodieSqlBase.g4 | 7 +
.../spark/sql/HoodieSpark33CatalystPlanUtils.scala | 38 +++++-
.../HoodieSpark3_3ExtendedSqlAstBuilder.scala | 139 ++++++++++++++++++++
.../parser/HoodieSpark3_3ExtendedSqlParser.scala | 6 +-
.../src/main/antlr4/imports/SqlBase.g4 | 32 +++++
.../apache/hudi/spark/sql/parser/HoodieSqlBase.g4 | 7 +
.../spark/sql/HoodieSpark34CatalystPlanUtils.scala | 38 +++++-
.../HoodieSpark3_4ExtendedSqlAstBuilder.scala | 139 ++++++++++++++++++++
.../parser/HoodieSpark3_4ExtendedSqlParser.scala | 6 +-
27 files changed, 909 insertions(+), 378 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 9cfe23f86cc..64ee645ba0f 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -22,7 +22,6 @@ import
org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.internal.SQLConf
trait HoodieCatalystPlansUtils {
@@ -79,6 +78,25 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan,
LogicalPlan, Expression)]
+ /**
+ * Decomposes [[MatchCreateIndex]] into its arguments with accommodation.
+ */
+ def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String,
String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])]
+
+ /**
+ * Decomposes [[MatchDropIndex]] into its arguments with accommodation.
+ */
+ def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String,
Boolean)]
+
+ /**
+ * Decomposes [[MatchShowIndexes]] into its arguments with accommodation.
+ */
+ def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])]
+
+ /**
+ * Decomposes [[MatchRefreshIndex]] into its arguments with accommodation.
+ */
+ def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)]
/**
* Spark requires file formats to append the partition path fields to the
end of the schema.
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
index 8a3106f7a56..a98ad979140 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
+++
b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
@@ -48,13 +48,6 @@
statement
: compactionStatement
#compactionCommand
| CALL multipartIdentifier callArgumentList? #call
- | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
- tableIdentifier (USING indexType=identifier)?
- LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
- (OPTIONS indexOptions=propertyList)?
#createIndex
- | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier
#dropIndex
- | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier
#showIndexes
- | REFRESH INDEX identifier ON TABLE? tableIdentifier
#refreshIndex
| .*?
#passThrough
;
@@ -110,14 +103,6 @@
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;
- multipartIdentifierPropertyList
- : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
- ;
-
- multipartIdentifierProperty
- : multipartIdentifier (OPTIONS options=propertyList)?
- ;
-
multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;
@@ -135,51 +120,13 @@
nonReserved
: CALL
| COMPACTION
- | CREATE
- | DROP
- | EXISTS
- | FROM
- | IN
- | INDEX
- | INDEXES
- | IF
| LIMIT
- | NOT
| ON
- | OPTIONS
- | REFRESH
| RUN
| SCHEDULE
| SHOW
- | TABLE
- | USING
;
- propertyList
- : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
- ;
-
- property
- : key=propertyKey (EQ? value=propertyValue)?
- ;
-
- propertyKey
- : identifier (DOT identifier)*
- | STRING
- ;
-
- propertyValue
- : INTEGER_VALUE
- | DECIMAL_VALUE
- | booleanValue
- | STRING
- ;
-
- LEFT_PAREN: '(';
- RIGHT_PAREN: ')';
- COMMA: ',';
- DOT: '.';
-
ALL: 'ALL';
AT: 'AT';
CALL: 'CALL';
@@ -195,21 +142,6 @@
FALSE: 'FALSE';
INTERVAL: 'INTERVAL';
TO: 'TO';
- CREATE: 'CREATE';
- INDEX: 'INDEX';
- INDEXES: 'INDEXES';
- IF: 'IF';
- NOT: 'NOT';
- EXISTS: 'EXISTS';
- TABLE: 'TABLE';
- USING: 'USING';
- OPTIONS: 'OPTIONS';
- DROP: 'DROP';
- FROM: 'FROM';
- IN: 'IN';
- REFRESH: 'REFRESH';
-
- EQ: '=' | '==';
PLUS: '+';
MINUS: '-';
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index eb7f00ef26b..051c38d9567 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable,
LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField,
removeMetaFields}
-import
org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike,
MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable,
sparkAdapter}
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateIndex,
MatchCreateTableLike, MatchDropIndex, MatchInsertIntoStatement,
MatchMergeIntoTable, MatchRefreshIndex, MatchShowIndexes, ResolvesToHudiTable,
sparkAdapter}
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures,
Procedure, ProcedureArgs}
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -354,6 +354,26 @@ object HoodieAnalysis extends SparkAdapterSupport {
sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan)
}
+ private[sql] object MatchCreateIndex {
+ def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, String,
Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] =
+ sparkAdapter.getCatalystPlanUtils.unapplyCreateIndex(plan)
+ }
+
+ private[sql] object MatchDropIndex {
+ def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] =
+ sparkAdapter.getCatalystPlanUtils.unapplyDropIndex(plan)
+ }
+
+ private[sql] object MatchShowIndexes {
+ def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] =
+ sparkAdapter.getCatalystPlanUtils.unapplyShowIndexes(plan)
+ }
+
+ private[sql] object MatchRefreshIndex {
+ def unapply(plan: LogicalPlan): Option[(LogicalPlan, String)] =
+ sparkAdapter.getCatalystPlanUtils.unapplyRefreshIndex(plan)
+ }
+
private[sql] def failAnalysis(msg: String): Nothing = {
throw new AnalysisException(msg)
}
@@ -442,21 +462,20 @@ case class ResolveImplementations() extends
Rule[LogicalPlan] {
}
// Convert to CreateIndexCommand
- case ci @ CreateIndex(plan @ ResolvesToHudiTable(table), indexName,
indexType, ignoreIfExists, columns, options, output) =>
- // TODO need to resolve columns
- CreateIndexCommand(table, indexName, indexType, ignoreIfExists,
columns, options, output)
+ case ci @ MatchCreateIndex(plan @ ResolvesToHudiTable(table),
indexName, indexType, ignoreIfExists, columns, options) if ci.resolved =>
+ CreateIndexCommand(table, indexName, indexType, ignoreIfExists,
columns, options)
// Convert to DropIndexCommand
- case di @ DropIndex(plan @ ResolvesToHudiTable(table), indexName,
ignoreIfNotExists, output) if di.resolved =>
- DropIndexCommand(table, indexName, ignoreIfNotExists, output)
+ case di @ MatchDropIndex(plan @ ResolvesToHudiTable(table), indexName,
ignoreIfNotExists) if di.resolved =>
+ DropIndexCommand(table, indexName, ignoreIfNotExists)
// Convert to ShowIndexesCommand
- case si @ ShowIndexes(plan @ ResolvesToHudiTable(table), output) if
si.resolved =>
+ case si @ MatchShowIndexes(plan @ ResolvesToHudiTable(table), output)
if si.resolved =>
ShowIndexesCommand(table, output)
// Covert to RefreshCommand
- case ri @ RefreshIndex(plan @ ResolvesToHudiTable(table), indexName,
output) if ri.resolved =>
- RefreshIndexCommand(table, indexName, output)
+ case ri @ MatchRefreshIndex(plan @ ResolvesToHudiTable(table),
indexName) if ri.resolved =>
+ RefreshIndexCommand(table, indexName)
case _ => plan
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index 8ac0831a22f..da7f99fa41f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -19,10 +19,9 @@
package org.apache.spark.sql.hudi.command
-import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor}
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.secondary.index.SecondaryIndexManager
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -32,23 +31,21 @@ import
org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.{Row, SparkSession}
import java.util
-
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter,
mapAsJavaMapConverter}
case class CreateIndexCommand(table: CatalogTable,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
- columns: Seq[(Attribute, Map[String, String])],
- options: Map[String, String],
- override val output: Seq[Attribute]) extends
IndexBaseCommand {
+ columns: Seq[(Seq[String], Map[String, String])],
+ options: Map[String, String]) extends
IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String,
String]] =
new util.LinkedHashMap[String, java.util.Map[String, String]]()
- columns.map(c => columnsMap.put(c._1.name, c._2.asJava))
+ columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))
SecondaryIndexManager.getInstance().create(
metaClient, indexName, indexType, ignoreIfExists, columnsMap,
options.asJava)
@@ -65,8 +62,7 @@ case class CreateIndexCommand(table: CatalogTable,
case class DropIndexCommand(table: CatalogTable,
indexName: String,
- ignoreIfNotExists: Boolean,
- override val output: Seq[Attribute]) extends
IndexBaseCommand {
+ ignoreIfNotExists: Boolean) extends
IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
@@ -90,7 +86,7 @@ case class ShowIndexesCommand(table: CatalogTable,
val metaClient = createHoodieTableMetaClient(table.identifier,
sparkSession)
val secondaryIndexes = SecondaryIndexManager.getInstance().show(metaClient)
- val mapper = getObjectMapper
+ val mapper = JsonUtils.getObjectMapper
toScalaOption(secondaryIndexes).map(x =>
x.asScala.map(i => {
val colOptions =
@@ -100,18 +96,10 @@ case class ShowIndexesCommand(table: CatalogTable,
i.getIndexType.name().toLowerCase, colOptions, options)
}).toSeq).getOrElse(Seq.empty[Row])
}
-
- protected def getObjectMapper: ObjectMapper = {
- val mapper = new ObjectMapper
- mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
- mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
- mapper
- }
}
case class RefreshIndexCommand(table: CatalogTable,
- indexName: String,
- override val output: Seq[Attribute]) extends
IndexBaseCommand {
+ indexName: String) extends IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = createHoodieTableMetaClient(table.identifier,
sparkSession)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
index 4005ef97e45..25cf2641a3c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
@@ -25,7 +25,7 @@ import
org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute,
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.parser.{ParserInterface, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -149,144 +149,4 @@ class HoodieSqlCommonAstBuilder(session: SparkSession,
delegate: ParserInterface
private def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
}
-
- /**
- * Create an index, returning a [[CreateIndex]] logical plan.
- * For example:
- * {{{
- * CREATE INDEX index_name ON [TABLE] table_name [USING index_type]
(column_index_property_list)
- * [OPTIONS indexPropertyList]
- * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [
, . . . ]
- * indexPropertyList: index_property_name [= index_property_value] [ , .
. . ]
- * }}}
- */
- override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan =
withOrigin(ctx) {
- val (indexName, indexType) = if (ctx.identifier.size() == 1) {
- (ctx.identifier(0).getText, "")
- } else {
- (ctx.identifier(0).getText, ctx.identifier(1).getText)
- }
-
- val columns = ctx.columns.multipartIdentifierProperty.asScala
- .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
- val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
- .map(x =>
(Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
- val options =
Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
- CreateIndex(
- visitTableIdentifier(ctx.tableIdentifier()),
- indexName,
- indexType,
- ctx.EXISTS != null,
- columns.map(UnresolvedAttribute(_)).zip(columnsProperties),
- options)
- }
-
- /**
- * Drop an index, returning a [[DropIndex]] logical plan.
- * For example:
- * {{{
- * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
- * }}}
- */
- override def visitDropIndex(ctx: DropIndexContext): LogicalPlan =
withOrigin(ctx) {
- val indexName = ctx.identifier.getText
- DropIndex(
- visitTableIdentifier(ctx.tableIdentifier()),
- indexName,
- ctx.EXISTS != null)
- }
-
- /**
- * Show indexes, returning a [[ShowIndexes]] logical plan.
- * For example:
- * {{{
- * SHOW INDEXES (FROM | IN) [TABLE] table_name
- * }}}
- */
- override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan =
withOrigin(ctx) {
- ShowIndexes(visitTableIdentifier(ctx.tableIdentifier()))
- }
-
- /**
- * Refresh index, returning a [[RefreshIndex]] logical plan
- * For example:
- * {{{
- * REFRESH INDEX index_name ON [TABLE] table_name
- * }}}
- */
- override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan =
withOrigin(ctx) {
- RefreshIndex(visitTableIdentifier(ctx.tableIdentifier()),
ctx.identifier.getText)
- }
-
- /**
- * Convert a property list into a key-value map.
- * This should be called through [[visitPropertyKeyValues]] or
[[visitPropertyKeys]].
- */
- override def visitPropertyList(
- ctx: PropertyListContext): Map[String,
String] = withOrigin(ctx) {
- val properties = ctx.property.asScala.map { property =>
- val key = visitPropertyKey(property.key)
- val value = visitPropertyValue(property.value)
- key -> value
- }
- // Check for duplicate property names.
- checkDuplicateKeys(properties.toSeq, ctx)
- properties.toMap
- }
-
- /**
- * Parse a key-value map from a [[PropertyListContext]], assuming all values
are specified.
- */
- def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
- val props = visitPropertyList(ctx)
- val badKeys = props.collect { case (key, null) => key }
- if (badKeys.nonEmpty) {
- operationNotAllowed(
- s"Values must be specified for key(s): ${badKeys.mkString("[", ",",
"]")}", ctx)
- }
- props
- }
-
- /**
- * Parse a list of keys from a [[PropertyListContext]], assuming no values
are specified.
- */
- def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
- val props = visitPropertyList(ctx)
- val badKeys = props.filter { case (_, v) => v != null }.keys
- if (badKeys.nonEmpty) {
- operationNotAllowed(
- s"Values should not be specified for key(s): ${badKeys.mkString("[",
",", "]")}", ctx)
- }
- props.keys.toSeq
- }
-
- /**
- * A property key can either be String or a collection of dot separated
elements. This
- * function extracts the property key based on whether its a string literal
or a property
- * identifier.
- */
- override def visitPropertyKey(key: PropertyKeyContext): String = {
- if (key.STRING != null) {
- string(key.STRING)
- } else {
- key.getText
- }
- }
-
- /**
- * A property value can be String, Integer, Boolean or Decimal. This
function extracts
- * the property value based on whether its a string, integer, boolean or
decimal literal.
- */
- override def visitPropertyValue(value: PropertyValueContext): String = {
- if (value == null) {
- null
- } else if (value.STRING != null) {
- string(value.STRING)
- } else if (value.booleanValue != null) {
- value.getText.toLowerCase(Locale.ROOT)
- } else {
- value.getText
- }
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
index cb04c9d8d8b..43b4063260f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.hudi.command.index
+import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -28,59 +29,61 @@ import
org.apache.spark.sql.hudi.command.{CreateIndexCommand, DropIndexCommand,
class TestIndexSyntax extends HoodieSparkSqlTestBase {
test("Test Create/Drop/Show/Refresh Index") {
- withTempDir { tmp =>
- Seq("cow", "mor").foreach { tableType =>
- val databaseName = "default"
- val tableName = generateTableName
- val basePath = s"${tmp.getCanonicalPath}/$tableName"
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | options (
- | primaryKey ='id',
- | type = '$tableType',
- | preCombineField = 'ts'
- | )
- | partitioned by(ts)
- | location '$basePath'
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val databaseName = "default"
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
""".stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
- val sqlParser: ParserInterface = spark.sessionState.sqlParser
- val analyzer: Analyzer = spark.sessionState.analyzer
+ val sqlParser: ParserInterface = spark.sessionState.sqlParser
+ val analyzer: Analyzer = spark.sessionState.analyzer
- var logicalPlan = sqlParser.parsePlan(s"show indexes from
default.$tableName")
- var resolvedLogicalPlan = analyzer.execute(logicalPlan)
-
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table,
databaseName, tableName)
+ var logicalPlan = sqlParser.parsePlan(s"show indexes from
default.$tableName")
+ var resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table,
databaseName, tableName)
- logicalPlan = sqlParser.parsePlan(s"create index idx_name on
$tableName using lucene (name) options(block_size=1024)")
- resolvedLogicalPlan = analyzer.execute(logicalPlan)
-
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table,
databaseName, tableName)
-
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
-
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
-
assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
- assertResult(Map("block_size" ->
"1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
+ logicalPlan = sqlParser.parsePlan(s"create index idx_name on
$tableName using lucene (name) options(block_size=1024)")
+ resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table,
databaseName, tableName)
+
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
+
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
+
assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
+ assertResult(Map("block_size" ->
"1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
- logicalPlan = sqlParser.parsePlan(s"create index if not exists
idx_price on $tableName using lucene (price options(order='desc'))
options(block_size=512)")
- resolvedLogicalPlan = analyzer.execute(logicalPlan)
-
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table,
databaseName, tableName)
-
assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
-
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
- assertResult(Map("order" ->
"desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2)
- assertResult(Map("block_size" ->
"512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
+ logicalPlan = sqlParser.parsePlan(s"create index if not exists
idx_price on $tableName using lucene (price options(order='desc'))
options(block_size=512)")
+ resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table,
databaseName, tableName)
+
assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
+
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
+ assertResult(Map("order" ->
"desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2)
+ assertResult(Map("block_size" ->
"512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
- logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on
$tableName")
- resolvedLogicalPlan = analyzer.execute(logicalPlan)
-
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].table,
databaseName, tableName)
-
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName)
-
assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists)
+ logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on
$tableName")
+ resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].table,
databaseName, tableName)
+
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName)
+
assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists)
+ }
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index eae89099a62..816fecc38f5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -19,73 +19,77 @@
package org.apache.spark.sql.hudi.command.index
+import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
class TestSecondaryIndex extends HoodieSparkSqlTestBase {
+
test("Test Create/Show/Drop Secondary Index") {
- withTempDir { tmp =>
- Seq("cow", "mor").foreach { tableType =>
- val tableName = generateTableName
- val basePath = s"${tmp.getCanonicalPath}/$tableName"
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | options (
- | primaryKey ='id',
- | type = '$tableType',
- | preCombineField = 'ts'
- | )
- | partitioned by(ts)
- | location '$basePath'
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
""".stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
- checkAnswer(s"show indexes from default.$tableName")()
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ checkAnswer(s"show indexes from default.$tableName")()
- checkAnswer(s"create index idx_name on $tableName using lucene (name)
options(block_size=1024)")()
- checkAnswer(s"create index idx_price on $tableName using lucene (price
options(order='desc')) options(block_size=512)")()
+ checkAnswer(s"create index idx_name on $tableName using lucene
(name) options(block_size=1024)")()
+ checkAnswer(s"create index idx_price on $tableName using lucene
(price options(order='desc')) options(block_size=512)")()
- // Create an index with multiple columns
- checkException(s"create index idx_id_ts on $tableName using lucene
(id, ts)")("Lucene index only support single column")
+ // Create an index with multiple columns
+ checkException(s"create index idx_id_ts on $tableName using lucene
(id, ts)")("Lucene index only support single column")
- // Create an index with the occupied name
- checkException(s"create index idx_price on $tableName using lucene
(price)")(
- "Secondary index already exists: idx_price"
- )
+ // Create an index with the occupied name
+ checkException(s"create index idx_price on $tableName using lucene
(price)")(
+ "Secondary index already exists: idx_price"
+ )
- // Create indexes repeatedly on columns(index name is different, but
the index type and involved column is same)
- checkException(s"create index idx_price_1 on $tableName using lucene
(price)")(
- "Secondary index already exists: idx_price_1"
- )
+ // Create indexes repeatedly on columns(index name is different, but
the index type and involved column is same)
+ checkException(s"create index idx_price_1 on $tableName using lucene
(price)")(
+ "Secondary index already exists: idx_price_1"
+ )
- spark.sql(s"show indexes from $tableName").show()
- checkAnswer(s"show indexes from $tableName")(
- Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
- Seq("idx_price", "price", "lucene",
"{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
- )
+ spark.sql(s"show indexes from $tableName").show()
+ checkAnswer(s"show indexes from $tableName")(
+ Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
+ Seq("idx_price", "price", "lucene",
"{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
+ )
- checkAnswer(s"drop index idx_name on $tableName")()
- checkException(s"drop index idx_name on $tableName")("Secondary index
not exists: idx_name")
+ checkAnswer(s"drop index idx_name on $tableName")()
+ checkException(s"drop index idx_name on $tableName")("Secondary
index not exists: idx_name")
- spark.sql(s"show indexes from $tableName").show()
- checkAnswer(s"show indexes from $tableName")(
- Seq("idx_price", "price", "lucene",
"{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
- )
+ spark.sql(s"show indexes from $tableName").show()
+ checkAnswer(s"show indexes from $tableName")(
+ Seq("idx_price", "price", "lucene",
"{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
+ )
- checkAnswer(s"drop index idx_price on $tableName")()
- checkAnswer(s"show indexes from $tableName")()
+ checkAnswer(s"drop index idx_price on $tableName")()
+ checkAnswer(s"show indexes from $tableName")()
- checkException(s"drop index idx_price on $tableName")("Secondary index
not exists: idx_price")
+ checkException(s"drop index idx_price on $tableName")("Secondary
index not exists: idx_price")
- checkException(s"create index idx_price_1 on $tableName using lucene
(field_not_exist)")(
- "Field not exists: field_not_exist"
- )
+ checkExceptionContain(s"create index idx_price_1 on $tableName using
lucene (field_not_exist)")(
+ "Missing field field_not_exist"
+ )
+ }
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 6fb1719cede..069a4608073 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -107,4 +107,17 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
case _ => plan
}
}
+
+ /**
+ * Commands of managing indexes are not supported for Spark2.
+ */
+ override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = {
+ None
+ }
+
+ override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, Boolean)] = None
+
+ override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = None
+
+ override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = None
}
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index e9757c821d9..6cd5da79b86 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql
import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression,
ProjectionOverSchema}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
@@ -71,4 +71,15 @@ object HoodieSpark30CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
Some((c.tableName, true, false, c.cmd))
}
}
+
+ /**
+ * Commands of managing indexes are not supported for Spark3.0
+ */
+ override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = None
+
+ override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, Boolean)] = None
+
+ override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = None
+
+ override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = None
}
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index df94529ce12..8a56d0fba25 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql
import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression,
ProjectionOverSchema}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
@@ -71,4 +71,15 @@ object HoodieSpark31CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
Some((c.tableName, true, false, c.cmd))
}
}
+
+ /**
+ * Commands of managing indexes are not supported for Spark3.1
+ */
+ override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = None
+
+ override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, Boolean)] = None
+
+ override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = None
+
+ override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = None
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4
index d4e1e48351c..d7f87b4e5aa 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4
@@ -755,6 +755,34 @@ functionIdentifier
: (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier
;
+multipartIdentifierPropertyList
+ : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+ ;
+
+multipartIdentifierProperty
+ : multipartIdentifier (OPTIONS options=propertyList)?
+ ;
+
+propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | STRING
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | STRING
+ ;
+
namedExpression
: expression (AS? (name=errorCapturingIdentifier | identifierList))?
;
@@ -1797,6 +1825,10 @@ TIMESTAMP: 'TIMESTAMP';
//============================
// End of the keywords list
//============================
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+COMMA: ',';
+DOT: '.';
EQ : '=' | '==';
NSEQ: '<=>';
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
index 585a7f1c2fb..ddbecfefc76 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
@@ -29,5 +29,12 @@ statement
| createTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)?
#createTable
+ | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+ tableIdentifier (USING indexType=identifier)?
+ LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+ (OPTIONS indexOptions=propertyList)?
#createIndex
+ | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier
#dropIndex
+ | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier
#showIndexes
+ | REFRESH INDEX identifier ON TABLE? tableIdentifier
#refreshIndex
| .*?
#passThrough
;
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index d4624625d75..1bb4638fcdb 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
@@ -91,4 +91,40 @@ object HoodieSpark32CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns
[$cols]")
}
+
+ override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = {
+ plan match {
+ case ci @ CreateIndex(table, indexName, indexType, ignoreIfExists,
columns, properties) =>
+ Some((table, indexName, indexType, ignoreIfExists, columns.map(col =>
(col._1.name, col._2)), properties))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, Boolean)] = {
+ plan match {
+ case ci @ DropIndex(table, indexName, ignoreIfNotExists) =>
+ Some((table, indexName, ignoreIfNotExists))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = {
+ plan match {
+ case ci @ ShowIndexes(table, output) =>
+ Some((table, output))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = {
+ plan match {
+ case ci @ RefreshIndex(table, indexName) =>
+ Some((table, indexName))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala
index 196a77cb13a..f750ddaf9c1 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala
@@ -3317,6 +3317,145 @@ class HoodieSpark3_2ExtendedSqlAstBuilder(conf:
SQLConf, delegate: ParserInterfa
position = Option(ctx.colPosition).map(pos =>
UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))))
}
+
+ /**
+ * Create an index, returning a [[CreateIndex]] logical plan.
+ * For example:
+ * {{{
+ * CREATE INDEX index_name ON [TABLE] table_name [USING index_type]
(column_index_property_list)
+ * [OPTIONS indexPropertyList]
+ * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [
, . . . ]
+ * indexPropertyList: index_property_name [= index_property_value] [ , .
. . ]
+ * }}}
+ */
+ override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+ (ctx.identifier(0).getText, "")
+ } else {
+ (ctx.identifier(0).getText, ctx.identifier(1).getText)
+ }
+
+ val columns = ctx.columns.multipartIdentifierProperty.asScala
+ .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
+ val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+ .map(x =>
(Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+ val options =
Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+ CreateIndex(
+ UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
+ indexName,
+ indexType,
+ ctx.EXISTS != null,
+ columns.map(UnresolvedFieldName).zip(columnsProperties),
+ options)
+ }
+
+ /**
+ * Drop an index, returning a [[DropIndex]] logical plan.
+ * For example:
+ * {{{
+ * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitDropIndex(ctx: DropIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val indexName = ctx.identifier.getText
+ DropIndex(
+ UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
+ indexName,
+ ctx.EXISTS != null)
+ }
+
+ /**
+ * Show indexes, returning a [[ShowIndexes]] logical plan.
+ * For example:
+ * {{{
+ * SHOW INDEXES (FROM | IN) [TABLE] table_name
+ * }}}
+ */
+ override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan =
withOrigin(ctx) {
+
ShowIndexes(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())))
+ }
+
+ /**
+ * Refresh index, returning a [[RefreshIndex]] logical plan
+ * For example:
+ * {{{
+ * REFRESH INDEX index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan =
withOrigin(ctx) {
+
RefreshIndex(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
ctx.identifier.getText)
+ }
+
+ /**
+ * Convert a property list into a key-value map.
+ * This should be called through [[visitPropertyKeyValues]] or
[[visitPropertyKeys]].
+ */
+ override def visitPropertyList(ctx: PropertyListContext): Map[String,
String] = withOrigin(ctx) {
+ val properties = ctx.property.asScala.map { property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
+ }
+
+ /**
+ * Parse a key-value map from a [[PropertyListContext]], assuming all values
are specified.
+ */
+ def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.collect { case (key, null) => key }
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${badKeys.mkString("[", ",",
"]")}", ctx)
+ }
+ props
+ }
+
+ /**
+ * Parse a list of keys from a [[PropertyListContext]], assuming no values
are specified.
+ */
+ def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.filter { case (_, v) => v != null }.keys
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values should not be specified for key(s): ${badKeys.mkString("[",
",", "]")}", ctx)
+ }
+ props.keys.toSeq
+ }
+
+ /**
+ * A property key can either be String or a collection of dot separated
elements. This
+ * function extracts the property key based on whether it's a string literal
or a property
+ * identifier.
+ */
+ override def visitPropertyKey(key: PropertyKeyContext): String = {
+ if (key.STRING != null) {
+ string(key.STRING)
+ } else {
+ key.getText
+ }
+ }
+
+ /**
+ * A property value can be String, Integer, Boolean or Decimal. This
function extracts
+ * the property value based on whether it's a string, integer, boolean or
decimal literal.
+ */
+ override def visitPropertyValue(value: PropertyValueContext): String = {
+ if (value == null) {
+ null
+ } else if (value.STRING != null) {
+ string(value.STRING)
+ } else if (value.booleanValue != null) {
+ value.getText.toLowerCase(Locale.ROOT)
+ } else {
+ value.getText
+ }
+ }
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala
index 1f8d02340d9..a8a684dde75 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala
@@ -121,7 +121,11 @@ class HoodieSpark3_2ExtendedSqlParser(session:
SparkSession, delegate: ParserInt
normalized.contains("system_time as of") ||
normalized.contains("timestamp as of") ||
normalized.contains("system_version as of") ||
- normalized.contains("version as of")
+ normalized.contains("version as of") ||
+ normalized.contains("create index") ||
+ normalized.contains("drop index") ||
+ normalized.contains("show indexes") ||
+ normalized.contains("refresh index")
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
similarity index 70%
rename from
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
rename to
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
index 1cc8c997284..2524a838d5f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
@@ -19,20 +19,19 @@
package org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.analysis.FieldName
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
import org.apache.spark.sql.types.StringType
/**
* The logical plan of the CREATE INDEX command.
*/
-case class CreateIndex(
- table: LogicalPlan,
- indexName: String,
- indexType: String,
- ignoreIfExists: Boolean,
- columns: Seq[(Attribute, Map[String, String])],
- options: Map[String, String],
- override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends
Command {
+case class CreateIndex(table: LogicalPlan,
+ indexName: String,
+ indexType: String,
+ ignoreIfExists: Boolean,
+ columns: Seq[(FieldName, Map[String, String])],
+ properties: Map[String, String]) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
@@ -43,18 +42,12 @@ case class CreateIndex(
}
}
-object CreateIndex {
- def getOutputAttrs: Seq[Attribute] = Seq.empty
-}
-
/**
* The logical plan of the DROP INDEX command.
*/
-case class DropIndex(
- table: LogicalPlan,
- indexName: String,
- ignoreIfNotExists: Boolean,
- override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends
Command {
+case class DropIndex(table: LogicalPlan,
+ indexName: String,
+ ignoreIfNotExists: Boolean) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
@@ -63,16 +56,11 @@ case class DropIndex(
}
}
-object DropIndex {
- def getOutputAttrs: Seq[Attribute] = Seq.empty
-}
-
/**
* The logical plan of the SHOW INDEXES command.
*/
-case class ShowIndexes(
- table: LogicalPlan,
- override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends
Command {
+case class ShowIndexes(table: LogicalPlan,
+ override val output: Seq[Attribute] =
ShowIndexes.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
@@ -94,10 +82,8 @@ object ShowIndexes {
/**
* The logical plan of the REFRESH INDEX command.
*/
-case class RefreshIndex(
- table: LogicalPlan,
- indexName: String,
- override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends
Command {
+case class RefreshIndex(table: LogicalPlan,
+ indexName: String) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
@@ -105,7 +91,3 @@ case class RefreshIndex(
copy(table = newChild.head)
}
}
-
-object RefreshIndex {
- def getOutputAttrs: Seq[Attribute] = Seq.empty
-}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index d64bc94301a..1c08717455a 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -21,12 +21,13 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, DefaultSource,
SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
import
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren
-import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt,
EliminateSubqueryAliases, NamedRelation, UnresolvedAttribute,
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName,
UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery,
HoodieTableChanges, HoodieTableChangesOptionsParser}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.Origin
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.{Table, V1Table}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -201,6 +202,12 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
matchedActions = newMatchedActions,
notMatchedActions = newNotMatchedActions)
}
+
+ case cmd: CreateIndex if cmd.table.resolved &&
cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) =>
+ cmd.copy(columns = cmd.columns.map {
+ case (u: UnresolvedFieldName, prop) => resolveFieldNames(cmd.table,
u.name, u) -> prop
+ case other => other
+ })
}
def resolveAssignments(
@@ -244,6 +251,35 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
}
}
+ /**
+ * Returns the resolved field name if the field can be resolved; otherwise
throws an
+ * AnalysisException (via missingFieldError) because the column is not found
in the table schema.
+ */
+ private def resolveFieldNames(table: LogicalPlan,
+ fieldName: Seq[String],
+ context: Expression): ResolvedFieldName = {
+ resolveFieldNamesOpt(table, fieldName, context)
+ .getOrElse(throw missingFieldError(fieldName, table, context.origin))
+ }
+
+ private def resolveFieldNamesOpt(table: LogicalPlan,
+ fieldName: Seq[String],
+ context: Expression):
Option[ResolvedFieldName] = {
+ table.schema.findNestedField(
+ fieldName, includeCollections = true, conf.resolver, context.origin
+ ).map {
+ case (path, field) => ResolvedFieldName(path, field)
+ }
+ }
+
+ private def missingFieldError(fieldName: Seq[String], table: LogicalPlan,
context: Origin): Throwable = {
+ throw new AnalysisException(
+ s"Missing field ${fieldName.mkString(".")} with schema:\n" +
+ table.schema.treeString,
+ context.line,
+ context.startPosition)
+ }
+
private[sql] object MatchMergeIntoTable {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan,
Expression)] =
sparkAdapter.getCatalystPlanUtils.unapplyMergeIntoTable(plan)
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4
index d4e1e48351c..d7f87b4e5aa 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4
@@ -755,6 +755,34 @@ functionIdentifier
: (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier
;
+multipartIdentifierPropertyList
+ : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+ ;
+
+multipartIdentifierProperty
+ : multipartIdentifier (OPTIONS options=propertyList)?
+ ;
+
+propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | STRING
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | STRING
+ ;
+
namedExpression
: expression (AS? (name=errorCapturingIdentifier | identifierList))?
;
@@ -1797,6 +1825,10 @@ TIMESTAMP: 'TIMESTAMP';
//============================
// End of the keywords list
//============================
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+COMMA: ',';
+DOT: '.';
EQ : '=' | '==';
NSEQ: '<=>';
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
index 585a7f1c2fb..ddbecfefc76 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
@@ -29,5 +29,12 @@ statement
| createTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)?
#createTable
+ | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+ tableIdentifier (USING indexType=identifier)?
+ LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+ (OPTIONS indexOptions=propertyList)?
#createIndex
+ | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier
#dropIndex
+ | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier
#showIndexes
+ | REFRESH INDEX identifier ON TABLE? tableIdentifier
#refreshIndex
| .*?
#passThrough
;
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 16f2517d128..85bd4a2c5e5 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
@@ -76,4 +76,40 @@ object HoodieSpark33CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns
[$cols]")
}
+
+ override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = {
+ plan match {
+ case ci @ CreateIndex(table, indexName, indexType, ignoreIfExists,
columns, properties) =>
+ Some((table, indexName, indexType, ignoreIfExists, columns.map(col =>
(col._1.name, col._2)), properties))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, Boolean)] = {
+ plan match {
+ case ci @ DropIndex(table, indexName, ignoreIfNotExists) =>
+ Some((table, indexName, ignoreIfNotExists))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = {
+ plan match {
+ case ci @ ShowIndexes(table, output) =>
+ Some((table, output))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = {
+ plan match {
+ case ci @ RefreshIndex(table, indexName) =>
+ Some((table, indexName))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
index 694a7133e4b..4e5e32e76fe 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala
@@ -3327,6 +3327,145 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf:
SQLConf, delegate: ParserInterfa
position = Option(ctx.colPosition).map(pos =>
UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))))
}
+
+ /**
+ * Create an index, returning a [[CreateIndex]] logical plan.
+ * For example:
+ * {{{
+ * CREATE INDEX index_name ON [TABLE] table_name [USING index_type]
(column_index_property_list)
+ * [OPTIONS indexPropertyList]
+ * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [
, . . . ]
+ * indexPropertyList: index_property_name [= index_property_value] [ , .
. . ]
+ * }}}
+ */
+ override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+ (ctx.identifier(0).getText, "")
+ } else {
+ (ctx.identifier(0).getText, ctx.identifier(1).getText)
+ }
+
+ val columns = ctx.columns.multipartIdentifierProperty.asScala
+ .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
+ val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+ .map(x =>
(Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+ val options =
Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+ CreateIndex(
+ UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
+ indexName,
+ indexType,
+ ctx.EXISTS != null,
+ columns.map(UnresolvedFieldName).zip(columnsProperties),
+ options)
+ }
+
+ /**
+ * Drop an index, returning a [[DropIndex]] logical plan.
+ * For example:
+ * {{{
+ * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitDropIndex(ctx: DropIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val indexName = ctx.identifier.getText
+ DropIndex(
+ UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
+ indexName,
+ ctx.EXISTS != null)
+ }
+
+ /**
+ * Show indexes, returning a [[ShowIndexes]] logical plan.
+ * For example:
+ * {{{
+ * SHOW INDEXES (FROM | IN) [TABLE] table_name
+ * }}}
+ */
+ override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan =
withOrigin(ctx) {
+
ShowIndexes(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())))
+ }
+
+ /**
+ * Refresh index, returning a [[RefreshIndex]] logical plan
+ * For example:
+ * {{{
+ * REFRESH INDEX index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan =
withOrigin(ctx) {
+
RefreshIndex(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
ctx.identifier.getText)
+ }
+
+ /**
+ * Convert a property list into a key-value map.
+ * This should be called through [[visitPropertyKeyValues]] or
[[visitPropertyKeys]].
+ */
+ override def visitPropertyList(ctx: PropertyListContext): Map[String,
String] = withOrigin(ctx) {
+ val properties = ctx.property.asScala.map { property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
+ }
+
+ /**
+ * Parse a key-value map from a [[PropertyListContext]], assuming all values
are specified.
+ */
+ def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.collect { case (key, null) => key }
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${badKeys.mkString("[", ",",
"]")}", ctx)
+ }
+ props
+ }
+
+ /**
+ * Parse a list of keys from a [[PropertyListContext]], assuming no values
are specified.
+ */
+ def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.filter { case (_, v) => v != null }.keys
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values should not be specified for key(s): ${badKeys.mkString("[",
",", "]")}", ctx)
+ }
+ props.keys.toSeq
+ }
+
+ /**
+ * A property key can either be String or a collection of dot separated
elements. This
+ * function extracts the property key based on whether it's a string literal
or a property
+ * identifier.
+ */
+ override def visitPropertyKey(key: PropertyKeyContext): String = {
+ if (key.STRING != null) {
+ string(key.STRING)
+ } else {
+ key.getText
+ }
+ }
+
+ /**
+ * A property value can be String, Integer, Boolean or Decimal. This
function extracts
+ * the property value based on whether it's a string, integer, boolean or
decimal literal.
+ */
+ override def visitPropertyValue(value: PropertyValueContext): String = {
+ if (value == null) {
+ null
+ } else if (value.STRING != null) {
+ string(value.STRING)
+ } else if (value.booleanValue != null) {
+ value.getText.toLowerCase(Locale.ROOT)
+ } else {
+ value.getText
+ }
+ }
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
index 4c59f56828f..24b665c8a37 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala
@@ -123,7 +123,11 @@ class HoodieSpark3_3ExtendedSqlParser(session:
SparkSession, delegate: ParserInt
normalized.contains("system_time as of") ||
normalized.contains("timestamp as of") ||
normalized.contains("system_version as of") ||
- normalized.contains("version as of")
+ normalized.contains("version as of") ||
+ normalized.contains("create index") ||
+ normalized.contains("drop index") ||
+ normalized.contains("show indexes") ||
+ normalized.contains("refresh index")
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4
index d4e1e48351c..d7f87b4e5aa 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4
@@ -755,6 +755,34 @@ functionIdentifier
: (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier
;
+multipartIdentifierPropertyList
+ : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+ ;
+
+multipartIdentifierProperty
+ : multipartIdentifier (OPTIONS options=propertyList)?
+ ;
+
+propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | STRING
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | STRING
+ ;
+
namedExpression
: expression (AS? (name=errorCapturingIdentifier | identifierList))?
;
@@ -1797,6 +1825,10 @@ TIMESTAMP: 'TIMESTAMP';
//============================
// End of the keywords list
//============================
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+COMMA: ',';
+DOT: '.';
EQ : '=' | '==';
NSEQ: '<=>';
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
index 585a7f1c2fb..ddbecfefc76 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4
@@ -29,5 +29,12 @@ statement
| createTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)?
#createTable
+ | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+ tableIdentifier (USING indexType=identifier)?
+ LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+ (OPTIONS indexOptions=propertyList)?
#createIndex
+ | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier
#dropIndex
+ | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier
#showIndexes
+ | REFRESH INDEX identifier ON TABLE? tableIdentifier
#refreshIndex
| .*?
#passThrough
;
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 947a73285f5..41b629aac8e 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
@@ -79,4 +79,40 @@ object HoodieSpark34CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
"sqlExpr" -> a.sql,
"cols" -> cols))
}
+
+ override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = {
+ plan match {
+ case ci@CreateIndex(table, indexName, indexType, ignoreIfExists,
columns, properties) =>
+ Some((table, indexName, indexType, ignoreIfExists, columns.map(col =>
(col._1.name, col._2)), properties))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, Boolean)] = {
+ plan match {
+ case ci@DropIndex(table, indexName, ignoreIfNotExists) =>
+ Some((table, indexName, ignoreIfNotExists))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = {
+ plan match {
+ case ci@ShowIndexes(table, output) =>
+ Some((table, output))
+ case _ =>
+ None
+ }
+ }
+
+ override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = {
+ plan match {
+ case ci@RefreshIndex(table, indexName) =>
+ Some((table, indexName))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
index fe07aeb74e8..670d56ea37a 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala
@@ -3331,6 +3331,145 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf:
SQLConf, delegate: ParserInterfa
UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))),
default = Option(null))
}
+
+ /**
+ * Create an index, returning a [[CreateIndex]] logical plan.
+ * For example:
+ * {{{
+ * CREATE INDEX index_name ON [TABLE] table_name [USING index_type]
(column_index_property_list)
+ * [OPTIONS indexPropertyList]
+ * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [
, . . . ]
+ * indexPropertyList: index_property_name [= index_property_value] [ , .
. . ]
+ * }}}
+ */
+ override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+ (ctx.identifier(0).getText, "")
+ } else {
+ (ctx.identifier(0).getText, ctx.identifier(1).getText)
+ }
+
+ val columns = ctx.columns.multipartIdentifierProperty.asScala
+ .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
+ val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+ .map(x =>
(Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+ val options =
Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+ CreateIndex(
+ UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
+ indexName,
+ indexType,
+ ctx.EXISTS != null,
+ columns.map(UnresolvedFieldName).zip(columnsProperties),
+ options)
+ }
+
+ /**
+ * Drop an index, returning a [[DropIndex]] logical plan.
+ * For example:
+ * {{{
+ * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitDropIndex(ctx: DropIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val indexName = ctx.identifier.getText
+ DropIndex(
+ UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
+ indexName,
+ ctx.EXISTS != null)
+ }
+
+ /**
+ * Show indexes, returning a [[ShowIndexes]] logical plan.
+ * For example:
+ * {{{
+ * SHOW INDEXES (FROM | IN) [TABLE] table_name
+ * }}}
+ */
+ override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan =
withOrigin(ctx) {
+
ShowIndexes(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())))
+ }
+
+ /**
+ * Refresh index, returning a [[RefreshIndex]] logical plan
+ * For example:
+ * {{{
+ * REFRESH INDEX index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan =
withOrigin(ctx) {
+
RefreshIndex(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())),
ctx.identifier.getText)
+ }
+
+ /**
+ * Convert a property list into a key-value map.
+ * This should be called through [[visitPropertyKeyValues]] or
[[visitPropertyKeys]].
+ */
+ override def visitPropertyList(ctx: PropertyListContext): Map[String,
String] = withOrigin(ctx) {
+ val properties = ctx.property.asScala.map { property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
+ }
+
+ /**
+ * Parse a key-value map from a [[PropertyListContext]], assuming all values
are specified.
+ */
+ def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.collect { case (key, null) => key }
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${badKeys.mkString("[", ",",
"]")}", ctx)
+ }
+ props
+ }
+
+ /**
+ * Parse a list of keys from a [[PropertyListContext]], assuming no values
are specified.
+ */
+ def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.filter { case (_, v) => v != null }.keys
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values should not be specified for key(s): ${badKeys.mkString("[",
",", "]")}", ctx)
+ }
+ props.keys.toSeq
+ }
+
+ /**
+ * A property key can either be String or a collection of dot separated
elements. This
+ * function extracts the property key based on whether it's a string literal
or a property
+ * identifier.
+ */
+ override def visitPropertyKey(key: PropertyKeyContext): String = {
+ if (key.STRING != null) {
+ string(key.STRING)
+ } else {
+ key.getText
+ }
+ }
+
+ /**
+ * A property value can be String, Integer, Boolean or Decimal. This
function extracts
+ * the property value based on whether it's a string, integer, boolean or
decimal literal.
+ */
+ override def visitPropertyValue(value: PropertyValueContext): String = {
+ if (value == null) {
+ null
+ } else if (value.STRING != null) {
+ string(value.STRING)
+ } else if (value.booleanValue != null) {
+ value.getText.toLowerCase(Locale.ROOT)
+ } else {
+ value.getText
+ }
+ }
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
index 8b554af6af3..dc7472ff83d 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala
@@ -123,7 +123,11 @@ class HoodieSpark3_4ExtendedSqlParser(session:
SparkSession, delegate: ParserInt
normalized.contains("system_time as of") ||
normalized.contains("timestamp as of") ||
normalized.contains("system_version as of") ||
- normalized.contains("version as of")
+ normalized.contains("version as of") ||
+ normalized.contains("create index") ||
+ normalized.contains("drop index") ||
+ normalized.contains("show indexes") ||
+ normalized.contains("refresh index")
}
}