This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 89370ae36846eabd2623284426e0a2df4bc746bc
Author: 苏承祥 <[email protected]>
AuthorDate: Wed Dec 7 20:03:30 2022 +0800

    [HUDI-5314] add call help procedure (#7361)

    * add call help procedure

    Co-authored-by: 苏承祥 <[email protected]>
---
 .../hudi/spark/sql/parser/HoodieSqlCommon.g4       |   6 +-
 .../hudi/command/procedures/HelpProcedure.scala    | 125 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   5 +
 .../sql/parser/HoodieSqlCommonAstBuilder.scala     |  21 ++--
 .../sql/hudi/procedure/TestCommitsProcedure.scala  |   2 +-
 .../sql/hudi/procedure/TestHelpProcedure.scala     |  84 ++++++++++++++
 6 files changed, 231 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
index 8643170f892..8a3106f7a56 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
+++ b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
@@ -47,7 +47,7 @@ statement
     : compactionStatement                                                  #compactionCommand
-    | CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
+    | CALL multipartIdentifier callArgumentList?                           #call
     | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
         tableIdentifier (USING indexType=identifier)?
         LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
@@ -69,6 +69,10 @@
     : (db=IDENTIFIER '.')? table=IDENTIFIER
     ;
 
+ callArgumentList
+     : '(' (callArgument (',' callArgument)*)? ')'
+     ;
+
 callArgument
     : expression                    #positionalArgument
     | identifier '=>' expression    #namedArgument

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
new file mode 100644
index 00000000000..b17d068e818
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class HelpProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "cmd", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+
+  /**
+   * Returns the description of this procedure.
+   */
+  override def description: String = s"The procedure help command allows you to view all the commands currently provided, as well as their parameters and output fields."
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+    val line = "\n"
+    val tab = "\t"
+    if (args.map.isEmpty) {
+      val procedures: Map[String, Supplier[ProcedureBuilder]] = HoodieProcedures.procedures()
+      val result = new StringBuilder
+      result.append("synopsis").append(line)
+        .append(tab).append("call [command]([key1]=>[value1],[key2]=>[value2])").append(line)
+      result.append("commands and description").append(line)
+      procedures.keySet.foreach(name => {
+        val builderSupplier: Option[Supplier[ProcedureBuilder]] = procedures.get(name)
+        if (builderSupplier.isDefined) {
+          val procedure: Procedure = builderSupplier.get.get().build
+          result.append(tab)
+            .append(name).append(tab)
+            .append(procedure.description).append(line)
+        }
+      })
+      result.append("You can use 'call help(cmd=>[command])' to view the detailed parameters of the command").append(line)
+      Seq(Row(result.toString()))
+    } else {
+      val cmdOpt: Option[Any] = getArgValueOrDefault(args, PARAMETERS(0))
+      assert(cmdOpt.isDefined, "The cmd parameter is required")
+      val cmd: String = cmdOpt.get.asInstanceOf[String]
+      val procedures: Map[String, Supplier[ProcedureBuilder]] = HoodieProcedures.procedures()
+      val builderSupplier: Option[Supplier[ProcedureBuilder]] = procedures.get(cmd.trim)
+      if (builderSupplier.isEmpty) {
+        throw new HoodieException(s"can not find $cmd command in procedures.")
+      }
+      val procedure: Procedure = builderSupplier.get.get().build
+      val result = new StringBuilder
+
+      result.append("parameters:").append(line)
+      // set parameters header
+      result.append(tab)
+        .append(lengthFormat("param")).append(tab)
+        .append(lengthFormat("type_name")).append(tab)
+        .append(lengthFormat("default_value")).append(tab)
+        .append(lengthFormat("required")).append(line)
+      procedure.parameters.foreach(param => {
+        result.append(tab)
+          .append(lengthFormat(param.name)).append(tab)
+          .append(lengthFormat(param.dataType.typeName)).append(tab)
+          .append(lengthFormat(param.default.toString)).append(tab)
+          .append(lengthFormat(param.required.toString)).append(line)
+      })
+      result.append("outputType:").append(line)
+      // set outputType header
+      result.append(tab)
+        .append(lengthFormat("name")).append(tab)
+        .append(lengthFormat("type_name")).append(tab)
+        .append(lengthFormat("nullable")).append(tab)
+        .append(lengthFormat("metadata")).append(line)
+      procedure.outputType.map(field => {
+        result.append(tab)
+          .append(lengthFormat(field.name)).append(tab)
+          .append(lengthFormat(field.dataType.typeName)).append(tab)
+          .append(lengthFormat(field.nullable.toString)).append(tab)
+          .append(lengthFormat(field.metadata.toString())).append(line)
+      })
+      Seq(Row(result.toString()))
+    }
+  }
+
+  def lengthFormat(string: String): String = {
+    String.format("%-30s", string)
+  }
+
+  override def build = new HelpProcedure()
+}
+
+object HelpProcedure {
+  val NAME = "help"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new HelpProcedure()
+  }
+}

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 3b38925455f..a53fec33fe9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -28,6 +28,10 @@ object HoodieProcedures {
     if (builderSupplier.isDefined) builderSupplier.get.get() else null
   }
 
+  def procedures(): Map[String, Supplier[ProcedureBuilder]] = {
+    BUILDERS
+  }
+
   private def initProcedureBuilders: Map[String, Supplier[ProcedureBuilder]] = {
     Map((RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
       ,(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
@@ -83,6 +87,7 @@ object HoodieProcedures {
       ,(CopyToTempView.NAME, CopyToTempView.builder)
      ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
      ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
+      ,(HelpProcedure.NAME, HelpProcedure.builder)
     )
   }
 }

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
index d0e5ed61338..4005ef97e45 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils}
+import org.apache.spark.sql.catalyst.parser.{ParserInterface, ParserUtils}
 import org.apache.spark.sql.catalyst.plans.logical._
 
 import java.util.Locale
@@ -92,13 +92,14 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface
   }
 
   override def visitCall(ctx: CallContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.callArgument().isEmpty) {
-      throw new ParseException(s"Procedures arguments is empty", ctx)
+    if (ctx.callArgumentList() == null || ctx.callArgumentList().callArgument() == null || ctx.callArgumentList().callArgument().size() == 0) {
+      val name: Seq[String] = ctx.multipartIdentifier().parts.asScala.map(_.getText)
+      CallCommand(name, Seq())
+    } else {
+      val name: Seq[String] = ctx.multipartIdentifier().parts.asScala.map(_.getText)
+      val args: Seq[CallArgument] = ctx.callArgumentList().callArgument().asScala.map(typedVisit[CallArgument])
+      CallCommand(name, args)
     }
-
-    val name: Seq[String] = ctx.multipartIdentifier().parts.asScala.map(_.getText)
-    val args: Seq[CallArgument] = ctx.callArgument().asScala.map(typedVisit[CallArgument])
-    CallCommand(name, args)
   }
 
   /**
@@ -167,9 +168,9 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface
     }
 
     val columns = ctx.columns.multipartIdentifierProperty.asScala
-        .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
+      .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
     val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
-        .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
+      .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
     val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
 
     CreateIndex(
@@ -223,7 +224,7 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface
    * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
    */
   override def visitPropertyList(
-      ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
+    ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
     val properties = ctx.property.asScala.map { property =>
       val key = visitPropertyKey(property.key)
       val value = visitPropertyValue(property.value)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
index 03cf26800df..febcd452799 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -310,7 +310,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
 
     // Check required fields
     checkExceptionContain(s"""call show_commit_extra_metadata()""")(
-      s"arguments is empty")
+      s"Argument: table is required")
 
     // collect commits for table
     val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHelpProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHelpProcedure.scala
new file mode 100644
index 00000000000..2150682f893
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHelpProcedure.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureBuilder}
+
+import java.util
+import java.util.function.Supplier
+
+class TestHelpProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call help Procedure with no params") {
+    val help: util.List[Row] = spark.sql("call help").collectAsList()
+    assert(help.size() == 1)
+
+    val help2: util.List[Row] = spark.sql("call help()").collectAsList()
+    assert(help2.size() == 1)
+
+    assert(help.get(0).toString().equals(help2.get(0).toString()))
+
+
+    val helpStr: String = help.get(0).toString()
+    val procedures: Map[String, Supplier[ProcedureBuilder]] = HoodieProcedures.procedures()
+
+    // check all procedures
+    procedures.keySet.foreach(name => {
+      // check cmd contains all procedure name
+      assert(helpStr.contains(name))
+      // check cmd contains all procedure description
+      val builderSupplier: Option[Supplier[ProcedureBuilder]] = procedures.get(name)
+      assert(builderSupplier.isDefined)
+      val procedure: Procedure = builderSupplier.get.get().build
+      assert(helpStr.contains(procedure.description))
+    })
+  }
+
+
+  test("Test Call help Procedure with params") {
+
+    // check not valid params
+    checkExceptionContain("call help(not_valid=>true)")("The cmd parameter is required")
+
+    val procedures: Map[String, Supplier[ProcedureBuilder]] = HoodieProcedures.procedures()
+
+    // check all procedures
+    procedures.keySet.foreach(name => {
+      val help: util.List[Row] = spark.sql(s"call help(cmd=>'$name')").collectAsList()
+      assert(help.size() == 1)
+
+      val helpStr: String = help.get(0).toString()
+      val builderSupplier: Option[Supplier[ProcedureBuilder]] = procedures.get(name)
+
+      assert(builderSupplier.isDefined)
+
+      // check result contains params
+      val procedure: Procedure = builderSupplier.get.get().build
+      procedure.parameters.foreach(params => {
+        assert(helpStr.contains(params.name))
+      })
+
+      // check result contains outputType
+      procedure.outputType.foreach(output => {
+        assert(helpStr.contains(output.name))
+      })
+    })
+  }
+
+}
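
For reference, a minimal usage sketch of the new procedure (not part of the commit), mirroring what TestHelpProcedure exercises above; it assumes a Hudi-enabled SparkSession named `spark`:

    // List every registered procedure with its description.
    // Thanks to the grammar change (callArgumentList is optional), both forms parse.
    spark.sql("call help").show(truncate = false)
    spark.sql("call help()").show(truncate = false)

    // Show the parameters and output schema of a single procedure, e.g. show_commits
    // (any name registered in HoodieProcedures works).
    spark.sql("call help(cmd => 'show_commits')").show(truncate = false)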
