LadyForest commented on code in PR #21717: URL: https://github.com/apache/flink/pull/21717#discussion_r1081032613
########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImpl.java: ########## @@ -18,35 +18,136 @@ package org.apache.flink.table.client.cli.parser; -import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager; +import org.apache.flink.sql.parser.impl.SimpleCharStream; +import org.apache.flink.sql.parser.impl.Token; +import org.apache.flink.table.api.SqlParserEOFException; import org.apache.flink.table.client.gateway.SqlExecutionException; -import org.apache.flink.table.operations.Operation; +import java.io.StringReader; +import java.util.Iterator; import java.util.Optional; -/** SqlCommandParserImpl wrappers an {@link Executor} supports parse a statement to an Operation. */ -public class SqlCommandParserImpl implements SqlCommandParser { - private final Executor executor; +import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.EOF; +import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.IDENTIFIER; +import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.SEMICOLON; - public SqlCommandParserImpl(Executor executor) { - this.executor = executor; - } +/** + * The {@link SqlCommandParserImpl} uses {@link FlinkSqlParserImplTokenManager} to do lexical + * analysis. It cannot recognize special hive keywords yet because Hive has a slightly different + * vocabulary compared to Flink's, which causes the The {@link SqlCommandParserImpl} Review Comment: Nit: remove duplicated "The" ########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImpl.java: ########## @@ -18,35 +18,136 @@ package org.apache.flink.table.client.cli.parser; -import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager; +import org.apache.flink.sql.parser.impl.SimpleCharStream; +import org.apache.flink.sql.parser.impl.Token; +import org.apache.flink.table.api.SqlParserEOFException; import org.apache.flink.table.client.gateway.SqlExecutionException; -import org.apache.flink.table.operations.Operation; +import java.io.StringReader; +import java.util.Iterator; import java.util.Optional; -/** SqlCommandParserImpl wrappers an {@link Executor} supports parse a statement to an Operation. */ -public class SqlCommandParserImpl implements SqlCommandParser { - private final Executor executor; +import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.EOF; +import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.IDENTIFIER; +import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.SEMICOLON; - public SqlCommandParserImpl(Executor executor) { - this.executor = executor; - } +/** + * The {@link SqlCommandParserImpl} uses {@link FlinkSqlParserImplTokenManager} to do lexical + * analysis. It cannot recognize special hive keywords yet because Hive has a slightly different + * vocabulary compared to Flink's, which causes the The {@link SqlCommandParserImpl} + * misunderstanding some Hive's keywords to IDENTIFIER. But the ClientParser is only responsible to + * check whether the statement is completed or not and only cares about a few statements. So it's + * acceptable to tolerate the inaccuracy here. + */ +public class SqlCommandParserImpl implements SqlCommandParser { - @Override - public Optional<Operation> parseCommand(String stmt) throws SqlParserException { + public Optional<Command> parseStatement(String statement) throws SqlExecutionException { // normalize - stmt = stmt.trim(); + statement = statement.trim(); // meet empty statement, e.g ";\n" - if (stmt.isEmpty() || stmt.equals(";")) { + if (statement.isEmpty() || statement.equals(";")) { return Optional.empty(); + } else { + return Optional.of(getCommand(new TokenIterator(statement.trim()))); Review Comment: Nit: no need to `trim` again? ########## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java: ########## @@ -28,105 +28,99 @@ import java.util.Arrays; import java.util.List; -import java.util.Optional; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.apache.flink.table.client.cli.parser.StatementType.BEGIN_STATEMENT_SET; -import static org.apache.flink.table.client.cli.parser.StatementType.CLEAR; -import static org.apache.flink.table.client.cli.parser.StatementType.END; -import static org.apache.flink.table.client.cli.parser.StatementType.EXPLAIN; -import static org.apache.flink.table.client.cli.parser.StatementType.HELP; -import static org.apache.flink.table.client.cli.parser.StatementType.OTHER; -import static org.apache.flink.table.client.cli.parser.StatementType.QUIT; -import static org.apache.flink.table.client.cli.parser.StatementType.SELECT; -import static org.apache.flink.table.client.cli.parser.StatementType.SHOW_CREATE; +import static org.apache.flink.table.client.cli.parser.Command.CLEAR; +import static org.apache.flink.table.client.cli.parser.Command.HELP; +import static org.apache.flink.table.client.cli.parser.Command.OTHER; +import static org.apache.flink.table.client.cli.parser.Command.QUIT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** - * Testing whether {@link ClientParser} can parse statement to get {@link StatementType} correctly. + * Testing whether {@link SqlCommandParserImpl} can parse statement to get {@link Command} + * correctly. */ -public class ClientParserTest { +public class SqlCommandParserImplTest { private static final String EXECUTE_STATEMENT_SET = "EXECUTE STATEMENT SET BEGIN\n INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'));"; - private final ClientParser clientParser = new ClientParser(); + private final SqlCommandParserImpl sqlCommandParserImpl = new SqlCommandParserImpl(); @ParameterizedTest @MethodSource("positiveCases") public void testParseStatement(TestSpec testData) { - Optional<StatementType> type = clientParser.parseStatement(testData.statement); - assertThat(type.orElse(null)).isEqualTo(testData.type); + Command command = sqlCommandParserImpl.parseStatement(testData.statement).orElse(null); + assertThat(command).isEqualTo(testData.command); } @ParameterizedTest @MethodSource("negativeCases") public void testParseIncompleteStatement(String statement) { - assertThatThrownBy(() -> clientParser.parseStatement(statement)) + assertThatThrownBy(() -> sqlCommandParserImpl.parseStatement(statement)) .satisfies(anyCauseMatches(SqlExecutionException.class)) .cause() .satisfies(anyCauseMatches(SqlParserEOFException.class)); } private static List<TestSpec> positiveCases() { return Arrays.asList( - TestSpec.of(";", OTHER), + TestSpec.of(";", null), TestSpec.of("; ;", OTHER), // comment and multi lines tests - TestSpec.of("SHOW --ignore;\n CREATE TABLE tbl;", SHOW_CREATE), - TestSpec.of("SHOW\n create\t TABLE `tbl`;", SHOW_CREATE), + TestSpec.of("SHOW --ignore;\n CREATE TABLE tbl;", OTHER), + TestSpec.of("SHOW\n create\t TABLE `tbl`;", OTHER), TestSpec.of("SHOW -- create\n TABLES;", OTHER), // special characters tests - TestSpec.of("SELECT * FROM `tbl`;", SELECT), - TestSpec.of("SHOW /* ignore */ CREATE TABLE \"tbl\";", SHOW_CREATE), - TestSpec.of("SELECT '\\';", SELECT), + TestSpec.of("SELECT * FROM `tbl`;", OTHER), + TestSpec.of("SHOW /* ignore */ CREATE TABLE \"tbl\";", OTHER), + TestSpec.of("SELECT '\\';", OTHER), // normal tests TestSpec.of("quit;", QUIT), // non case sensitive test TestSpec.of("QUIT;", QUIT), TestSpec.of("Quit;", QUIT), TestSpec.of("QuIt;", QUIT), TestSpec.of("clear;", CLEAR), TestSpec.of("help;", HELP), - TestSpec.of("EXPLAIN PLAN FOR 'what_ever';", EXPLAIN), - TestSpec.of("SHOW CREATE TABLE(what_ever);", SHOW_CREATE), - TestSpec.of("SHOW CREATE VIEW (what_ever);", SHOW_CREATE), - TestSpec.of("SHOW CREATE syntax_error;", SHOW_CREATE), + TestSpec.of("EXPLAIN PLAN FOR 'what_ever';", OTHER), + TestSpec.of("SHOW CREATE TABLE(what_ever);", OTHER), + TestSpec.of("SHOW CREATE VIEW (what_ever);", OTHER), + TestSpec.of("SHOW CREATE syntax_error;", OTHER), TestSpec.of("SHOW TABLES;", OTHER), - TestSpec.of("BEGIN STATEMENT SET;", BEGIN_STATEMENT_SET), + TestSpec.of("BEGIN STATEMENT SET;", OTHER), TestSpec.of("BEGIN statement;", OTHER), - TestSpec.of("END;", END), + TestSpec.of("END;", OTHER), TestSpec.of("END statement;", OTHER), // statement set tests TestSpec.of(EXECUTE_STATEMENT_SET, OTHER), - TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET, EXPLAIN), - TestSpec.of("EXPLAIN BEGIN STATEMENT SET;", EXPLAIN), + TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET, OTHER), + TestSpec.of("EXPLAIN BEGIN STATEMENT SET;", OTHER), TestSpec.of(EXECUTE_STATEMENT_SET + "\nEND;", OTHER), - TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET + "\nEND;", EXPLAIN), - TestSpec.of("EXPLAIN BEGIN STATEMENT SET;\nEND;", EXPLAIN)); + TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET + "\nEND;", OTHER), + TestSpec.of("EXPLAIN BEGIN STATEMENT SET;\nEND;", OTHER)); } private static List<String> negativeCases() { - return Arrays.asList( - "", "\n", " ", "-- comment;", "SHOW TABLES -- comment;", "SHOW TABLES"); + return Arrays.asList("-- comment;", "SHOW TABLES -- comment;", "SHOW TABLES"); } /** Used to load generated data. */ private static class TestSpec { String statement; - StatementType type; + Command command; - TestSpec(String statement, @Nullable StatementType type) { + TestSpec(String statement, @Nullable Command type) { this.statement = statement; - this.type = type; + this.command = type; Review Comment: Nit: align the naming? ########## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java: ########## @@ -28,105 +28,99 @@ import java.util.Arrays; import java.util.List; -import java.util.Optional; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.apache.flink.table.client.cli.parser.StatementType.BEGIN_STATEMENT_SET; -import static org.apache.flink.table.client.cli.parser.StatementType.CLEAR; -import static org.apache.flink.table.client.cli.parser.StatementType.END; -import static org.apache.flink.table.client.cli.parser.StatementType.EXPLAIN; -import static org.apache.flink.table.client.cli.parser.StatementType.HELP; -import static org.apache.flink.table.client.cli.parser.StatementType.OTHER; -import static org.apache.flink.table.client.cli.parser.StatementType.QUIT; -import static org.apache.flink.table.client.cli.parser.StatementType.SELECT; -import static org.apache.flink.table.client.cli.parser.StatementType.SHOW_CREATE; +import static org.apache.flink.table.client.cli.parser.Command.CLEAR; +import static org.apache.flink.table.client.cli.parser.Command.HELP; +import static org.apache.flink.table.client.cli.parser.Command.OTHER; +import static org.apache.flink.table.client.cli.parser.Command.QUIT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** - * Testing whether {@link ClientParser} can parse statement to get {@link StatementType} correctly. + * Testing whether {@link SqlCommandParserImpl} can parse statement to get {@link Command} + * correctly. */ -public class ClientParserTest { +public class SqlCommandParserImplTest { private static final String EXECUTE_STATEMENT_SET = "EXECUTE STATEMENT SET BEGIN\n INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'));"; - private final ClientParser clientParser = new ClientParser(); + private final SqlCommandParserImpl sqlCommandParserImpl = new SqlCommandParserImpl(); @ParameterizedTest @MethodSource("positiveCases") public void testParseStatement(TestSpec testData) { - Optional<StatementType> type = clientParser.parseStatement(testData.statement); - assertThat(type.orElse(null)).isEqualTo(testData.type); + Command command = sqlCommandParserImpl.parseStatement(testData.statement).orElse(null); + assertThat(command).isEqualTo(testData.command); } @ParameterizedTest @MethodSource("negativeCases") public void testParseIncompleteStatement(String statement) { - assertThatThrownBy(() -> clientParser.parseStatement(statement)) + assertThatThrownBy(() -> sqlCommandParserImpl.parseStatement(statement)) .satisfies(anyCauseMatches(SqlExecutionException.class)) .cause() .satisfies(anyCauseMatches(SqlParserEOFException.class)); } private static List<TestSpec> positiveCases() { return Arrays.asList( - TestSpec.of(";", OTHER), + TestSpec.of(";", null), TestSpec.of("; ;", OTHER), // comment and multi lines tests - TestSpec.of("SHOW --ignore;\n CREATE TABLE tbl;", SHOW_CREATE), - TestSpec.of("SHOW\n create\t TABLE `tbl`;", SHOW_CREATE), + TestSpec.of("SHOW --ignore;\n CREATE TABLE tbl;", OTHER), + TestSpec.of("SHOW\n create\t TABLE `tbl`;", OTHER), TestSpec.of("SHOW -- create\n TABLES;", OTHER), // special characters tests - TestSpec.of("SELECT * FROM `tbl`;", SELECT), - TestSpec.of("SHOW /* ignore */ CREATE TABLE \"tbl\";", SHOW_CREATE), - TestSpec.of("SELECT '\\';", SELECT), + TestSpec.of("SELECT * FROM `tbl`;", OTHER), + TestSpec.of("SHOW /* ignore */ CREATE TABLE \"tbl\";", OTHER), + TestSpec.of("SELECT '\\';", OTHER), // normal tests TestSpec.of("quit;", QUIT), // non case sensitive test TestSpec.of("QUIT;", QUIT), TestSpec.of("Quit;", QUIT), TestSpec.of("QuIt;", QUIT), TestSpec.of("clear;", CLEAR), TestSpec.of("help;", HELP), - TestSpec.of("EXPLAIN PLAN FOR 'what_ever';", EXPLAIN), - TestSpec.of("SHOW CREATE TABLE(what_ever);", SHOW_CREATE), - TestSpec.of("SHOW CREATE VIEW (what_ever);", SHOW_CREATE), - TestSpec.of("SHOW CREATE syntax_error;", SHOW_CREATE), + TestSpec.of("EXPLAIN PLAN FOR 'what_ever';", OTHER), + TestSpec.of("SHOW CREATE TABLE(what_ever);", OTHER), + TestSpec.of("SHOW CREATE VIEW (what_ever);", OTHER), + TestSpec.of("SHOW CREATE syntax_error;", OTHER), TestSpec.of("SHOW TABLES;", OTHER), - TestSpec.of("BEGIN STATEMENT SET;", BEGIN_STATEMENT_SET), + TestSpec.of("BEGIN STATEMENT SET;", OTHER), TestSpec.of("BEGIN statement;", OTHER), - TestSpec.of("END;", END), + TestSpec.of("END;", OTHER), TestSpec.of("END statement;", OTHER), // statement set tests TestSpec.of(EXECUTE_STATEMENT_SET, OTHER), - TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET, EXPLAIN), - TestSpec.of("EXPLAIN BEGIN STATEMENT SET;", EXPLAIN), + TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET, OTHER), + TestSpec.of("EXPLAIN BEGIN STATEMENT SET;", OTHER), TestSpec.of(EXECUTE_STATEMENT_SET + "\nEND;", OTHER), - TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET + "\nEND;", EXPLAIN), - TestSpec.of("EXPLAIN BEGIN STATEMENT SET;\nEND;", EXPLAIN)); + TestSpec.of("EXPLAIN " + EXECUTE_STATEMENT_SET + "\nEND;", OTHER), + TestSpec.of("EXPLAIN BEGIN STATEMENT SET;\nEND;", OTHER)); } private static List<String> negativeCases() { - return Arrays.asList( - "", "\n", " ", "-- comment;", "SHOW TABLES -- comment;", "SHOW TABLES"); + return Arrays.asList("-- comment;", "SHOW TABLES -- comment;", "SHOW TABLES"); } /** Used to load generated data. */ private static class TestSpec { String statement; - StatementType type; + Command command; Review Comment: Nit: add `@Nullable` annotation ########## flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/TestHiveCatalogFactory.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A test factory that is the same as {@link HiveCatalogFactory} except returning a {@link + * HiveCatalog} always with an embedded Hive metastore to test logic of {@link HiveCatalogFactory}. + */ +public class TestHiveCatalogFactory extends HiveCatalogFactory { + public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database"; + public static final String TEST_TABLE = "test_table"; + static final String TABLE_WITH_PARAMETERIZED_TYPES = "param_types_table"; + + @Override + public String factoryIdentifier() { + return "hive-test"; + } + + @Override + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); + + // Developers may already have their own production/testing hive-site.xml set in their + // environment, + // and Flink tests should avoid using those hive-site.xml. + // Thus, explicitly create a testing HiveConf for unit tests here + Catalog hiveCatalog = + HiveTestUtils.createHiveCatalog( + context.getName(), + configuration.getString(HiveCatalogFactoryOptions.HIVE_VERSION)); + + // Creates an additional database to test tableEnv.useDatabase() will switch current + // database of the catalog + hiveCatalog.open(); + try { + hiveCatalog.createDatabase( + ADDITIONAL_TEST_DATABASE, + new CatalogDatabaseImpl(new HashMap<>(), null), Review Comment: Nit: `Collections.emptyMap()` ########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlMultiLineParser.java: ########## @@ -44,11 +43,9 @@ public class SqlMultiLineParser extends DefaultParser { /** Sql command parser. */ private final SqlCommandParser parser; /** Exception caught in parsing. */ - private Throwable parseException = null; - /** Operation parsed. */ - private Operation parsedOperation = null; - /** Command read from terminal. */ - private String command; + private SqlExecutionException parseException = null; + /** Parsed statement type. */ + private Command statementType; Review Comment: Nit: add `@Nullable` annotation ########## flink-table/flink-sql-client/src/test/resources/sql/table.q: ########## @@ -830,15 +895,18 @@ CREATE TABLE IF NOT EXISTS daily_orders ( PRIMARY KEY(dt, `user`) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( 'connector' = 'filesystem', - 'path' = '$VAR_BATCH_PATH', + 'path' = '/var/folders/1k/kcnx0zh91c1dkl0_m00ds2mh0000gp/T/junit8174794114152489198/3d95f3a7-ec94-4352-b6b8-2d933b6c89e33458890576674823971', Review Comment: revert this value ########## flink-table/flink-sql-client/src/test/resources/sql/table.q: ########## @@ -79,7 +79,10 @@ show tables; # test SHOW CREATE TABLE show create table orders; -CREATE TABLE `default_catalog`.`default_database`.`orders` ( ++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Review Comment: Is this by design? Personally, I felt the column length be better calculated according to the content length. Besides, the right border of the table is missing? ![image](https://user-images.githubusercontent.com/55568005/213472289-dcaccb1f-9410-494f-b77d-dc033f3eae80.png) I thought it should be the following format that aligned with what we have now. ```text Flink SQL> show create table foo; +-----------------------------------------------------------+ | result | +-----------------------------------------------------------+ | CREATE TABLE `default_catalog`.`default_database`.`foo` ( | | `f0` INT, | | `f1` VARCHAR(2147483647) | | ) WITH ( | | 'connector' = 'datagen' | | ) | +-----------------------------------------------------------+ 1 row in set ``` ########## flink-table/flink-sql-client/src/test/resources/sql/table.q: ########## @@ -79,7 +79,10 @@ show tables; # test SHOW CREATE TABLE show create table orders; -CREATE TABLE `default_catalog`.`default_database`.`orders` ( ++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Review Comment: ![image](https://user-images.githubusercontent.com/55568005/213477033-23014f61-46e8-44da-903b-232f7cdf66f4.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org