LadyForest commented on code in PR #21752:
URL: https://github.com/apache/flink/pull/21752#discussion_r1089891027
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DefaultContextUtils.java:
##########
@@ -69,6 +69,15 @@ public static DefaultContext buildDefaultContext(CliOptions options) {
        return new DefaultContext(configuration, dependencies);
    }

+    public static DefaultContext buildDefaultContext(CliOptions.GatewayCliOptions options) {
Review Comment:
`options` not used?
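
For context, the shape being questioned is roughly the sketch below. The body is a guess (only the two-argument `DefaultContext` constructor is taken from the hunk above); if nothing from `options` is needed on the gateway path, the parameter could simply be dropped, otherwise whatever it carries should flow into the returned context.

```java
// Hypothetical sketch of the overload under review -- the parameter is accepted
// but never read, which is what the comment points at.
public static DefaultContext buildDefaultContext(CliOptions.GatewayCliOptions options) {
    final Configuration configuration = new Configuration();
    final List<URL> dependencies = Collections.emptyList();
    // `options` is unused here: either remove the parameter, or feed the relevant
    // CLI settings (if any) into `configuration` / `dependencies` before returning.
    return new DefaultContext(configuration, dependencies);
}
```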
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java:
##########
@@ -285,35 +268,18 @@ public static CliOptions parseGatewayModeClient(String[]
args) {
try {
DefaultParser parser = new DefaultParser();
CommandLine line = parser.parse(GATEWAY_MODE_CLIENT_OPTIONS, args,
true);
- return new CliOptions(
+ return new CliOptions.GatewayCliOptions(
line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
checkSessionId(line),
- null,
- null,
- checkUrls(line, CliOptionsParser.OPTION_JAR),
- checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+ checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
+ checkUrl(line, CliOptionsParser.OPTION_FILE),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
- getPythonConfiguration(line));
- } catch (ParseException e) {
- throw new SqlClientException(e.getMessage());
- }
- }
-
- public static CliOptions parseGatewayModeGateway(String[] args) {
- try {
- DefaultParser parser = new DefaultParser();
- CommandLine line = parser.parse(GATEWAY_MODE_GATEWAY_OPTIONS,
args, true);
- return new CliOptions(
- line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
- null,
- null,
- null,
- checkUrls(line, CliOptionsParser.OPTION_JAR),
- checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
- null,
- null,
- getPythonConfiguration(line));
+
line.hasOption(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt())
+ ? NetUtils.parseHostPortAddress(
+ line.getOptionValue(
+
CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt()))
Review Comment:
Just curious about what a `null` endpoint means.
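
For reference, the argument built by the ternary above presumably collapses to something like the following when `-e,--endpoint` is not given; the `InetSocketAddress` type and the cut-off else branch are my assumptions, and the else branch is what produces the `null`.

```java
// Paraphrase of the endpoint handling in the hunk above: with no -e/--endpoint,
// GatewayCliOptions is constructed with a null address, and the "please specify
// the address" check only fires later, in SqlClient#start().
InetSocketAddress gatewayAddress =
        line.hasOption(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt())
                ? NetUtils.parseHostPortAddress(
                        line.getOptionValue(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt()))
                : null;
```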
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -56,38 +57,52 @@
 * <p>- In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This
 * allows for submitting jobs without having to start an additional component.
 *
- * <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the
- * gateway and allows for managing queries via console.
- *
- * <p>For debugging in an IDE you can execute the main method of this class using: "--defaults
- * /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
- *
- * <p>Make sure that the FLINK_CONF_DIR environment variable is set.
+ * <p>- In gateway mode, the SQL CLI client connects to the REST API of the gateway and allows for
+ * managing queries via console.
 */
public class SqlClient {
    private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
-    private final boolean isEmbedded;
+    private final boolean isGatewayMode;
    private final CliOptions options;
    private final Supplier<Terminal> terminalFactory;
    public static final String MODE_EMBEDDED = "embedded";
    public static final String MODE_GATEWAY = "gateway";
+    public static final String MODE_NONE = "";
Review Comment:
I think it's better to model all the `MODE_XX` constants as an `Enum` instead of `String`s.
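
A minimal sketch of that suggestion; the enum name and the `fromModeString` helper are made up for illustration and are not part of the PR:

```java
/** Hypothetical replacement for the MODE_EMBEDDED / MODE_GATEWAY / MODE_NONE String constants. */
public enum ClientMode {
    EMBEDDED("embedded"),
    GATEWAY("gateway"),
    NONE("");

    private final String value;

    ClientMode(String value) {
        this.value = value;
    }

    /** Resolves the first CLI argument to a mode, falling back to NONE when nothing matches. */
    public static ClientMode fromModeString(String mode) {
        for (ClientMode candidate : values()) {
            if (candidate.value.equalsIgnoreCase(mode)) {
                return candidate;
            }
        }
        return NONE;
    }
}
```

`SqlClient` could then keep a single `ClientMode` field instead of the `isGatewayMode` boolean and switch over it in `start()`.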
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -265,9 +289,13 @@ public void close() {
private static class EmbeddedShutdownThread extends Thread {
Review Comment:
If `embedded` and `gateway` share the shutdown hook logic, we'd better rename `EmbeddedShutdownThread` to `ShutdownThread`.
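
If the hook really is shared, the renamed class could look roughly like this; the `AutoCloseable` field and the log message are illustrative, since the actual class in the PR may hold the executor and terminal directly:

```java
// Hypothetical shape of the renamed, mode-agnostic shutdown hook (nested in SqlClient).
private static class ShutdownThread extends Thread {

    private final AutoCloseable resourceToClose;

    ShutdownThread(AutoCloseable resourceToClose) {
        this.resourceToClose = resourceToClose;
    }

    @Override
    public void run() {
        // Runs on JVM shutdown in both embedded and gateway mode.
        try {
            resourceToClose.close();
        } catch (Exception e) {
            LOG.warn("Error while closing the session resources.", e);
        }
    }
}
```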
##########
flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out:
##########
@@ -0,0 +1,175 @@
+./sql-client [MODE] [OPTIONS]
Review Comment:
What I've got
```text
./bin/sql-client.sh -h
usage:
   -f,--file <script file>                    Script file that should be executed. In this mode, the client will not open an interactive terminal.
   -h,--help                                  Show the help message with descriptions of all options.
   -hist,--history <History file path>        The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
   -i,--init <initialization file>            Script file that used to init the session context. If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.
   -j,--jar <JAR file>                        A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
   -l,--library <JAR directory>               A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
   -pyarch,--pyArchives <arg>                 Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
   -pyclientexec,--pyClientExecutable <arg>   The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs.
   -pyexec,--pyExecutable <arg>               Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.7+, Apache Beam (version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
   -pyfs,--pyFiles <pythonFiles>              Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to PYTHONPATH. Comma (',') could be used as the separator to specify multiple files (e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
   -pyreq,--pyRequirements <arg>              Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
   -s,--session <session identifier>          The identifier for a session. 'default' is the default identifier.
   -u,--update <SQL update statement>         Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table.Please use option -f to submit update statement.

./sql-client [MODE] [OPTIONS]

The following options are available:

Mode "embedded" (default) submits Flink jobs from the local machine.

  Syntax: [embedded] [OPTIONS]
  "embedded" mode options:
   -f,--file <script file>                    Script file that should be executed. In this mode, the client will not open an interactive terminal.
   -h,--help                                  Show the help message with descriptions of all options.
   -hist,--history <History file path>        The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
   -i,--init <initialization file>            Script file that used to init the session context. If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.
   -j,--jar <JAR file>                        A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
   -l,--library <JAR directory>               A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
   -pyarch,--pyArchives <arg>                 Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
   -pyclientexec,--pyClientExecutable <arg>   The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs.
   -pyexec,--pyExecutable <arg>               Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.7+, Apache Beam (version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
   -pyfs,--pyFiles <pythonFiles>              Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to PYTHONPATH. Comma (',') could be used as the separator to specify multiple files (e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
   -pyreq,--pyRequirements <arg>              Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
   -s,--session <session identifier>          The identifier for a session. 'default' is the default identifier.
   -u,--update <SQL update statement>         Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table.Please use option -f to submit update statement.

Mode "gateway" mode connects to the SQL gateway for submission.

  Syntax: gateway [OPTIONS]
  "gateway" mode options:
   -e,--endpoint <SQL Gateway address>        The address of the remote SQL Gateway to connect.
   -f,--file <script file>                    Script file that should be executed. In this mode, the client will not open an interactive terminal.
   -h,--help                                  Show the help message with descriptions of all options.
   -hist,--history <History file path>        The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
   -i,--init <initialization file>            Script file that used to init the session context. If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.
   -s,--session <session identifier>          The identifier for a session. 'default' is the default identifier.
   -u,--update <SQL update statement>         Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table.Please use option -f to submit update statement.
```
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -56,38 +57,52 @@
 * <p>- In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This
 * allows for submitting jobs without having to start an additional component.
 *
- * <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the
- * gateway and allows for managing queries via console.
- *
- * <p>For debugging in an IDE you can execute the main method of this class using: "--defaults
- * /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
- *
- * <p>Make sure that the FLINK_CONF_DIR environment variable is set.
+ * <p>- In gateway mode, the SQL CLI client connects to the REST API of the gateway and allows for
+ * managing queries via console.
 */
public class SqlClient {
    private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
-    private final boolean isEmbedded;
+    private final boolean isGatewayMode;
    private final CliOptions options;
    private final Supplier<Terminal> terminalFactory;
    public static final String MODE_EMBEDDED = "embedded";
    public static final String MODE_GATEWAY = "gateway";
+    public static final String MODE_NONE = "";
-    public SqlClient(boolean isEmbedded, CliOptions options, Supplier<Terminal> terminalFactory) {
-        this.isEmbedded = isEmbedded;
+    public SqlClient(
+            boolean isGatewayMode, CliOptions options, Supplier<Terminal> terminalFactory) {
+        this.isGatewayMode = isGatewayMode;
        this.options = options;
        this.terminalFactory = terminalFactory;
    }
    private void start() {
-        if (isEmbedded) {
-            DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
+        if (isGatewayMode) {
+            CliOptions.GatewayCliOptions gatewayCliOptions = (CliOptions.GatewayCliOptions) options;
+            try (ExecutorImpl executor =
+                    new ExecutorImpl(
+                            DefaultContextUtils.buildDefaultContext(gatewayCliOptions),
+                            gatewayCliOptions
+                                    .getGatewayAddress()
+                                    .orElseThrow(
+                                            () ->
+                                                    new SqlClientException(
+                                                            "Please specify the address of the SQL Gateway with command line option"
+                                                                    + " '-e,--endpoint <SQL Gateway address>' in the gateway mode.")))) {
Review Comment:
If the exception is thrown here, why might `endpoint` be null at `CliOptionsParser#L282`?
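
One way to make the two places consistent would be to fail fast while parsing instead of deferring the check to `SqlClient#start()`; a hedged sketch (not what the PR currently does, and the `InetSocketAddress` type is assumed):

```java
// Hypothetical alternative inside parseGatewayModeClient: reject a missing
// -e/--endpoint up front so GatewayCliOptions can never carry a null address
// and the orElseThrow in SqlClient#start() becomes unreachable.
if (!line.hasOption(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt())) {
    throw new SqlClientException(
            "Please specify the address of the SQL Gateway with command line option"
                    + " '-e,--endpoint <SQL Gateway address>' in the gateway mode.");
}
InetSocketAddress gatewayAddress =
        NetUtils.parseHostPortAddress(
                line.getOptionValue(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt()));
```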
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]