This is an automated email from the ASF dual-hosted git repository.

paullin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d4320e7a6 [KYUUBI #5505][FLINK] Support HELP command
d4320e7a6 is described below

commit d4320e7a6b6058d572732bbc41a0b5dc6ed1417e
Author: Xianxun Ye <[email protected]>
AuthorDate: Tue Nov 7 15:09:57 2023 +0800

    [KYUUBI #5505][FLINK] Support HELP command
    
    ### _Why are the changes needed?_
    
    Resolves: #5505
    
    Shows the available commands when a user types 'HELP;' in beeline.
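    
    For illustration, an abbreviated beeline session (the prompt and the truncated output are illustrative, not captured output):
    ```
    0: jdbc:hive2://localhost:10009/> HELP;
    The following commands are available:
    
    HELP    Prints the available commands or the detailed description of a specified command.
    SET     Sets a session configuration property. ...
    ...
    ```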
    
    #### Solution:
    
    - Parse the statement with the Flink engine's `ExtendedParser` and obtain a Flink `Operation`.
    - Check whether the returned operation is a `HelpOperation`; if so, answer with the help message directly (see the sketch below).
    - Add a dependency on `flink-table-planner-loader.jar`.
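    
    A simplified sketch of the resulting flow in `ExecuteStatement` (the actual patch below loads `ExtendedParser` reflectively through the planner module classloader rather than calling it directly):
    ```
    val operation: Optional[Operation] = parseExtendedStatement(statement)
    if (operation.isPresent && operation.get().isInstanceOf[HelpOperation]) {
      // HELP is answered locally with a static message; nothing reaches the SQL gateway
      resultSet = ResultSetUtil.helpMessageResultSet
      setState(OperationState.FINISHED)
    }
    // otherwise fall through to the regular executor.executeStatement(...) path
    ```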
    
    #### **Why not use the Flink SQL Client parser (`SqlCommandParserImpl`) to obtain the `Command` enum?**
    Flink 1.16's approach:
    ```
    val opt: Optional[Operation] =
      org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(statement)
    if (opt.isPresent && opt.get().isInstanceOf[HelpOperation]) {
      return CliStrings.MESSAGE_HELP
    }
    ```
    
    Flink 1.17 & 1.18's approach:
    ```
    val opt: Optional[Command] =
      org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.parseStatement(statement)
    if (opt.isPresent && opt.get() == Command.HELP) {
      return CliStrings.MESSAGE_HELP
    }
    ```
    
    The `Command` class was only added in Flink 1.17;
    the `SqlCommandParserImpl` package changed, and its parse method was renamed, in Flink 1.17.
    
    This approach would require distinguishing between Flink versions and maintaining both implementations.
    It's more complicated, so it was abandoned (a rough sketch of the required dispatch follows).
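    
    A rough sketch of the per-version dispatch the rejected approach would have needed (`flinkVersionIsAtLeast117` is a hypothetical placeholder for a check against `FlinkEngineUtils.FLINK_RUNTIME_VERSION`; both branches are elided to comments):
    ```
    val isHelpStatement: Boolean =
      if (flinkVersionIsAtLeast117) {
        // org.apache.flink.table.client.cli.parser.SqlCommandParserImpl
        //   .parseStatement(statement): Optional[Command], compared against Command.HELP
        ???
      } else {
        // org.apache.flink.table.client.cli.SqlCommandParserImpl
        //   .parseCommand(statement): Optional[Operation], tested via isInstanceOf[HelpOperation]
        ???
      }
    ```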
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    Closes #5585 from YesOrNo828/help.
    
    Closes #5505
    
    e73b15e43 [Xianxun Ye] [KYUUBI #5505] [FLINK] Add job,update,delete,truncate and call statements
    5943dd072 [Xianxun Ye] [KYUUBI #5505] [FLINK] Add help messages
    fdc2db6ab [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command, fixed Pan's comments
    a728048fc [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command, fixed Pan's comments
    6323cfb85 [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command
    
    Authored-by: Xianxun Ye <[email protected]>
    Signed-off-by: Paul Lin <[email protected]>
---
 externals/kyuubi-flink-sql-engine/pom.xml          |   6 +
 .../engine/flink/operation/ExecuteStatement.scala  |  46 ++++
 .../engine/flink/result/CommandStrings.scala       | 245 +++++++++++++++++++++
 .../kyuubi/engine/flink/result/ResultSetUtil.scala |   7 +
 .../flink/operation/FlinkOperationSuite.scala      |  12 +-
 pom.xml                                            |   6 +
 6 files changed, 321 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index eec5c1cd9..1e12b82d0 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -105,6 +105,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- tests -->
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 0e0c476e2..f30b6ab86 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,10 +17,15 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import java.util.Optional
+
 import scala.concurrent.duration.Duration
 
 import org.apache.flink.api.common.JobID
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.gateway.api.operation.OperationHandle
+import org.apache.flink.table.operations.Operation
+import org.apache.flink.table.operations.command.HelpOperation
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
@@ -28,6 +33,7 @@ import org.apache.kyuubi.engine.flink.result.ResultSetUtil
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.{DynConstructors, DynFields, DynMethods}
 
 class ExecuteStatement(
     session: Session,
@@ -59,6 +65,14 @@ class ExecuteStatement(
   private def executeStatement(): Unit = {
     try {
       setState(OperationState.RUNNING)
+
+      val operation = parseExtendedStatement(statement)
+      if (operation.isPresent && operation.get().isInstanceOf[HelpOperation]) {
+        resultSet = ResultSetUtil.helpMessageResultSet
+        setState(OperationState.FINISHED)
+        return
+      }
+
       val resultFetcher = executor.executeStatement(
         new OperationHandle(getHandle.identifier),
         statement)
@@ -71,4 +85,36 @@ class ExecuteStatement(
       shutdownTimeoutMonitor()
     }
   }
+
+  private def parseExtendedStatement(statement: String): Optional[Operation] = {
+    val plannerModuleClassLoader: ClassLoader = getPlannerModuleClassLoader
+    val extendedParser: AnyRef =
+      DynConstructors.builder()
+        .loader(plannerModuleClassLoader)
+        .impl("org.apache.flink.table.planner.parse.ExtendedParser")
+        .build().newInstance()
+    DynMethods.builder("parse")
+      .hiddenImpl(extendedParser.getClass, classOf[String])
+      .buildChecked()
+      .invoke(extendedParser, statement)
+  }
+
+  private def getPlannerModuleClassLoader: ClassLoader = {
+    try {
+      val plannerModule = DynMethods.builder("getInstance")
+        .hiddenImpl("org.apache.flink.table.planner.loader.PlannerModule")
+        .buildStaticChecked()
+        .invoke().asInstanceOf[AnyRef]
+
+      DynFields.builder()
+        .hiddenImpl(plannerModule.getClass, "submoduleClassLoader")
+        .build[ClassLoader].bind(plannerModule).get
+    } catch {
+      case e: Exception =>
+        throw new TableException(
+          "Error obtaining Flink planner module ClassLoader. " +
+            "Make sure a flink-table-planner-loader.jar is on the classpath",
+          e)
+    }
+  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
new file mode 100644
index 000000000..56a199fa6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.flink.util.Preconditions
+import org.jline.utils.{AttributedString, AttributedStringBuilder, AttributedStyle}
+
+/**
+ * Utility class that contains all strings for Flink SQL commands and messages.
+ */
+object CommandStrings {
+  private val CMD_DESC_DELIMITER = "\t\t"
+
+  private class SQLCommandsDescriptions {
+    private var commandMaxLength = -1
+    private val commandsDescriptionList = ListBuffer[(String, String)]()
+
+    def commandDescription(command: String, description: String): SQLCommandsDescriptions = {
+      Preconditions.checkState(
+        command.nonEmpty,
+        s"content of command must not be empty.",
+        Seq(): _*)
+      Preconditions.checkState(
+        description.nonEmpty,
+        s"content of command's description must not be empty.",
+        Seq(): _*)
+
+      updateMaxCommandLength(command.length)
+      commandsDescriptionList += ((command, description))
+      this
+    }
+
+    private def updateMaxCommandLength(newLength: Int): Unit = {
+      Preconditions.checkState(newLength > 0)
+      if (commandMaxLength < newLength) {
+        commandMaxLength = newLength
+      }
+    }
+
+    private def formatDescription(input: String): String = {
+      val maxLineLength = 160
+      val newLinePrefix = " " * commandMaxLength + CMD_DESC_DELIMITER
+      val words = input.split(" ")
+
+      val (lastLine, lines) = words.foldLeft(("", List[String]())) {
+        case ((line, lines), word) =>
+          val newLine = if (line.isEmpty) word else line + " " + word
+          if (newLine.length > maxLineLength) (word, lines :+ line) else (newLine, lines)
+      }
+
+      (lines :+ lastLine).mkString("\n" + newLinePrefix)
+    }
+
+    def build(): AttributedString = {
+      val attributedStringBuilder = new AttributedStringBuilder
+      if (commandsDescriptionList.nonEmpty) {
+        commandsDescriptionList.foreach {
+          case (cmd, cmdDesc) =>
+            attributedStringBuilder
+              .style(AttributedStyle.DEFAULT.bold())
+              .append(cmd.padTo(commandMaxLength, " ").mkString)
+              .append(CMD_DESC_DELIMITER)
+              .style(AttributedStyle.DEFAULT)
+              .append(formatDescription(cmdDesc))
+              .append('\n')
+        }
+      }
+      attributedStringBuilder.toAttributedString
+    }
+  }
+
+  // scalastyle:off line.size.limit
+  val MESSAGE_HELP: AttributedString =
+    new AttributedStringBuilder()
+      .append("The following commands are available:\n\n")
+      .append(COMMANDS_DESCRIPTIONS)
+      .style(AttributedStyle.DEFAULT.underline())
+      .append("\nHint")
+      .style(AttributedStyle.DEFAULT)
+      .append(
+        ": Make sure that a statement ends with \";\" for finalizing 
(multi-line) statements.")
+      // About Documentation Link.
+      .style(AttributedStyle.DEFAULT)
+      .append(
+        "\nThe above list includes only the most frequently used 
statements.\nYou can also type any Flink SQL statement, please visit 
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/
 for more details.")
+      .toAttributedString
+
+  def COMMANDS_DESCRIPTIONS: AttributedString =
+    new SQLCommandsDescriptions()
+      .commandDescription(
+        "HELP",
+        "Prints the available commands or the detailed description of a specified command.")
+      .commandDescription(
+        "SET",
+        "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
+      .commandDescription(
+        "RESET",
+        "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" to reset all session properties.")
+      .commandDescription(
+        "INSERT INTO",
+        "Inserts the results of a SQL SELECT query into a declared table sink.")
+      .commandDescription(
+        "INSERT OVERWRITE",
+        "Inserts the results of a SQL SELECT query into a declared table sink and overwrites existing data.")
+      .commandDescription(
+        "SELECT",
+        "Executes a SQL SELECT query on the Flink cluster.")
+      .commandDescription(
+        "EXPLAIN",
+        "Describes the execution plan of a query or table with the given name.")
+      .commandDescription(
+        "BEGIN STATEMENT SET",
+        "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+      .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+      .commandDescription(
+        "ADD JAR",
+        "Adds the specified jar file to the submitted jobs' classloader. Syntax: \"ADD JAR '<path_to_filename>.jar'\"")
+      .commandDescription(
+        "SHOW JARS",
+        "Shows the list of user-specified jar dependencies. This list is impacted by the ADD JAR commands.")
+      .commandDescription(
+        "SHOW CATALOGS",
+        "Shows the list of registered catalogs.")
+      .commandDescription(
+        "SHOW CURRENT CATALOG",
+        "Shows the name of the current catalog.")
+      .commandDescription(
+        "SHOW DATABASES",
+        "Shows all databases in the current catalog.")
+      .commandDescription(
+        "SHOW CURRENT DATABASE",
+        "Shows the name of the current database.")
+      .commandDescription(
+        "SHOW TABLES",
+        "Shows all tables for an optionally specified database. Syntax: \"SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]\"")
+      .commandDescription(
+        "SHOW CREATE TABLE",
+        "Shows the CREATE TABLE statement that creates the specified table.")
+      .commandDescription(
+        "SHOW COLUMNS",
+        "Shows all columns of a table with the given name. Syntax: \"SHOW COLUMNS ( FROM | IN ) [[catalog_name.]database.]<table_name> [ [NOT] LIKE <sql_like_pattern>]\"")
+      .commandDescription(
+        "SHOW VIEWS",
+        "Shows all views in the current catalog and the current database.")
+      .commandDescription(
+        "SHOW CREATE VIEW",
+        "Shows the CREATE VIEW statement that creates the specified view. Syntax: \"SHOW CREATE VIEW [catalog_name.][db_name.]view_name\"")
+      .commandDescription(
+        "SHOW FUNCTIONS",
+        "Shows all user-defined and built-in functions in the current catalog and current database. Use \"SHOW USER FUNCTIONS\" for listing all user-defined functions in the current catalog and current database.")
+      .commandDescription(
+        "SHOW MODULES",
+        "Shows all enabled module names with resolution order.")
+      .commandDescription(
+        "USE CATALOG",
+        "Sets the current catalog. All subsequent commands that do not explicitly specify a catalog will use this one. If the provided catalog does not exist, an exception is thrown. The default current catalog is default_catalog. Syntax: \"USE CATALOG catalog_name\"")
+      .commandDescription(
+        "USE",
+        "Sets the current database. All subsequent commands that do not explicitly specify a database will use this one. If the provided database does not exist, an exception is thrown. The default current database is default_database. Syntax: \"USE [catalog_name.]database_name\"")
+      .commandDescription(
+        "DESC",
+        "Describes the schema of a table with the given name. Syntax: \"{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name\"")
+      .commandDescription(
+        "ANALYZE",
+        "ANALYZE statements are used to collect statistics for existing tables and store the results in the catalog. Only supported in batch mode. Syntax: \"ANALYZE TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [, partcol2[=val2], ...]) COMPUTE STATISTICS [FOR COLUMNS col1 [, col2, ...] | FOR ALL COLUMNS]\"")
+      .commandDescription(
+        "ALTER TABLE",
+        "Renames a table or changes a table's properties. Syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name\" or \"ALTER TABLE [catalog_name.][db_name.]table_name SET ( key1=val1[, key2=val2, ...] )\"")
+      .commandDescription(
+        "ALTER VIEW",
+        "Renames a given view to a new name within the same catalog and database. Syntax: \"ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name\"")
+      .commandDescription(
+        "ALTER DATABASE",
+        "Changes a database's properties. Syntax: \"ALTER DATABASE [catalog_name.]db_name SET ( key1=val1[, key2=val2, ...] )\"")
+      .commandDescription(
+        "ALTER FUNCTION",
+        "Changes a catalog function with the new identifier and optional language tag. Syntax: \"ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]\"")
+      .commandDescription(
+        "DROP CATALOG",
+        "Drops a catalog with the given catalog name. Syntax: \"DROP CATALOG [IF EXISTS] catalog_name\"")
+      .commandDescription(
+        "DROP DATABASE",
+        "Drops a database with the given database name. Syntax: \"DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]\"")
+      .commandDescription(
+        "DROP TABLE",
+        "Drops a table with the given table name. Syntax: \"DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name\"")
+      .commandDescription(
+        "DROP VIEW",
+        "Drops a view with the given view name. Syntax: \"DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name\"")
+      .commandDescription(
+        "DROP FUNCTION",
+        "Drops a catalog function with the given function name. Syntax: \"DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name\"")
+      .commandDescription(
+        "CREATE CATALOG",
+        "Creates a catalog with the given catalog properties. Syntax: \"CREATE CATALOG catalog_name WITH ( 'key1'='value1'[, 'key2'='value2', ...] )\"")
+      .commandDescription(
+        "CREATE DATABASE",
+        "Creates a database with the given database properties. Syntax: \"CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name [COMMENT 'database_comment'] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )]\"")
+      .commandDescription(
+        "CREATE TABLE",
+        "Creates a table with the given table properties. Syntax: \"CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { col_name data_type [COMMENT col_comment] [column_constraint] | table_constraint } [,...] ) [COMMENT table_comment] [PARTITIONED BY (col_name, col_name, ...)] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )]\"")
+      .commandDescription(
+        "CREATE VIEW",
+        "Creates a view with the given view expression. Syntax: \"CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name [(column_name [,...])] [COMMENT view_comment] AS query_expression\"")
+      .commandDescription(
+        "CREATE FUNCTION",
+        "Creates a catalog function with the given function properties. Syntax: \"CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]\"")
+      .commandDescription(
+        "SHOW JOBS",
+        "Shows the jobs in the Flink cluster. Available in Flink 1.17 and later.")
+      .commandDescription(
+        "STOP JOB",
+        "Stops the job with the given job ID. Available in Flink 1.17 and later. Syntax: \"STOP JOB '<job_id>' [WITH SAVEPOINT] [WITH DRAIN]\"")
+      .commandDescription(
+        "UPDATE",
+        "Performs row-level updates on the target table. Only supported in batch mode. Available in Flink 1.17 and later. Syntax: \"UPDATE [catalog_name.][db_name.]table_name SET col_name1 = col_val1 [, col_name2 = col_val2 ...] [WHERE condition]\"")
+      .commandDescription(
+        "DELETE",
+        "Performs row-level deletes on the target table. Only supported in batch mode. Available in Flink 1.17 and later. Syntax: \"DELETE FROM [catalog_name.][db_name.]table_name [WHERE condition]\"")
+      .commandDescription(
+        "TRUNCATE TABLE",
+        "Truncates the target table. Only supported in batch mode. Available in Flink 1.18 and later. Syntax: \"TRUNCATE TABLE [catalog_name.][db_name.]table_name\"")
+      .commandDescription(
+        "CALL",
+        "Calls a stored procedure. Available in Flink 1.18 and later. Syntax: \"CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )\"")
+      .build()
+  // scalastyle:on
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index 8b722f1e5..583c64fc6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -58,6 +58,13 @@ object ResultSetUtil {
       .data(Array[Row](Row.of("OK")))
       .build
 
+  def helpMessageResultSet: ResultSet =
+    ResultSet.builder
+      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+      .columns(Column.physical("result", DataTypes.STRING))
+      .data(Array[Row](Row.of(CommandStrings.MESSAGE_HELP.toString)))
+      .build
+
   def fromResultFetcher(
       resultFetcher: ResultFetcher,
       maxRows: Int,
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9469cf286..76f718976 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
 import org.apache.kyuubi.engine.flink.WithFlinkTestResources
-import org.apache.kyuubi.engine.flink.result.Constants
+import org.apache.kyuubi.engine.flink.result.{CommandStrings, Constants}
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
 import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
@@ -1265,4 +1265,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTestResources
       })
      assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
   }
+
+  test("execute statement - help") {
+    withJdbcStatement() { stmt =>
+      val resultSet = stmt.executeQuery("help")
+      val metadata = resultSet.getMetaData
+      assert(metadata.getColumnName(1) === "result")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).equals(CommandStrings.MESSAGE_HELP.toString))
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index 75bfd8650..815d5ed22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1385,6 +1385,12 @@
                 <scope>provided</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-planner-loader</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-sql-client</artifactId>
