This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit baa272311772e4bc2aa59779b7f61786575c2978 Author: Jark Wu <[email protected]> AuthorDate: Fri Mar 19 16:36:20 2021 +0800 [FLINK-21466][sql-client] Add unit tests for SqlClient This closes #15255 --- .../docs/dev/table/sql/gettingStarted.md | 2 +- docs/content.zh/docs/dev/table/sqlClient.md | 10 +- docs/content/docs/dev/table/sql/gettingStarted.md | 2 +- docs/content/docs/dev/table/sqlClient.md | 12 +- .../flink/tests/util/flink/FlinkContainer.java | 1 - .../flink/tests/util/flink/FlinkDistribution.java | 1 - .../test-scripts/test_sql_client.sh | 2 +- .../org/apache/flink/table/client/SqlClient.java | 12 +- .../apache/flink/table/client/cli/CliClient.java | 17 ++- .../flink/table/client/cli/CliOptionsParser.java | 5 +- .../apache/flink/table/client/SqlClientTest.java | 149 +++++++++++++++++++++ .../table/client/cli/TerminalStreamsResource.java | 45 +++++++ .../src/test/resources/sql-client-help.out | 116 ++++++++++++++++ pom.xml | 1 + 14 files changed, 356 insertions(+), 19 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/gettingStarted.md b/docs/content.zh/docs/dev/table/sql/gettingStarted.md index 3bfe11e..17ee4f1 100644 --- a/docs/content.zh/docs/dev/table/sql/gettingStarted.md +++ b/docs/content.zh/docs/dev/table/sql/gettingStarted.md @@ -50,7 +50,7 @@ Flink SQL 使得使用标准 SQL 开发流应用程序变的简单。如果你 在安装文件夹中运行 `sql-client` 脚本来启动 SQL 客户端。 ```bash -./bin/sql-client.sh embedded +./bin/sql-client.sh ``` ### Hello World diff --git a/docs/content.zh/docs/dev/table/sqlClient.md b/docs/content.zh/docs/dev/table/sqlClient.md index 48a5fe8..8758340 100644 --- a/docs/content.zh/docs/dev/table/sqlClient.md +++ b/docs/content.zh/docs/dev/table/sqlClient.md @@ -53,6 +53,12 @@ SQL Client 脚本也位于 Flink 的 bin 目录中。[将来](sqlClient.html#lim ./bin/sql-client.sh ``` +或者显式使用 `embedded` 模式: + +```bash +./bin/sql-client.sh embedded +``` + 默认情况下,SQL 客户端将从 `./conf/sql-client-defaults.yaml` 中读取配置。有关环境配置文件结构的更多信息,请参见[配置部分](sqlClient.html#environment-files)。 ### 执行 SQL 查询 @@ -160,9 +166,9 @@ SQL 客户端启动时可以添加 CLI 选项,具体如下。 ```text ./bin/sql-client.sh --help -Mode "embedded" submits Flink jobs from the local machine. +Mode "embedded" (default) submits Flink jobs from the local machine. - Syntax: embedded [OPTIONS] + Syntax: [embedded] [OPTIONS] "embedded" mode options: -d,--defaults <environment file> The environment properties with which every new session is initialized. diff --git a/docs/content/docs/dev/table/sql/gettingStarted.md b/docs/content/docs/dev/table/sql/gettingStarted.md index c404fc4..2c9559c 100644 --- a/docs/content/docs/dev/table/sql/gettingStarted.md +++ b/docs/content/docs/dev/table/sql/gettingStarted.md @@ -50,7 +50,7 @@ The [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}) is an interactive clien To start the SQL client, run the `sql-client` script from the installation folder. ```bash -./bin/sql-client.sh embedded +./bin/sql-client.sh ``` ### Hello World diff --git a/docs/content/docs/dev/table/sqlClient.md b/docs/content/docs/dev/table/sqlClient.md index 0f8111d..bfe2fc7 100644 --- a/docs/content/docs/dev/table/sqlClient.md +++ b/docs/content/docs/dev/table/sqlClient.md @@ -54,6 +54,12 @@ The SQL Client scripts are also located in the binary directory of Flink. [In th ./bin/sql-client.sh ``` +or explicitly use `embedded` mode: + +```bash +./bin/sql-client.sh embedded +``` + By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [configuration part](sqlClient.html#environment-files) for more information about the structure of environment files. ### Running SQL Queries @@ -157,11 +163,11 @@ Configuration The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. ```text -./bin/sql-client.sh embedded --help +./bin/sql-client.sh --help -Mode "embedded" submits Flink jobs from the local machine. +Mode "embedded" (default) submits Flink jobs from the local machine. - Syntax: embedded [OPTIONS] + Syntax: [embedded] [OPTIONS] "embedded" mode options: -d,--defaults <environment file> The environment properties with which every new session is initialized. diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java index 15eea95..ad167c6 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java @@ -178,7 +178,6 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql"); commands.add("cat /tmp/script.sql | "); commands.add(FLINK_BIN + "/sql-client.sh"); - commands.add("embedded"); job.getDefaultEnvFile() .ifPresent( defaultEnvFile -> { diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index 6e90233..0b1d398 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -209,7 +209,6 @@ final class FlinkDistribution { public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException { final List<String> commands = new ArrayList<>(); commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString()); - commands.add("embedded"); job.getDefaultEnvFile() .ifPresent( defaultEnvFile -> { diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index 2ccd377..0075b36 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -291,7 +291,7 @@ INSERT INTO ElasticsearchAppendSinkTable EOF ) -JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \ +JOB_ID=$($FLINK_DIR/bin/sql-client.sh \ --jar $KAFKA_SQL_JAR \ --jar $ELASTICSEARCH_SQL_JAR \ --jar $SQL_TOOLBOX_JAR \ diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java index 9e6a4f3..8e2bcbc 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java @@ -44,8 +44,8 @@ import java.util.Arrays; * <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the * gateway and allows for managing queries via console. * - * <p>For debugging in an IDE you can execute the main method of this class using: "embedded - * --defaults /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar" + * <p>For debugging in an IDE you can execute the main method of this class using: "--defaults + * /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar" * * <p>Make sure that the FLINK_CONF_DIR environment variable is set. */ @@ -125,20 +125,20 @@ public class SqlClient { // -------------------------------------------------------------------------------------------- public static void main(String[] args) { - final String model; + final String mode; final String[] modeArgs; if (args.length < 1 || args[0].startsWith("-")) { // mode is not specified, use the default `embedded` mode - model = MODE_EMBEDDED; + mode = MODE_EMBEDDED; modeArgs = args; } else { // mode is specified, extract the mode value and reaming args - model = args[0]; + mode = args[0]; // remove mode modeArgs = Arrays.copyOfRange(args, 1, args.length); } - switch (model) { + switch (mode) { case MODE_EMBEDDED: final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs); if (options.isPrintHelp()) { diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 46b66d0..08c3f4b 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -20,6 +20,7 @@ package org.apache.flink.table.client.cli; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.client.SqlClient; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall; import org.apache.flink.table.client.config.YamlConfigUtils; @@ -684,9 +685,23 @@ public class CliClient implements AutoCloseable { // -------------------------------------------------------------------------------------------- + /** + * Internal flag to use {@link System#in} and {@link System#out} stream to construct {@link + * Terminal} for tests. This allows tests can easily mock input stream when startup {@link + * SqlClient}. + */ + protected static boolean useSystemInOutStream = false; + private static Terminal createDefaultTerminal() { try { - return TerminalBuilder.builder().name(CliStrings.CLI_NAME).build(); + if (useSystemInOutStream) { + return TerminalBuilder.builder() + .name(CliStrings.CLI_NAME) + .streams(System.in, System.out) + .build(); + } else { + return TerminalBuilder.builder().name(CliStrings.CLI_NAME).build(); + } } catch (IOException e) { throw new SqlClientException("Error opening command line interface.", e); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java index 153bbdd..880cbd5 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java @@ -216,8 +216,9 @@ public class CliOptionsParser { formatter.setLeftPadding(5); formatter.setWidth(80); - System.out.println("\nMode \"embedded\" submits Flink jobs from the local machine."); - System.out.println("\n Syntax: embedded [OPTIONS]"); + System.out.println( + "\nMode \"embedded\" (default) submits Flink jobs from the local machine."); + System.out.println("\n Syntax: [embedded] [OPTIONS]"); formatter.setSyntaxPrefix(" \"embedded\" mode options:"); formatter.printHelp(" ", EMBEDDED_MODE_CLIENT_OPTIONS); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java new file mode 100644 index 0000000..128e126 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.client.cli.TerminalStreamsResource; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** Tests for {@link SqlClient}. */ +public class SqlClientTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule public final TerminalStreamsResource useSystemStream = TerminalStreamsResource.INSTANCE; + + private PrintStream originalPrintStream; + + private InputStream originalInputStream; + + private ByteArrayOutputStream testOutputStream; + + private Map<String, String> originalEnv; + + private String historyPath; + + @Before + public void before() throws IOException { + originalEnv = System.getenv(); + originalPrintStream = System.out; + originalInputStream = System.in; + testOutputStream = new ByteArrayOutputStream(); + System.setOut(new PrintStream(testOutputStream, true)); + // send "QUIT;" command to gracefully shutdown the terminal + System.setIn(new ByteArrayInputStream("QUIT;".getBytes(StandardCharsets.UTF_8))); + + // prepare conf dir + File confFolder = tempFolder.newFolder("conf"); + File confYaml = new File(confFolder, "flink-conf.yaml"); + if (!confYaml.createNewFile()) { + throw new IOException("Can't create testing flink-conf.yaml file."); + } + + // adjust the test environment for the purposes of this test + Map<String, String> map = new HashMap<>(System.getenv()); + map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); + CommonTestUtils.setEnv(map); + + historyPath = tempFolder.newFile("history").toString(); + } + + @After + public void after() throws InterruptedException { + System.setOut(originalPrintStream); + System.setIn(originalInputStream); + CommonTestUtils.setEnv(originalEnv); + } + + private String getStdoutString() { + return testOutputStream.toString(); + } + + @Test(timeout = 20000) + public void testEmbeddedWithOptions() throws InterruptedException { + String[] args = new String[] {"embedded", "-hist", historyPath}; + SqlClient.main(args); + assertThat(getStdoutString(), containsString("Command history file path: " + historyPath)); + } + + @Test(timeout = 20000) + public void testEmbeddedWithLongOptions() throws InterruptedException { + String[] args = new String[] {"embedded", "--history", historyPath}; + SqlClient.main(args); + assertThat(getStdoutString(), containsString("Command history file path: " + historyPath)); + } + + @Test(timeout = 20000) + public void testEmbeddedWithoutOptions() throws InterruptedException { + String[] args = new String[] {"embedded"}; + SqlClient.main(args); + assertThat(getStdoutString(), containsString("Command history file path")); + } + + @Test(timeout = 20000) + public void testEmptyOptions() throws InterruptedException { + String[] args = new String[] {}; + SqlClient.main(args); + assertThat(getStdoutString(), containsString("Command history file path")); + } + + @Test(timeout = 20000) + public void testUnsupportedGatewayMode() throws Exception { + String[] args = new String[] {"gateway"}; + thrown.expect(SqlClientException.class); + thrown.expectMessage("Gateway mode is not supported yet."); + SqlClient.main(args); + } + + @Test(timeout = 20000) + public void testPrintHelpForUnknownMode() throws IOException { + String[] args = new String[] {"unknown"}; + SqlClient.main(args); + final URL url = getClass().getClassLoader().getResource("sql-client-help.out"); + Objects.requireNonNull(url); + final String help = FileUtils.readFileUtf8(new File(url.getFile())); + assertEquals(help, getStdoutString()); + } +} diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java new file mode 100644 index 0000000..85a5849 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.junit.rules.ExternalResource; + +/** + * Enables {@link org.apache.flink.table.client.SqlClient} to create a default terminal using {@link + * System#in} and {@link System#out} as the input and output stream. This can allows tests to easily + * mock input stream of the SqlClient by hijacking the standard stream. + */ +public class TerminalStreamsResource extends ExternalResource { + + public static final TerminalStreamsResource INSTANCE = new TerminalStreamsResource(); + + private TerminalStreamsResource() { + // singleton + } + + @Override + protected void before() throws Throwable { + CliClient.useSystemInOutStream = true; + } + + @Override + protected void after() { + CliClient.useSystemInOutStream = false; + } +} diff --git a/flink-table/flink-sql-client/src/test/resources/sql-client-help.out b/flink-table/flink-sql-client/src/test/resources/sql-client-help.out new file mode 100644 index 0000000..fe9173f --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/sql-client-help.out @@ -0,0 +1,116 @@ +./sql-client [MODE] [OPTIONS] + +The following options are available: + +Mode "embedded" (default) submits Flink jobs from the local machine. + + Syntax: [embedded] [OPTIONS] + "embedded" mode options: + -d,--defaults <environment file> The environment properties with which + every new session is initialized. + Properties might be overwritten by + session properties. + -e,--environment <environment file> The environment properties to be + imported into the session. It might + overwrite default environment + properties. + -h,--help Show the help message with + descriptions of all options. + -hist,--history <History file path> The file which you want to save the + command history into. If not + specified, we will auto-generate one + under your user's home directory. + -j,--jar <JAR file> A JAR file to be imported into the + session. The file might contain + user-defined classes needed for the + execution of statements such as + functions, table sources, or sinks. + Can be used multiple times. + -l,--library <JAR directory> A JAR file directory with which every + new session is initialized. The files + might contain user-defined classes + needed for the execution of + statements such as functions, table + sources, or sinks. Can be used + multiple times. + -pyarch,--pyArchives <arg> Add python archive files for job. The + archive files will be extracted to + the working directory of python UDF + worker. Currently only zip-format is + supported. For each archive file, a + target directory be specified. If the + target directory name is specified, + the archive file will be extracted to + a name can directory with the + specified name. Otherwise, the + archive file will be extracted to a + directory with the same name of the + archive file. The files uploaded via + this option are accessible via + relative path. '#' could be used as + the separator of the archive file + path and the target directory name. + Comma (',') could be used as the + separator to specify multiple archive + files. This option can be used to + upload the virtual environment, the + data files used in Python UDF (e.g.: + --pyArchives + file:///tmp/py37.zip,file:///tmp/data + .zip#data --pyExecutable + py37.zip/py37/bin/python). The data + files could be accessed in Python + UDF, e.g.: f = open('data/data.txt', + 'r'). + -pyexec,--pyExecutable <arg> Specify the path of the python + interpreter used to execute the + python UDF worker (e.g.: + --pyExecutable + /usr/local/bin/python3). The python + UDF worker depends on Python 3.6+, + Apache Beam (version == 2.27.0), Pip + (version >= 7.1.0) and SetupTools + (version >= 37.0.0). Please ensure + that the specified environment meets + the above requirements. + -pyfs,--pyFiles <pythonFiles> Attach custom python files for job. + The standard python resource file + suffixes such as .py/.egg/.zip or + directory are all supported. These + files will be added to the PYTHONPATH + of both the local client and the + remote python UDF worker. Files + suffixed with .zip will be extracted + and added to PYTHONPATH. Comma (',') + could be used as the separator to + specify multiple files (e.g.: + --pyFiles + file:///tmp/myresource.zip,hdfs:///$n + amenode_address/myresource2.zip). + -pyreq,--pyRequirements <arg> Specify a requirements.txt file which + defines the third-party dependencies. + These dependencies will be installed + and added to the PYTHONPATH of the + python UDF worker. A directory which + contains the installation packages of + these dependencies could be specified + optionally. Use '#' as the separator + if the optional parameter exists + (e.g.: --pyRequirements + file:///tmp/requirements.txt#file:/// + tmp/cached_dir). + -s,--session <session identifier> The identifier for a session. + 'default' is the default identifier. + -u,--update <SQL update statement> Experimental (for testing only!): + Instructs the SQL Client to + immediately execute the given update + statement after starting up. The + process is shut down after the + statement has been submitted to the + cluster and returns an appropriate + return code. Currently, this feature + is only supported for INSERT INTO + statements that declare the target + sink table. + + diff --git a/pom.xml b/pom.xml index 48cadc5..b1d5bc8 100644 --- a/pom.xml +++ b/pom.xml @@ -1480,6 +1480,7 @@ under the License. <exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude> <exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>out/test/flink-avro/avro/user.avsc</exclude> + <exclude>flink-table/flink-sql-client/src/test/resources/*.out</exclude> <exclude>flink-table/flink-table-planner/src/test/scala/resources/*.out</exclude> <exclude>flink-table/flink-table-planner-blink/src/test/resources/**/*.out</exclude> <exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
