This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fec49dc12b [HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax 
for Spark SQL (#5761)
fec49dc12b is described below

commit fec49dc12b19801a75c3f61c9da0d47ef0c1d2f8
Author: huberylee <[email protected]>
AuthorDate: Fri Jun 17 18:33:58 2022 +0800

    [HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax for Spark SQL 
(#5761)
    
    * Support Create/Drop/Show/Refresh Index Syntax for Spark SQL
---
 .../org/apache/hudi/common/index/HoodieIndex.java  | 119 +++++++++++++++++
 .../apache/hudi/common/index/HoodieIndexType.java  |  54 ++++++++
 .../hudi/spark/sql/parser/HoodieSqlCommon.g4       |  76 ++++++++++-
 .../spark/sql/catalyst/plans/logical/Index.scala   | 111 ++++++++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  22 ++++
 .../spark/sql/hudi/command/IndexCommands.scala     | 101 +++++++++++++++
 .../sql/parser/HoodieSqlCommonAstBuilder.scala     | 143 ++++++++++++++++++++-
 .../sql/hudi/command/index/TestIndexSyntax.scala   |  85 ++++++++++++
 8 files changed, 709 insertions(+), 2 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java 
b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
new file mode 100644
index 0000000000..6dabb1a41f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.index;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Plain description of an index on a Hudi table: its name, the indexed columns,
+ * the {@link HoodieIndexType}, a nested map of column options and a map of
+ * index-level options.
+ *
+ * <p>No defensive copies are made: the array and maps passed to the constructor
+ * or to the {@link Builder} are stored and handed back as-is.
+ */
+public class HoodieIndex {
+  private String indexName;
+  private String[] colNames;
+  private HoodieIndexType indexType;
+  private Map<String, Map<String, String>> colOptions;
+  private Map<String, String> options;
+
+  /** Creates an instance whose fields are all left unset ({@code null}). */
+  public HoodieIndex() {
+  }
+
+  /**
+   * Creates a fully specified index description.
+   *
+   * @param indexName  name of the index
+   * @param colNames   names of the indexed columns
+   * @param indexType  type of the index
+   * @param colOptions nested option maps for the columns
+   * @param options    index-level options
+   */
+  public HoodieIndex(
+      String indexName,
+      String[] colNames,
+      HoodieIndexType indexType,
+      Map<String, Map<String, String>> colOptions,
+      Map<String, String> options) {
+    this.indexName = indexName;
+    this.colNames = colNames;
+    this.indexType = indexType;
+    this.colOptions = colOptions;
+    this.options = options;
+  }
+
+  /** Returns a fresh {@link Builder} with no field set. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+
+  public String[] getColNames() {
+    return colNames;
+  }
+
+  public HoodieIndexType getIndexType() {
+    return indexType;
+  }
+
+  public Map<String, Map<String, String>> getColOptions() {
+    return colOptions;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("HoodieIndex{");
+    text.append("indexName='").append(indexName).append('\'');
+    text.append(", colNames='").append(Arrays.toString(colNames)).append('\'');
+    text.append(", indexType=").append(indexType);
+    text.append(", colOptions=").append(colOptions);
+    text.append(", options=").append(options);
+    return text.append('}').toString();
+  }
+
+  /**
+   * Fluent builder for {@link HoodieIndex}. Fields that are never set stay
+   * {@code null} in the built instance.
+   */
+  public static class Builder {
+    private String indexName;
+    private String[] colNames;
+    private HoodieIndexType indexType;
+    private Map<String, Map<String, String>> colOptions;
+    private Map<String, String> options;
+
+    public Builder setIndexName(String indexName) {
+      this.indexName = indexName;
+      return this;
+    }
+
+    public Builder setColNames(String[] colNames) {
+      this.colNames = colNames;
+      return this;
+    }
+
+    /**
+     * Sets the index type from its textual name, resolved through
+     * {@link HoodieIndexType#of(String)}.
+     */
+    public Builder setIndexType(String indexType) {
+      HoodieIndexType resolvedType = HoodieIndexType.of(indexType);
+      this.indexType = resolvedType;
+      return this;
+    }
+
+    public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
+      this.colOptions = colOptions;
+      return this;
+    }
+
+    public Builder setOptions(Map<String, String> options) {
+      this.options = options;
+      return this;
+    }
+
+    /** Builds a {@link HoodieIndex} from the values collected so far. */
+    public HoodieIndex build() {
+      return new HoodieIndex(indexName, colNames, indexType, colOptions, options);
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java 
b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
new file mode 100644
index 0000000000..03618a7679
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.index;
+
+import org.apache.hudi.exception.HoodieIndexException;
+
+import java.util.Arrays;
+
+/**
+ * Index types known to Hudi, each tagged with a byte code.
+ */
+public enum HoodieIndexType {
+  LUCENE((byte) 1);
+
+  private final byte type;
+
+  HoodieIndexType(byte type) {
+    this.type = type;
+  }
+
+  /** Returns the byte code of this index type. */
+  public byte getValue() {
+    return type;
+  }
+
+  /**
+   * Resolves an index type from its byte code.
+   *
+   * @param indexType byte code to look up
+   * @return the index type carrying that code
+   * @throws HoodieIndexException if no index type carries the given code
+   */
+  public static HoodieIndexType of(byte indexType) {
+    return Arrays.stream(HoodieIndexType.values())
+        .filter(t -> t.type == indexType)
+        .findAny()
+        .orElseThrow(() ->
+            new HoodieIndexException("Unknown hoodie index type:" + indexType));
+  }
+
+  /**
+   * Resolves an index type from its name, ignoring case.
+   *
+   * @param indexType name to look up; {@code null} or an unknown name is rejected
+   * @return the index type with that name
+   * @throws HoodieIndexException if the name matches no index type
+   */
+  public static HoodieIndexType of(String indexType) {
+    return Arrays.stream(HoodieIndexType.values())
+        // equalsIgnoreCase does not depend on the default locale (toUpperCase()
+        // does) and returns false for null instead of throwing.
+        .filter(t -> t.name().equalsIgnoreCase(indexType))
+        .findAny()
+        .orElseThrow(() ->
+            new HoodieIndexException("Unknown hoodie index type:" + indexType));
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
 
b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
index 0cde14a4e4..65e17bfb46 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
@@ -48,6 +48,13 @@
  statement
     : compactionStatement                                                      
 #compactionCommand
     | CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'     
 #call
+    | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+          tableIdentifier (USING indexType=identifier)?
+          LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+          (OPTIONS indexOptions=propertyList)?                                 
 #createIndex
+    | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier             
 #dropIndex
+    | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier                          
 #showIndexes
+    | REFRESH INDEX identifier ON TABLE? tableIdentifier                       
 #refreshIndex
     | .*?                                                                      
 #passThrough
     ;
 
@@ -99,6 +106,14 @@
     | MINUS? BIGDECIMAL_LITERAL       #bigDecimalLiteral
     ;
 
+ // Comma-separated list of multipartIdentifierProperty entries; the createIndex
+ // alternative uses it for the parenthesized column list.
+ multipartIdentifierPropertyList
+     : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+     ;
+
+ // A multipart identifier optionally followed by its own OPTIONS property list.
+ multipartIdentifierProperty
+     : multipartIdentifier (OPTIONS options=propertyList)?
+     ;
+
  multipartIdentifier
     : parts+=identifier ('.' parts+=identifier)*
     ;
@@ -114,9 +129,53 @@
     ;
 
  nonReserved
-     : CALL | COMPACTION | RUN | SCHEDULE | ON | SHOW | LIMIT
+     : CALL
+     | COMPACTION
+     | CREATE
+     | DROP
+     | EXISTS
+     | FROM
+     | IN
+     | INDEX
+     | INDEXES
+     | IF
+     | LIMIT
+     | NOT
+     | ON
+     | OPTIONS
+     | REFRESH
+     | RUN
+     | SCHEDULE
+     | SHOW
+     | TABLE
+     | USING
      ;
 
+ // Parenthesized, comma-separated list of one or more properties.
+ propertyList
+     : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+     ;
+
+ // A key with an optional value; the EQ sign between them may be omitted.
+ property
+     : key=propertyKey (EQ? value=propertyValue)?
+     ;
+
+ // Either dot-separated identifiers or a single string literal.
+ propertyKey
+     : identifier (DOT identifier)*
+     | STRING
+     ;
+
+ // An integer, decimal, boolean or string literal.
+ propertyValue
+     : INTEGER_VALUE
+     | DECIMAL_VALUE
+     | booleanValue
+     | STRING
+     ;
+
+ LEFT_PAREN: '(';
+ RIGHT_PAREN: ')';
+ COMMA: ',';
+ DOT: '.';
+
  ALL: 'ALL';
  AT: 'AT';
  CALL: 'CALL';
@@ -132,6 +191,21 @@
  FALSE: 'FALSE';
  INTERVAL: 'INTERVAL';
  TO: 'TO';
+ CREATE: 'CREATE';
+ INDEX: 'INDEX';
+ INDEXES: 'INDEXES';
+ IF: 'IF';
+ NOT: 'NOT';
+ EXISTS: 'EXISTS';
+ TABLE: 'TABLE';
+ USING: 'USING';
+ OPTIONS: 'OPTIONS';
+ DROP: 'DROP';
+ FROM: 'FROM';
+ IN: 'IN';
+ REFRESH: 'REFRESH';
+
+ EQ: '=' | '==';
 
  PLUS: '+';
  MINUS: '-';
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
new file mode 100644
index 0000000000..12ee2e8058
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.types.StringType
+
+/**
+ * The logical plan of the CREATE INDEX command.
+ *
+ * @param table          plan of the table the index is created on; the only child of this node
+ * @param indexName      name of the index
+ * @param indexType      name of the index type
+ * @param ignoreIfExists flag describing how an already existing index is treated
+ * @param columns        indexed column attributes, each paired with its own option map
+ * @param properties     index-level options
+ * @param output         output attributes; empty by default
+ */
+case class CreateIndex(
+    table: LogicalPlan,
+    indexName: String,
+    indexType: String,
+    ignoreIfExists: Boolean,
+    columns: Seq[(Attribute, Map[String, String])],
+    properties: Map[String, String],
+    override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends 
Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  // Resolved only once the table and every indexed column attribute are resolved.
+  override lazy val resolved: Boolean = table.resolved && 
columns.forall(_._1.resolved)
+
+  // Rebuilds the node around the first of the new children.
+  // NOTE(review): declared without `override`, presumably so the class also compiles
+  // against Spark versions that do not define this method - confirm.
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): CreateIndex 
= {
+    copy(table = newChild.head)
+  }
+}
+
+object CreateIndex {
+  /** Default output attributes of [[CreateIndex]]: none. */
+  def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The logical plan of the DROP INDEX command.
+ *
+ * @param table             plan of the table owning the index; the only child of this node
+ * @param indexName         name of the index to drop
+ * @param ignoreIfNotExists flag describing how a missing index is treated
+ * @param output            output attributes; empty by default
+ */
+case class DropIndex(
+    table: LogicalPlan,
+    indexName: String,
+    ignoreIfNotExists: Boolean,
+    override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends 
Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  // Rebuilds the node around the first of the new children.
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): DropIndex = {
+    copy(table = newChild.head)
+  }
+}
+
+object DropIndex {
+  /** Default output attributes of [[DropIndex]]: none. */
+  def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The logical plan of the SHOW INDEXES command.
+ *
+ * @param table  plan of the table whose indexes are listed; the only child of this node
+ * @param output output attributes; defaults to [[ShowIndexes.getOutputAttrs]]
+ */
+case class ShowIndexes(
+    table: LogicalPlan,
+    override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends 
Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  // Rebuilds the node around the first of the new children.
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): ShowIndexes 
= {
+    copy(table = newChild.head)
+  }
+}
+
+object ShowIndexes {
+  /**
+   * Default output attributes of [[ShowIndexes]]: five string columns, of which
+   * `index_name`, `col_name` and `index_type` are non-nullable while
+   * `col_options` and `options` are nullable.
+   */
+  def getOutputAttrs: Seq[Attribute] = Seq(
+    AttributeReference("index_name", StringType, nullable = false)(),
+    AttributeReference("col_name", StringType, nullable = false)(),
+    AttributeReference("index_type", StringType, nullable = false)(),
+    AttributeReference("col_options", StringType, nullable = true)(),
+    AttributeReference("options", StringType, nullable = true)()
+  )
+}
+
+/**
+ * The logical plan of the REFRESH INDEX command.
+ *
+ * @param table     plan of the table owning the index; the only child of this node
+ * @param indexName name of the index to refresh
+ * @param output    output attributes; empty by default
+ */
+case class RefreshIndex(
+    table: LogicalPlan,
+    indexName: String,
+    override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends 
Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  // Rebuilds the node around the first of the new children.
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): RefreshIndex 
= {
+    copy(table = newChild.head)
+  }
+}
+
+object RefreshIndex {
+  /** Default output attributes of [[RefreshIndex]]: none. */
+  def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 97e453ff7e..a44abc72cf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -171,6 +171,28 @@ case class HoodieAnalysis(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
         } else {
           c
         }
+
+      // Convert to CreateIndexCommand
+      case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, 
properties, output)
+        if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+        CreateIndexCommand(
+          getTableIdentifier(table), indexName, indexType, ignoreIfExists, 
columns, properties, output)
+
+      // Convert to DropIndexCommand
+      case DropIndex(table, indexName, ignoreIfNotExists, output)
+        if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+        DropIndexCommand(getTableIdentifier(table), indexName, 
ignoreIfNotExists, output)
+
+      // Convert to ShowIndexesCommand
+      case ShowIndexes(table, output)
+        if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+        ShowIndexesCommand(getTableIdentifier(table), output)
+
+      // Convert to RefreshIndexCommand
+      case RefreshIndex(table, indexName, output)
+        if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
+        RefreshIndexCommand(getTableIdentifier(table), indexName, output)
+
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
new file mode 100644
index 0000000000..5d73af31a9
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.index.HoodieIndex
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.{Row, SparkSession}
+
+/**
+ * Runnable command for CREATE INDEX.
+ *
+ * @param tableId        identifier of the table the index is created on
+ * @param indexName      name of the index
+ * @param indexType      name of the index type
+ * @param ignoreIfExists flag describing how an already existing index is treated
+ * @param columns        indexed column attributes, each paired with its own option map
+ * @param properties     index-level options
+ * @param output         output attributes of the command
+ */
+case class CreateIndexCommand(
+    tableId: TableIdentifier,
+    indexName: String,
+    indexType: String,
+    ignoreIfExists: Boolean,
+    columns: Seq[(Attribute, Map[String, String])],
+    properties: Map[String, String],
+    override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+  // Placeholder: performs no work yet and returns no rows.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // The implementation for different index type
+    Seq.empty
+  }
+}
+
+/**
+ * Runnable command for DROP INDEX.
+ *
+ * @param tableId           identifier of the table owning the index
+ * @param indexName         name of the index to drop
+ * @param ignoreIfNotExists flag describing how a missing index is treated
+ * @param output            output attributes of the command
+ */
+case class DropIndexCommand(
+    tableId: TableIdentifier,
+    indexName: String,
+    ignoreIfNotExists: Boolean,
+    override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+  // Placeholder: performs no work yet and returns no rows.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // The implementation for different index type
+    Seq.empty
+  }
+}
+
+/**
+ * Runnable command for SHOW INDEXES.
+ *
+ * @param tableId identifier of the table whose indexes are listed
+ * @param output  output attributes of the command
+ */
+case class ShowIndexesCommand(
+    tableId: TableIdentifier,
+    override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+  // Placeholder: performs no lookup yet and returns no rows.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // The implementation for different index type
+    Seq.empty
+  }
+}
+
+/**
+ * Runnable command for REFRESH INDEX.
+ *
+ * @param tableId   identifier of the table owning the index
+ * @param indexName name of the index to refresh
+ * @param output    output attributes of the command
+ */
+case class RefreshIndexCommand(
+    tableId: TableIdentifier,
+    indexName: String,
+    override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+  // Placeholder: performs no work yet and returns no rows.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // The implementation for different index type
+    Seq.empty
+  }
+}
+
+abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging {
+
+  /**
+   * Tells whether the given indexes already contain one that conflicts with the
+   * index described by the arguments. The name is checked first: any index with
+   * the same name is a conflict. Otherwise, and only when both `indexType` and
+   * `colNames` are supplied, an index of the same type (compared ignoring case)
+   * over the same columns in the same order is a conflict too.
+   *
+   * @param secondaryIndexes Current hoodie indexes, if any
+   * @param indexName        The index name to be checked
+   * @param indexType        The index type to be checked, if present
+   * @param colNames         The column names to be checked, if present
+   * @return true if a conflicting index exists
+   */
+  def indexExists(
+      secondaryIndexes: Option[Array[HoodieIndex]],
+      indexName: String,
+      indexType: Option[String] = None,
+      colNames: Option[Array[String]] = None): Boolean = {
+    secondaryIndexes.exists { indexes =>
+      def sameNameExists: Boolean =
+        indexes.exists(_.getIndexName.equals(indexName))
+
+      // Index type and column name need to be checked if present
+      def sameDefinitionExists: Boolean = indexType.exists { typeName =>
+        colNames.exists { cols =>
+          indexes.exists { existing =>
+            existing.getIndexType.name().equalsIgnoreCase(typeName) &&
+              existing.getColNames.sameElements(cols)
+          }
+        }
+      }
+
+      sameNameExists || sameDefinitionExists
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
index 771798dd22..d0e5ed6133 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
@@ -25,11 +25,12 @@ import 
org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, 
ParserUtils}
 import org.apache.spark.sql.catalyst.plans.logical._
 
+import java.util.Locale
 import scala.collection.JavaConverters._
 
 class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: 
ParserInterface)
@@ -147,4 +148,144 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, 
delegate: ParserInterface
   private def typedVisit[T](ctx: ParseTree): T = {
     ctx.accept(this).asInstanceOf[T]
   }
+
+  /**
+   * Builds a [[CreateIndex]] logical plan from a CREATE INDEX statement.
+   * Syntax:
+   * {{{
+   * CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name [USING index_type]
+   *   (column_index_property_list) [OPTIONS (indexPropertyList)]
+   *   column_index_property_list: column_name [OPTIONS (indexPropertyList)] [ , . . . ]
+   *   indexPropertyList: index_property_name [= index_property_value] [ , . . . ]
+   * }}}
+   * The index type is the empty string when no USING clause is given.
+   */
+  override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) {
+    // One identifier means only the index name was given; a second one is the index type.
+    val identifiers = ctx.identifier
+    val indexName = identifiers.get(0).getText
+    val indexType = if (identifiers.size() == 1) "" else identifiers.get(1).getText
+
+    // Pair every indexed column with the options attached to it (empty when absent).
+    val indexedColumns = ctx.columns.multipartIdentifierProperty.asScala.map { column =>
+      val nameParts = typedVisit[Seq[String]](column.multipartIdentifier)
+      val columnOptions =
+        Option(column.options).map(visitPropertyKeyValues).getOrElse(Map.empty[String, String])
+      (UnresolvedAttribute(nameParts), columnOptions)
+    }.toSeq
+
+    val indexOptions =
+      Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty[String, String])
+
+    // The EXISTS token is only present when the IF NOT EXISTS clause was written.
+    val ignoreIfExists = ctx.EXISTS != null
+
+    CreateIndex(
+      visitTableIdentifier(ctx.tableIdentifier()),
+      indexName,
+      indexType,
+      ignoreIfExists,
+      indexedColumns,
+      indexOptions)
+  }
+
+  /**
+   * Builds a [[DropIndex]] logical plan from a DROP INDEX statement.
+   * Syntax:
+   * {{{
+   *   DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+   * }}}
+   */
+  override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) {
+    val indexName = ctx.identifier.getText
+    val targetTable = visitTableIdentifier(ctx.tableIdentifier())
+    // The EXISTS token is only present when the IF EXISTS clause was written.
+    val ignoreIfNotExists = ctx.EXISTS != null
+    DropIndex(targetTable, indexName, ignoreIfNotExists)
+  }
+
+  /**
+   * Builds a [[ShowIndexes]] logical plan from a SHOW INDEXES statement.
+   * Syntax:
+   * {{{
+   *   SHOW INDEXES (FROM | IN) [TABLE] table_name
+   * }}}
+   */
+  override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) {
+    val targetTable = visitTableIdentifier(ctx.tableIdentifier())
+    ShowIndexes(targetTable)
+  }
+
+  /**
+   * Builds a [[RefreshIndex]] logical plan from a REFRESH INDEX statement.
+   * Syntax:
+   * {{{
+   *   REFRESH INDEX index_name ON [TABLE] table_name
+   * }}}
+   */
+  override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) {
+    val targetTable = visitTableIdentifier(ctx.tableIdentifier())
+    val indexName = ctx.identifier.getText
+    RefreshIndex(targetTable, indexName)
+  }
+
+  /**
+   * Turns a property list into a key-value map; a property written without a
+   * value maps to null. Duplicate keys are checked before the map is built.
+   * Callers should normally go through [[visitPropertyKeyValues]] or
+   * [[visitPropertyKeys]].
+   */
+  override def visitPropertyList(
+      ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
+    val entries = ctx.property.asScala.map { entry =>
+      (visitPropertyKey(entry.key), visitPropertyValue(entry.value))
+    }.toSeq
+    // Check for duplicate property names.
+    checkDuplicateKeys(entries, ctx)
+    entries.toMap
+  }
+
+  /**
+   * Parses a [[PropertyListContext]] into a key-value map and requires every
+   * key to carry a value; keys without one are reported as not allowed.
+   */
+  def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+    val parsed = visitPropertyList(ctx)
+    val missing = for ((key, value) <- parsed if value == null) yield key
+    if (missing.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${missing.mkString("[", ",", "]")}", ctx)
+    }
+    parsed
+  }
+
+  /**
+   * Parses a [[PropertyListContext]] into its list of keys and requires that no
+   * key carries a value; keys with one are reported as not allowed.
+   */
+  def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+    val parsed = visitPropertyList(ctx)
+    val valued = for ((key, value) <- parsed if value != null) yield key
+    if (valued.nonEmpty) {
+      operationNotAllowed(
+        s"Values should not be specified for key(s): ${valued.mkString("[", ",", "]")}", ctx)
+    }
+    parsed.keys.toSeq
+  }
+
+  /**
+   * Extracts a property key, which is written either as a string literal or as
+   * dot separated identifiers. A string literal goes through `string`; anything
+   * else is returned as its raw text.
+   */
+  override def visitPropertyKey(key: PropertyKeyContext): String = {
+    key.STRING match {
+      case null => key.getText
+      case literal => string(literal)
+    }
+  }
+
+  /**
+   * Extracts a property value, which can be a string, integer, boolean or decimal
+   * literal. A missing value yields null, a string literal goes through `string`,
+   * a boolean is lower-cased, and any other literal is returned as its raw text.
+   */
+  override def visitPropertyValue(value: PropertyValueContext): String = {
+    value match {
+      case null => null
+      case v if v.STRING != null => string(v.STRING)
+      case v if v.booleanValue != null => v.getText.toLowerCase(Locale.ROOT)
+      case v => v.getText
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
new file mode 100644
index 0000000000..3536ae9e0a
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.index
+
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.apache.spark.sql.hudi.command.{CreateIndexCommand, 
DropIndexCommand, ShowIndexesCommand}
+
+/**
+ * Checks that the index statements are parsed and analyzed into the matching
+ * index commands, for both "cow" and "mor" tables.
+ */
+class TestIndexSyntax extends HoodieSparkSqlTestBase {
+
+  test("Test Create/Drop/Show/Refresh Index") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        Seq(
+          s"insert into $tableName values(1, 'a1', 10, 1000)",
+          s"insert into $tableName values(2, 'a2', 10, 1001)",
+          s"insert into $tableName values(3, 'a3', 10, 1002)"
+        ).foreach(statement => spark.sql(statement))
+
+        val sqlParser: ParserInterface = spark.sessionState.sqlParser
+        val analyzer: Analyzer = spark.sessionState.analyzer
+
+        // Parses the statement and runs it through the analyzer.
+        def analyze(sqlText: String) = analyzer.execute(sqlParser.parsePlan(sqlText))
+
+        val expectedTableId = s"`default`.`$tableName`"
+
+        val showCmd = analyze(s"show indexes from default.$tableName")
+          .asInstanceOf[ShowIndexesCommand]
+        assertResult(expectedTableId)(showCmd.tableId.toString())
+
+        val createByName = analyze(
+          s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")
+          .asInstanceOf[CreateIndexCommand]
+        assertResult(expectedTableId)(createByName.tableId.toString())
+        assertResult("idx_name")(createByName.indexName)
+        assertResult("lucene")(createByName.indexType)
+        assertResult(false)(createByName.ignoreIfExists)
+        assertResult(Map("block_size" -> "1024"))(createByName.properties)
+
+        val createByPrice = analyze(
+          s"create index if not exists idx_price on $tableName using lucene " +
+            "(price options(order='desc')) options(block_size=512)")
+          .asInstanceOf[CreateIndexCommand]
+        assertResult(expectedTableId)(createByPrice.tableId.toString())
+        assertResult("idx_price")(createByPrice.indexName)
+        assertResult("lucene")(createByPrice.indexType)
+        assertResult(Map("order" -> "desc"))(createByPrice.columns.head._2)
+        assertResult(Map("block_size" -> "512"))(createByPrice.properties)
+
+        val dropCmd = analyze(s"drop index if exists idx_name on $tableName")
+          .asInstanceOf[DropIndexCommand]
+        assertResult(expectedTableId)(dropCmd.tableId.toString())
+        assertResult("idx_name")(dropCmd.indexName)
+        assertResult(true)(dropCmd.ignoreIfNotExists)
+      }
+    }
+  }
+}

Reply via email to