This is an automated email from the ASF dual-hosted git repository.
paullin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d4320e7a6 [KYUUBI #5505][FLINK] Support HELP command
d4320e7a6 is described below
commit d4320e7a6b6058d572732bbc41a0b5dc6ed1417e
Author: Xianxun Ye <[email protected]>
AuthorDate: Tue Nov 7 15:09:57 2023 +0800
[KYUUBI #5505][FLINK] Support HELP command
### _Why are the changes needed?_
resolve: #5505
Show available commands when users type 'HELP;' in the beeline.
#### Solution:
- Using `ExtendedParser` parse statement and return Operation of Flink
engine.
- Check whether the operation is HelpOperation or not.
- dependency on `flink-table-planner-loader.jar`.
#### **Why not using Flink SQL Client Parser(SqlCommandParserImpl) to
obtain the Command enum?**
Flink 1.16's approach:
```
val opt:Optional[Operation] =
org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand()
check opt.get() instance of HelpOperation or not
if yes return CliStrings.MESSAGE_HELP
```
Flink 1.17 & 1.18
```
val opt: Optional[Command] =
org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.parseStatement()
check opt.get() is Command.HELP or not
if yes return CliStrings.MESSAGE_HELP
```
The `Command` class is added from Flink1.17;
The `SqlCommandParserImpl` package is changed, and the method name is
changed from Flink1.17;
This approach requires distinguishing between different Flink versions and
maintaining both implementations.
It's more complicated, so abandoned.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
Closes #5585 from YesOrNo828/help.
Closes #5505
e73b15e43 [Xianxun Ye] [KYUUBI #5505] [FLINK] Add
job,update,delete,truncate and call statements
5943dd072 [Xianxun Ye] [KYUUBI #5505] [FLINK] Add help messages
fdc2db6ab [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command, fixed
Pan's comments
a728048fc [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command, fixed
Pan's comments
6323cfb85 [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command
Authored-by: Xianxun Ye <[email protected]>
Signed-off-by: Paul Lin <[email protected]>
---
externals/kyuubi-flink-sql-engine/pom.xml | 6 +
.../engine/flink/operation/ExecuteStatement.scala | 46 ++++
.../engine/flink/result/CommandStrings.scala | 245 +++++++++++++++++++++
.../kyuubi/engine/flink/result/ResultSetUtil.scala | 7 +
.../flink/operation/FlinkOperationSuite.scala | 12 +-
pom.xml | 6 +
6 files changed, 321 insertions(+), 1 deletion(-)
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml
b/externals/kyuubi-flink-sql-engine/pom.xml
index eec5c1cd9..1e12b82d0 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -105,6 +105,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-loader</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- tests -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 0e0c476e2..f30b6ab86 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,10 +17,15 @@
package org.apache.kyuubi.engine.flink.operation
+import java.util.Optional
+
import scala.concurrent.duration.Duration
import org.apache.flink.api.common.JobID
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.gateway.api.operation.OperationHandle
+import org.apache.flink.table.operations.Operation
+import org.apache.flink.table.operations.command.HelpOperation
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
@@ -28,6 +33,7 @@ import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.{DynConstructors, DynFields, DynMethods}
class ExecuteStatement(
session: Session,
@@ -59,6 +65,14 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
setState(OperationState.RUNNING)
+
+ val operation = parseExtendedStatement(statement)
+ if (operation.isPresent && operation.get().isInstanceOf[HelpOperation]) {
+ resultSet = ResultSetUtil.helpMessageResultSet
+ setState(OperationState.FINISHED)
+ return
+ }
+
val resultFetcher = executor.executeStatement(
new OperationHandle(getHandle.identifier),
statement)
@@ -71,4 +85,36 @@ class ExecuteStatement(
shutdownTimeoutMonitor()
}
}
+
+ private def parseExtendedStatement(statement: String): Optional[Operation] =
{
+ val plannerModuleClassLoader: ClassLoader = getPlannerModuleClassLoader
+ val extendedParser: AnyRef =
+ DynConstructors.builder()
+ .loader(plannerModuleClassLoader)
+ .impl("org.apache.flink.table.planner.parse.ExtendedParser")
+ .build().newInstance()
+ DynMethods.builder("parse")
+ .hiddenImpl(extendedParser.getClass, classOf[String])
+ .buildChecked()
+ .invoke(extendedParser, statement)
+ }
+
+ private def getPlannerModuleClassLoader: ClassLoader = {
+ try {
+ val plannerModule = DynMethods.builder("getInstance")
+ .hiddenImpl("org.apache.flink.table.planner.loader.PlannerModule")
+ .buildStaticChecked()
+ .invoke().asInstanceOf[AnyRef]
+
+ DynFields.builder()
+ .hiddenImpl(plannerModule.getClass, "submoduleClassLoader")
+ .build[ClassLoader].bind(plannerModule).get
+ } catch {
+ case e: Exception =>
+ throw new TableException(
+ "Error obtaining Flink planner module ClassLoader. " +
+ "Make sure a flink-table-planner-loader.jar is on the classpath",
+ e)
+ }
+ }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
new file mode 100644
index 000000000..56a199fa6
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.flink.util.Preconditions
+import org.jline.utils.{AttributedString, AttributedStringBuilder,
AttributedStyle}
+
+/**
+ * Utility class that contains all strings for Flink SQL commands and messages.
+ */
+object CommandStrings {
+ private val CMD_DESC_DELIMITER = "\t\t"
+
+ private class SQLCommandsDescriptions {
+ private var commandMaxLength = -1
+ private val commandsDescriptionList = ListBuffer[(String, String)]()
+
+ def commandDescription(command: String, description: String):
SQLCommandsDescriptions = {
+ Preconditions.checkState(
+ command.nonEmpty,
+ s"content of command must not be empty.",
+ Seq(): _*)
+ Preconditions.checkState(
+ description.nonEmpty,
+ s"content of command's description must not be empty.",
+ Seq(): _*)
+
+ updateMaxCommandLength(command.length)
+ commandsDescriptionList += ((command, description))
+ this
+ }
+
+ private def updateMaxCommandLength(newLength: Int): Unit = {
+ Preconditions.checkState(newLength > 0)
+ if (commandMaxLength < newLength) {
+ commandMaxLength = newLength
+ }
+ }
+
+ private def formatDescription(input: String): String = {
+ val maxLineLength = 160
+ val newLinePrefix = " " * commandMaxLength + CMD_DESC_DELIMITER
+ val words = input.split(" ")
+
+ val (lastLine, lines) = words.foldLeft(("", List[String]())) {
+ case ((line, lines), word) =>
+ val newLine = if (line.isEmpty) word else line + " " + word
+ if (newLine.length > maxLineLength) (word, lines :+ line) else
(newLine, lines)
+ }
+
+ (lines :+ lastLine).mkString("\n" + newLinePrefix)
+ }
+
+ def build(): AttributedString = {
+ val attributedStringBuilder = new AttributedStringBuilder
+ if (commandsDescriptionList.nonEmpty) {
+ commandsDescriptionList.foreach {
+ case (cmd, cmdDesc) =>
+ attributedStringBuilder
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(cmd.padTo(commandMaxLength, " ").mkString)
+ .append(CMD_DESC_DELIMITER)
+ .style(AttributedStyle.DEFAULT)
+ .append(formatDescription(cmdDesc))
+ .append('\n')
+ }
+ }
+ attributedStringBuilder.toAttributedString
+ }
+ }
+
+ // scalastyle:off line.size.limit
+ val MESSAGE_HELP: AttributedString =
+ new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(COMMANDS_DESCRIPTIONS)
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ ": Make sure that a statement ends with \";\" for finalizing
(multi-line) statements.")
+ // About Documentation Link.
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ "\nThe above list includes only the most frequently used
statements.\nYou can also type any Flink SQL statement, please visit
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/
for more details.")
+ .toAttributedString
+
+ def COMMANDS_DESCRIPTIONS: AttributedString =
+ new SQLCommandsDescriptions()
+ .commandDescription(
+ "HELP",
+ "Prints the available commands or the detailed description of a
specified command.")
+ .commandDescription(
+ "SET",
+ "Sets a session configuration property. Syntax: \"SET
'<key>'='<value>';\". Use \"SET;\" for listing all properties.")
+ .commandDescription(
+ "RESET",
+ "Resets a session configuration property. Syntax: \"RESET '<key>';\".
Use \"RESET;\" for reset all session properties.")
+ .commandDescription(
+ "INSERT INTO",
+ "Inserts the results of a SQL SELECT query into a declared table
sink.")
+ .commandDescription(
+ "INSERT OVERWRITE",
+ "Inserts the results of a SQL SELECT query into a declared table sink
and overwrite existing data.")
+ .commandDescription(
+ "SELECT",
+ "Executes a SQL SELECT query on the Flink cluster.")
+ .commandDescription(
+ "EXPLAIN",
+ "Describes the execution plan of a query or table with the given
name.")
+ .commandDescription(
+ "BEGIN STATEMENT SET",
+ "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+ .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+ .commandDescription(
+ "ADD JAR",
+ "Adds the specified jar file to the submitted jobs' classloader.
Syntax: \"ADD JAR '<path_to_filename>.jar'\"")
+ .commandDescription(
+ "SHOW JARS",
+ "Shows the list of user-specified jar dependencies. This list is
impacted by the ADD JAR commands.")
+ .commandDescription(
+ "SHOW CATALOGS",
+ "Shows the list of registered catalogs.")
+ .commandDescription(
+ "SHOW CURRENT CATALOG",
+ "Shows the name of the current catalog.")
+ .commandDescription(
+ "SHOW DATABASES",
+ "Shows all databases in the current catalog.")
+ .commandDescription(
+ "SHOW CURRENT DATABASE",
+ "Shows the name of the current database.")
+ .commandDescription(
+ "SHOW TABLES",
+ "Shows all tables for an optionally specified database. Syntax: \"SHOW
TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE
<sql_like_pattern> ]\"")
+ .commandDescription(
+ "SHOW CREATE TABLE",
+ "Shows the CREATE TABLE statement that creates the specified table.")
+ .commandDescription(
+ "SHOW COLUMNS",
+ "Shows all columns of a table with the given name. Syntax: \"SHOW
COLUMNS ( FROM | IN ) [[catalog_name.]database.]<table_name> [ [NOT] LIKE
<sql_like_pattern>]\"")
+ .commandDescription(
+ "SHOW VIEWS",
+ "Shows all views in the current catalog and the current database.")
+ .commandDescription(
+ "SHOW CREATE VIEW",
+ "Shows the CREATE VIEW statement that creates the specified view.
Syntax: \"SHOW CREATE VIEW [catalog_name.][db_name.]view_name\"")
+ .commandDescription(
+ "SHOW FUNCTIONS",
+ "Shows all user-defined and built-in functions in the current catalog
and current database. Use \"SHOW USER FUNCTIONS\" for listing all user-defined
functions in the current catalog and current database.")
+ .commandDescription(
+ "SHOW MODULES",
+ "Shows all enabled module names with resolution order.")
+ .commandDescription(
+ "USE CATALOG",
+ "Sets the current catalog. All subsequent commands that do not
explicitly specify a catalog will use this one. If the provided catalog does
not exist, an exception is thrown. The default current catalog is
default_catalog. Syntax: \"USE CATALOG catalog_name\"")
+ .commandDescription(
+ "USE",
+ "Sets the current database. All subsequent commands that do not
explicitly specify a database will use this one. If the provided database does
not exist, an exception is thrown. The default current database is
default_database. Syntax: \"USE [catalog_name.]database_name\"")
+ .commandDescription(
+ "DESC",
+ "Describes the schema of a table with the given name. Syntax: \"{
DESCRIBE | DESC } [catalog_name.][db_name.]table_name\"")
+ .commandDescription(
+ "ANALYZE",
+ "ANALYZE statements are used to collect statistics for existing tables
and store the result to catalog. Only supports in batch mode. Syntax: \"ANALYZE
TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [,
partcol2[=val2], ...]) COMPUTE STATISTICS [FOR COLUMNS col1 [, col2, ...] | FOR
ALL COLUMNS]\"")
+ .commandDescription(
+ "ALTER TABLE",
+ "Renames a table or change a table's properties. Syntax: \"ALTER TABLE
[catalog_name.][db_name.]table_name RENAME TO new_table_name\", the other
syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name SET ( key1=val1[,
key2=val2, ...] )\"")
+ .commandDescription(
+ "ALTER VIEW",
+ "Renames a given view to a new name within the same catalog and
database. Syntax: \"ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO
new_view_name\"")
+ .commandDescription(
+ "ALTER DATABASE",
+ "Changes a database's properties. Syntax: \"ALTER DATABASE
[catalog_name.]db_name SET ( key1=val1[, key2=val2, ...] )\"")
+ .commandDescription(
+ "ALTER FUNCTION",
+ "Changes a catalog function with the new identifier and optional
language tag. Syntax: \"ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS]
[catalog_name.][db_name.]function_name AS identifier [LANGUAGE
JAVA|SCALA|PYTHON]\"")
+ .commandDescription(
+ "DROP CATALOG",
+ "Drops a catalog with the given catalog name. Syntax: \"DROP CATALOG
[IF EXISTS] catalog_name\"")
+ .commandDescription(
+ "DROP DATABASE",
+ "Drops a database with the given database name. Syntax: \"DROP
DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]\"")
+ .commandDescription(
+ "DROP TABLE",
+ "Drops a table with the given table name. Syntax: \"DROP [TEMPORARY]
TABLE [IF EXISTS] [catalog_name.][db_name.]table_name\"")
+ .commandDescription(
+ "DROP VIEW",
+ "Drops a view with the given view name. Syntax: \"DROP [TEMPORARY]
VIEW [IF EXISTS] [catalog_name.][db_name.]view_name\"")
+ .commandDescription(
+ "DROP FUNCTION",
+ "Drops a catalog function with the given function name. Syntax: \"DROP
[TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS]
[catalog_name.][db_name.]function_name\"")
+ .commandDescription(
+ "CREATE CATALOG",
+ "Creates a catalog with the given catalog properties. Syntax: \"CREATE
CATALOG catalog_name WITH ( 'key1'='value1'[, 'key2'='value2', ...] )\"")
+ .commandDescription(
+ "CREATE DATABASE",
+ "Creates a database with the given database properties. Syntax:
\"CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name [COMMENT
'database_comment'] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )]\"")
+ .commandDescription(
+ "CREATE TABLE",
+ "Creates a table with the given table properties. Syntax: \"CREATE
[TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( {
col_name data_type [COMMENT col_comment] [column_constraint] | table_constraint
} [,...] ) [COMMENT table_comment] [PARTITIONED BY (col_name, col_name, ...)]
[WITH ( 'key1'='value1'[, 'key2'='value2', ...] )] \"")
+ .commandDescription(
+ "CREATE VIEW",
+ "Creates a view with the given view expression. Syntax: \"CREATE
[TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
[(column_name [,...])] [COMMENT view_comment] AS query_expression\"")
+ .commandDescription(
+ "CREATE FUNCTION",
+ "Creates a catalog function with the given function properties.
Syntax: \"CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS]
[catalog_name.][db_name.]function_name AS identifier [LANGUAGE
JAVA|SCALA|PYTHON] [USING JAR '<path_to_filename>.jar' [, JAR
'<path_to_filename>.jar']* ]\"")
+ .commandDescription(
+ "SHOW JOBS",
+ "Show the jobs in the Flink cluster. Supports in version 1.17 and
later.")
+ .commandDescription(
+ "STOP JOB",
+ "Stop the job with the given job ID. Supports in version 1.17 and
later. Syntax: \"STOP JOB '<job_id>' [WITH SAVEPOINT] [WITH DRAIN]\"")
+ .commandDescription(
+ "UPDATE",
+ "Performs row-level updating on the target table. Only supports in
batch mode. Supports in version 1.17 and later. Syntax: \"UPDATE
[catalog_name.][db_name.]table_name SET col_name1 = col_val1 [, col_name2 =
col_val2 ...] [WHERE condition]\"")
+ .commandDescription(
+ "DELETE",
+ "Performs row-level deleting on the target table. Only supports in
batch mode. Supports in version 1.17 and later. Syntax: \"DELETE FROM
[catalog_name.][db_name.]table_name [WHERE condition]\"")
+ .commandDescription(
+ "TRUNCATE TABLE",
+ "Truncates the target table. Only supports in batch mode. Supports in
version 1.18 and later. Syntax: \"TRUNCATE TABLE
[catalog_name.][db_name.]table_name\"")
+ .commandDescription(
+ "CALL",
+ "Calls a stored procedure. Supports in version 1.18 and later. Syntax:
\"CALL [catalog_name.][database_name.]procedure_name ([ expression [,
expression]* ] )\"")
+ .build()
+ // scalastyle:on
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index 8b722f1e5..583c64fc6 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -58,6 +58,13 @@ object ResultSetUtil {
.data(Array[Row](Row.of("OK")))
.build
+ def helpMessageResultSet: ResultSet =
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(Column.physical("result", DataTypes.STRING))
+ .data(Array[Row](Row.of(CommandStrings.MESSAGE_HELP.toString)))
+ .build
+
def fromResultFetcher(
resultFetcher: ResultFetcher,
maxRows: Int,
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9469cf286..76f718976 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
-import org.apache.kyuubi.engine.flink.result.Constants
+import org.apache.kyuubi.engine.flink.result.{CommandStrings, Constants}
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
@@ -1265,4 +1265,14 @@ abstract class FlinkOperationSuite extends
HiveJDBCTestHelper with WithFlinkTest
})
assert(exception.getMessage === "Futures timed out after [60000
milliseconds]")
}
+
+ test("execute statement - help") {
+ withJdbcStatement() { stmt =>
+ val resultSet = stmt.executeQuery("help")
+ val metadata = resultSet.getMetaData
+ assert(metadata.getColumnName(1) === "result")
+ assert(resultSet.next())
+
assert(resultSet.getString(1).equals(CommandStrings.MESSAGE_HELP.toString))
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 75bfd8650..815d5ed22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1385,6 +1385,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-loader</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>