This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fec49dc12b [HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax
for Spark SQL (#5761)
fec49dc12b is described below
commit fec49dc12b19801a75c3f61c9da0d47ef0c1d2f8
Author: huberylee <[email protected]>
AuthorDate: Fri Jun 17 18:33:58 2022 +0800
[HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax for Spark SQL
(#5761)
* Support Create/Drop/Show/Refresh Index Syntax for Spark SQL
---
.../org/apache/hudi/common/index/HoodieIndex.java | 119 +++++++++++++++++
.../apache/hudi/common/index/HoodieIndexType.java | 54 ++++++++
.../hudi/spark/sql/parser/HoodieSqlCommon.g4 | 76 ++++++++++-
.../spark/sql/catalyst/plans/logical/Index.scala | 111 ++++++++++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 22 ++++
.../spark/sql/hudi/command/IndexCommands.scala | 101 +++++++++++++++
.../sql/parser/HoodieSqlCommonAstBuilder.scala | 143 ++++++++++++++++++++-
.../sql/hudi/command/index/TestIndexSyntax.scala | 85 ++++++++++++
8 files changed, 709 insertions(+), 2 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
new file mode 100644
index 0000000000..6dabb1a41f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.index;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class HoodieIndex {
+ private String indexName;
+ private String[] colNames;
+ private HoodieIndexType indexType;
+ private Map<String, Map<String, String>> colOptions;
+ private Map<String, String> options;
+
+ public HoodieIndex() {
+ }
+
+ public HoodieIndex(
+ String indexName,
+ String[] colNames,
+ HoodieIndexType indexType,
+ Map<String, Map<String, String>> colOptions,
+ Map<String, String> options) {
+ this.indexName = indexName;
+ this.colNames = colNames;
+ this.indexType = indexType;
+ this.colOptions = colOptions;
+ this.options = options;
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public String[] getColNames() {
+ return colNames;
+ }
+
+ public HoodieIndexType getIndexType() {
+ return indexType;
+ }
+
+ public Map<String, Map<String, String>> getColOptions() {
+ return colOptions;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieIndex{"
+ + "indexName='" + indexName + '\''
+ + ", colNames='" + Arrays.toString(colNames) + '\''
+ + ", indexType=" + indexType
+ + ", colOptions=" + colOptions
+ + ", options=" + options
+ + '}';
+ }
+
+ public static class Builder {
+ private String indexName;
+ private String[] colNames;
+ private HoodieIndexType indexType;
+ private Map<String, Map<String, String>> colOptions;
+ private Map<String, String> options;
+
+ public Builder setIndexName(String indexName) {
+ this.indexName = indexName;
+ return this;
+ }
+
+ public Builder setColNames(String[] colNames) {
+ this.colNames = colNames;
+ return this;
+ }
+
+ public Builder setIndexType(String indexType) {
+ this.indexType = HoodieIndexType.of(indexType);
+ return this;
+ }
+
+ public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
+ this.colOptions = colOptions;
+ return this;
+ }
+
+ public Builder setOptions(Map<String, String> options) {
+ this.options = options;
+ return this;
+ }
+
+ public HoodieIndex build() {
+ return new HoodieIndex(indexName, colNames, indexType, colOptions,
options);
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
new file mode 100644
index 0000000000..03618a7679
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.index;
+
+import org.apache.hudi.exception.HoodieIndexException;
+
+import java.util.Arrays;
+
+public enum HoodieIndexType {
+ LUCENE((byte) 1);
+
+ private final byte type;
+
+ HoodieIndexType(byte type) {
+ this.type = type;
+ }
+
+ public byte getValue() {
+ return type;
+ }
+
+ public static HoodieIndexType of(byte indexType) {
+ return Arrays.stream(HoodieIndexType.values())
+ .filter(t -> t.type == indexType)
+ .findAny()
+ .orElseThrow(() ->
+ new HoodieIndexException("Unknown hoodie index type:" +
indexType));
+ }
+
+ public static HoodieIndexType of(String indexType) {
+ return Arrays.stream(HoodieIndexType.values())
+ .filter(t -> t.name().equals(indexType.toUpperCase()))
+ .findAny()
+ .orElseThrow(() ->
+ new HoodieIndexException("Unknown hoodie index type:" +
indexType));
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
index 0cde14a4e4..65e17bfb46 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
+++
b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
@@ -48,6 +48,13 @@
statement
: compactionStatement
#compactionCommand
| CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'
#call
+ | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+ tableIdentifier (USING indexType=identifier)?
+ LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+ (OPTIONS indexOptions=propertyList)?
#createIndex
+ | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier
#dropIndex
+ | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier
#showIndexes
+ | REFRESH INDEX identifier ON TABLE? tableIdentifier
#refreshIndex
| .*?
#passThrough
;
@@ -99,6 +106,14 @@
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;
+ multipartIdentifierPropertyList
+ : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+ ;
+
+ multipartIdentifierProperty
+ : multipartIdentifier (OPTIONS options=propertyList)?
+ ;
+
multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;
@@ -114,9 +129,53 @@
;
nonReserved
- : CALL | COMPACTION | RUN | SCHEDULE | ON | SHOW | LIMIT
+ : CALL
+ | COMPACTION
+ | CREATE
+ | DROP
+ | EXISTS
+ | FROM
+ | IN
+ | INDEX
+ | INDEXES
+ | IF
+ | LIMIT
+ | NOT
+ | ON
+ | OPTIONS
+ | REFRESH
+ | RUN
+ | SCHEDULE
+ | SHOW
+ | TABLE
+ | USING
;
+ propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+ property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+ propertyKey
+ : identifier (DOT identifier)*
+ | STRING
+ ;
+
+ propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | STRING
+ ;
+
+ LEFT_PAREN: '(';
+ RIGHT_PAREN: ')';
+ COMMA: ',';
+ DOT: '.';
+
ALL: 'ALL';
AT: 'AT';
CALL: 'CALL';
@@ -132,6 +191,21 @@
FALSE: 'FALSE';
INTERVAL: 'INTERVAL';
TO: 'TO';
+ CREATE: 'CREATE';
+ INDEX: 'INDEX';
+ INDEXES: 'INDEXES';
+ IF: 'IF';
+ NOT: 'NOT';
+ EXISTS: 'EXISTS';
+ TABLE: 'TABLE';
+ USING: 'USING';
+ OPTIONS: 'OPTIONS';
+ DROP: 'DROP';
+ FROM: 'FROM';
+ IN: 'IN';
+ REFRESH: 'REFRESH';
+
+ EQ: '=' | '==';
PLUS: '+';
MINUS: '-';
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
new file mode 100644
index 0000000000..12ee2e8058
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
+import org.apache.spark.sql.types.StringType
+
+/**
+ * The logical plan of the CREATE INDEX command.
+ */
+case class CreateIndex(
+ table: LogicalPlan,
+ indexName: String,
+ indexType: String,
+ ignoreIfExists: Boolean,
+ columns: Seq[(Attribute, Map[String, String])],
+ properties: Map[String, String],
+ override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends
Command {
+
+ override def children: Seq[LogicalPlan] = Seq(table)
+
+ override lazy val resolved: Boolean = table.resolved &&
columns.forall(_._1.resolved)
+
+ def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): CreateIndex
= {
+ copy(table = newChild.head)
+ }
+}
+
+object CreateIndex {
+ def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The logical plan of the DROP INDEX command.
+ */
+case class DropIndex(
+ table: LogicalPlan,
+ indexName: String,
+ ignoreIfNotExists: Boolean,
+ override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends
Command {
+
+ override def children: Seq[LogicalPlan] = Seq(table)
+
+ def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): DropIndex = {
+ copy(table = newChild.head)
+ }
+}
+
+object DropIndex {
+ def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The logical plan of the SHOW INDEXES command.
+ */
+case class ShowIndexes(
+ table: LogicalPlan,
+ override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends
Command {
+
+ override def children: Seq[LogicalPlan] = Seq(table)
+
+ def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): ShowIndexes
= {
+ copy(table = newChild.head)
+ }
+}
+
+object ShowIndexes {
+ def getOutputAttrs: Seq[Attribute] = Seq(
+ AttributeReference("index_name", StringType, nullable = false)(),
+ AttributeReference("col_name", StringType, nullable = false)(),
+ AttributeReference("index_type", StringType, nullable = false)(),
+ AttributeReference("col_options", StringType, nullable = true)(),
+ AttributeReference("options", StringType, nullable = true)()
+ )
+}
+
+/**
+ * The logical plan of the REFRESH INDEX command.
+ */
+case class RefreshIndex(
+ table: LogicalPlan,
+ indexName: String,
+ override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends
Command {
+
+ override def children: Seq[LogicalPlan] = Seq(table)
+
+ def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): RefreshIndex
= {
+ copy(table = newChild.head)
+ }
+}
+
+object RefreshIndex {
+ def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 97e453ff7e..a44abc72cf 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -171,6 +171,28 @@ case class HoodieAnalysis(sparkSession: SparkSession)
extends Rule[LogicalPlan]
} else {
c
}
+
+ // Convert to CreateIndexCommand
+ case CreateIndex(table, indexName, indexType, ignoreIfExists, columns,
properties, output)
+ if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+ CreateIndexCommand(
+ getTableIdentifier(table), indexName, indexType, ignoreIfExists,
columns, properties, output)
+
+ // Convert to DropIndexCommand
+ case DropIndex(table, indexName, ignoreIfNotExists, output)
+ if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+ DropIndexCommand(getTableIdentifier(table), indexName,
ignoreIfNotExists, output)
+
+ // Convert to ShowIndexesCommand
+ case ShowIndexes(table, output)
+ if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+ ShowIndexesCommand(getTableIdentifier(table), output)
+
+ // Convert to RefreshIndexCommand
+ case RefreshIndex(table, indexName, output)
+ if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+ RefreshIndexCommand(getTableIdentifier(table), indexName, output)
+
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
new file mode 100644
index 0000000000..5d73af31a9
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.index.HoodieIndex
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.{Row, SparkSession}
+
+case class CreateIndexCommand(
+ tableId: TableIdentifier,
+ indexName: String,
+ indexType: String,
+ ignoreIfExists: Boolean,
+ columns: Seq[(Attribute, Map[String, String])],
+ properties: Map[String, String],
+ override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ // The implementation for different index type
+ Seq.empty
+ }
+}
+
+case class DropIndexCommand(
+ tableId: TableIdentifier,
+ indexName: String,
+ ignoreIfNotExists: Boolean,
+ override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ // The implementation for different index type
+ Seq.empty
+ }
+}
+
+case class ShowIndexesCommand(
+ tableId: TableIdentifier,
+ override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ // The implementation for different index type
+ Seq.empty
+ }
+}
+
+case class RefreshIndexCommand(
+ tableId: TableIdentifier,
+ indexName: String,
+ override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ // The implementation for different index type
+ Seq.empty
+ }
+}
+
+abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging
{
+
+ /**
+ * Check whether a hoodie index exists. In a hoodie table, the hoodie index
+ * name must be unique, so the index name is checked first.
+ *
+ * @param secondaryIndexes Current hoodie indexes
+ * @param indexName The index name to be checked
+ * @param colNames The column names to be checked
+ * @return true if the index exists
+ */
+ def indexExists(
+ secondaryIndexes: Option[Array[HoodieIndex]],
+ indexName: String,
+ indexType: Option[String] = None,
+ colNames: Option[Array[String]] = None): Boolean = {
+ secondaryIndexes.exists(i => {
+ i.exists(_.getIndexName.equals(indexName)) ||
+ // Index type and column name need to be checked if present
+ indexType.exists(t =>
+ colNames.exists(c =>
+ i.exists(index =>
+ index.getIndexType.name().equalsIgnoreCase(t) &&
index.getColNames.sameElements(c))))
+ })
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
index 771798dd22..d0e5ed6133 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
@@ -25,11 +25,12 @@ import
org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute,
UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface,
ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical._
+import java.util.Locale
import scala.collection.JavaConverters._
class HoodieSqlCommonAstBuilder(session: SparkSession, delegate:
ParserInterface)
@@ -147,4 +148,144 @@ class HoodieSqlCommonAstBuilder(session: SparkSession,
delegate: ParserInterface
private def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
}
+
+ /**
+ * Create an index, returning a [[CreateIndex]] logical plan.
+ * For example:
+ * {{{
+ * CREATE INDEX index_name ON [TABLE] table_name [USING index_type]
(column_index_property_list)
+ * [OPTIONS indexPropertyList]
+ * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [
, . . . ]
+ * indexPropertyList: index_property_name [= index_property_value] [ , .
. . ]
+ * }}}
+ */
+ override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+ (ctx.identifier(0).getText, "")
+ } else {
+ (ctx.identifier(0).getText, ctx.identifier(1).getText)
+ }
+
+ val columns = ctx.columns.multipartIdentifierProperty.asScala
+ .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
+ val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+ .map(x =>
(Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+ val options =
Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+ CreateIndex(
+ visitTableIdentifier(ctx.tableIdentifier()),
+ indexName,
+ indexType,
+ ctx.EXISTS != null,
+ columns.map(UnresolvedAttribute(_)).zip(columnsProperties),
+ options)
+ }
+
+ /**
+ * Drop an index, returning a [[DropIndex]] logical plan.
+ * For example:
+ * {{{
+ * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitDropIndex(ctx: DropIndexContext): LogicalPlan =
withOrigin(ctx) {
+ val indexName = ctx.identifier.getText
+ DropIndex(
+ visitTableIdentifier(ctx.tableIdentifier()),
+ indexName,
+ ctx.EXISTS != null)
+ }
+
+ /**
+ * Show indexes, returning a [[ShowIndexes]] logical plan.
+ * For example:
+ * {{{
+ * SHOW INDEXES (FROM | IN) [TABLE] table_name
+ * }}}
+ */
+ override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan =
withOrigin(ctx) {
+ ShowIndexes(visitTableIdentifier(ctx.tableIdentifier()))
+ }
+
+ /**
+ * Refresh index, returning a [[RefreshIndex]] logical plan
+ * For example:
+ * {{{
+ * REFRESH INDEX index_name ON [TABLE] table_name
+ * }}}
+ */
+ override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan =
withOrigin(ctx) {
+ RefreshIndex(visitTableIdentifier(ctx.tableIdentifier()),
ctx.identifier.getText)
+ }
+
+ /**
+ * Convert a property list into a key-value map.
+ * This should be called through [[visitPropertyKeyValues]] or
[[visitPropertyKeys]].
+ */
+ override def visitPropertyList(
+ ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
+ val properties = ctx.property.asScala.map { property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
+ }
+
+ /**
+ * Parse a key-value map from a [[PropertyListContext]], assuming all values
are specified.
+ */
+ def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.collect { case (key, null) => key }
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${badKeys.mkString("[", ",",
"]")}", ctx)
+ }
+ props
+ }
+
+ /**
+ * Parse a list of keys from a [[PropertyListContext]], assuming no values
are specified.
+ */
+ def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.filter { case (_, v) => v != null }.keys
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values should not be specified for key(s): ${badKeys.mkString("[",
",", "]")}", ctx)
+ }
+ props.keys.toSeq
+ }
+
+ /**
+ * A property key can either be String or a collection of dot separated
elements. This
+ * function extracts the property key based on whether its a string literal
or a property
+ * identifier.
+ */
+ override def visitPropertyKey(key: PropertyKeyContext): String = {
+ if (key.STRING != null) {
+ string(key.STRING)
+ } else {
+ key.getText
+ }
+ }
+
+ /**
+ * A property value can be String, Integer, Boolean or Decimal. This
function extracts
+ * the property value based on whether its a string, integer, boolean or
decimal literal.
+ */
+ override def visitPropertyValue(value: PropertyValueContext): String = {
+ if (value == null) {
+ null
+ } else if (value.STRING != null) {
+ string(value.STRING)
+ } else if (value.booleanValue != null) {
+ value.getText.toLowerCase(Locale.ROOT)
+ } else {
+ value.getText
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
new file mode 100644
index 0000000000..3536ae9e0a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.index
+
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.apache.spark.sql.hudi.command.{CreateIndexCommand,
DropIndexCommand, ShowIndexesCommand}
+
+class TestIndexSyntax extends HoodieSparkSqlTestBase {
+
+ test("Test Create/Drop/Show/Refresh Index") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+ val sqlParser: ParserInterface = spark.sessionState.sqlParser
+ val analyzer: Analyzer = spark.sessionState.analyzer
+
+ var logicalPlan = sqlParser.parsePlan(s"show indexes from
default.$tableName")
+ var resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].tableId.toString())
+
+ logicalPlan = sqlParser.parsePlan(s"create index idx_name on
$tableName using lucene (name) options(block_size=1024)")
+ resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].tableId.toString())
+
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
+
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
+
assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
+ assertResult(Map("block_size" ->
"1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
+
+ logicalPlan = sqlParser.parsePlan(s"create index if not exists
idx_price on $tableName using lucene (price options(order='desc'))
options(block_size=512)")
+ resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].tableId.toString())
+
assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
+
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
+ assertResult(Map("order" ->
"desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2)
+ assertResult(Map("block_size" ->
"512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
+
+ logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on
$tableName")
+ resolvedLogicalPlan = analyzer.execute(logicalPlan)
+
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].tableId.toString())
+
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName)
+
assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists)
+ }
+ }
+ }
+}