wuchong commented on a change in pull request #15595:
URL: https://github.com/apache/flink/pull/15595#discussion_r616439563
##########
File path: docs/content/docs/dev/table/sqlClient.md
##########
@@ -169,559 +177,523 @@ Mode "embedded" (default) submits Flink jobs from the
local machine.
Syntax: [embedded] [OPTIONS]
"embedded" mode options:
- -d,--defaults <environment file> The environment properties with
which
- every new session is initialized.
- Properties might be overwritten by
- session properties.
- -e,--environment <environment file> The environment properties to be
- imported into the session. It might
- overwrite default environment
- properties.
- -h,--help Show the help message with
- descriptions of all options.
- -hist,--history <History file path> The file which you want to save the
- command history into. If not
- specified, we will auto-generate one
- under your user's home directory.
- -j,--jar <JAR file> A JAR file to be imported into the
- session. The file might contain
- user-defined classes needed for the
- execution of statements such as
- functions, table sources, or sinks.
- Can be used multiple times.
- -l,--library <JAR directory> A JAR file directory with which
every
- new session is initialized. The
files
- might contain user-defined classes
- needed for the execution of
- statements such as functions, table
- sources, or sinks. Can be used
- multiple times.
- -pyarch,--pyArchives <arg> Add python archive files for job.
The
- archive files will be extracted to
- the working directory of python UDF
- worker. Currently only zip-format is
- supported. For each archive file, a
- target directory be specified. If
the
- target directory name is specified,
- the archive file will be extracted
to
- a name can directory with the
- specified name. Otherwise, the
- archive file will be extracted to a
- directory with the same name of the
- archive file. The files uploaded via
- this option are accessible via
- relative path. '#' could be used as
- the separator of the archive file
- path and the target directory name.
- Comma (',') could be used as the
- separator to specify multiple
archive
- files. This option can be used to
- upload the virtual environment, the
- data files used in Python UDF (e.g.:
- --pyArchives
-
file:///tmp/py37.zip,file:///tmp/data
- .zip#data --pyExecutable
- py37.zip/py37/bin/python). The data
- files could be accessed in Python
- UDF, e.g.: f = open('data/data.txt',
- 'r').
- -pyexec,--pyExecutable <arg> Specify the path of the python
- interpreter used to execute the
- python UDF worker (e.g.:
- --pyExecutable
- /usr/local/bin/python3). The python
- UDF worker depends on Python 3.6+,
- Apache Beam (version == 2.27.0), Pip
- (version >= 7.1.0) and SetupTools
- (version >= 37.0.0). Please ensure
- that the specified environment meets
- the above requirements.
- -pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
- These files will be added to the
- PYTHONPATH of both the local client
- and the remote python UDF worker.
The
- standard python resource file
- suffixes such as .py/.egg/.zip or
- directory are all supported. Comma
- (',') could be used as the separator
- to specify multiple files (e.g.:
- --pyFiles
-
file:///tmp/myresource.zip,hdfs:///$n
- amenode_address/myresource2.zip).
- -pyreq,--pyRequirements <arg> Specify a requirements.txt file
which
- defines the third-party
dependencies.
- These dependencies will be installed
- and added to the PYTHONPATH of the
- python UDF worker. A directory which
- contains the installation packages
of
- these dependencies could be
specified
- optionally. Use '#' as the separator
- if the optional parameter exists
- (e.g.: --pyRequirements
-
file:///tmp/requirements.txt#file:///
- tmp/cached_dir).
- -s,--session <session identifier> The identifier for a session.
- 'default' is the default identifier.
- -u,--update <SQL update statement> Experimental (for testing only!):
- Instructs the SQL Client to
- immediately execute the given update
- statement after starting up. The
- process is shut down after the
- statement has been submitted to the
- cluster and returns an appropriate
- return code. Currently, this feature
- is only supported for INSERT INTO
- statements that declare the target
- sink table.
+     -d,--defaults <environment file>  Deprecated feature: the environment
+                                       properties with which every new session
+                                       is initialized. Properties might be
+                                       overwritten by session properties.
+     -e,--environment <environment file> Deprecated feature: the environment
+                                       properties to be imported into the
+                                       session. It might overwrite default
+                                       environment properties.
+     -f,--file <script file>           Script file that should be executed. In
+                                       this mode, the client will not open an
+                                       interactive terminal.
+     -h,--help                         Show the help message with descriptions
+                                       of all options.
+     -hist,--history <History file path> The file which you want to save the
+                                       command history into. If not specified,
+                                       we will auto-generate one under your
+                                       user's home directory.
+     -i,--init <initialization file>   Script file used to initialize the
+                                       session context. If an error occurs
+                                       during execution, the SQL Client will
+                                       exit. Note that queries and INSERT
+                                       statements are not allowed in an
+                                       initialization file.
+     -j,--jar <JAR file>               A JAR file to be imported into the
+                                       session. The file might contain
+                                       user-defined classes needed for the
+                                       execution of statements such as
+                                       functions, table sources, or sinks. Can
+                                       be used multiple times.
+     -l,--library <JAR directory>      A JAR file directory with which every
+                                       new session is initialized. The files
+                                       might contain user-defined classes
+                                       needed for the execution of statements
+                                       such as functions, table sources, or
+                                       sinks. Can be used multiple times.
+     -pyarch,--pyArchives <arg>        Add python archive files for job. The
+                                       archive files will be extracted to the
+                                       working directory of python UDF worker.
+                                       Currently only zip-format is supported.
+                                       For each archive file, a target
+                                       directory can be specified. If the
+                                       target directory name is specified, the
+                                       archive file will be extracted to a
+                                       directory with the specified name.
+                                       Otherwise, the archive file will be
+                                       extracted to a directory with the same
+                                       name as the archive file. The files
+                                       uploaded via this option are accessible
+                                       via relative path. '#' could be used as
+                                       the separator of the archive file path
+                                       and the target directory name. Comma
+                                       (',') could be used as the separator to
+                                       specify multiple archive files. This
+                                       option can be used to upload the
+                                       virtual environment, the data files
+                                       used in Python UDF (e.g.: --pyArchives
+                                       file:///tmp/py37.zip,file:///tmp/data
+                                       .zip#data --pyExecutable
+                                       py37.zip/py37/bin/python). The data
+                                       files could be accessed in Python UDF,
+                                       e.g.: f = open('data/data.txt', 'r').
+     -pyexec,--pyExecutable <arg>      Specify the path of the python
+                                       interpreter used to execute the python
+                                       UDF worker (e.g.: --pyExecutable
+                                       /usr/local/bin/python3). The python UDF
+                                       worker depends on Python 3.6+, Apache
+                                       Beam (version == 2.27.0), Pip (version
+                                       >= 7.1.0) and SetupTools (version >=
+                                       37.0.0). Please ensure that the
+                                       specified environment meets the above
+                                       requirements.
+     -pyfs,--pyFiles <pythonFiles>     Attach custom python files for job. The
+                                       standard python resource file suffixes
+                                       such as .py/.egg/.zip or directory are
+                                       all supported. These files will be
+                                       added to the PYTHONPATH of both the
+                                       local client and the remote python UDF
+                                       worker. Files suffixed with .zip will
+                                       be extracted and added to PYTHONPATH.
+                                       Comma (',') could be used as the
+                                       separator to specify multiple files
+                                       (e.g.: --pyFiles
+                                       file:///tmp/myresource.zip,hdfs:///$n
+                                       amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements <arg>     Specify a requirements.txt file which
+                                       defines the third-party dependencies.
+                                       These dependencies will be installed
+                                       and added to the PYTHONPATH of the
+                                       python UDF worker. A directory which
+                                       contains the installation packages of
+                                       these dependencies could be specified
+                                       optionally. Use '#' as the separator if
+                                       the optional parameter exists (e.g.:
+                                       --pyRequirements
+                                       file:///tmp/requirements.txt#file:///
+                                       tmp/cached_dir).
+     -s,--session <session identifier> The identifier for a session. 'default'
+                                       is the default identifier.
+     -u,--update <SQL update statement> Deprecated experimental feature (for
+                                       testing only!): instructs the SQL
+                                       Client to immediately execute the given
+                                       update statement after starting up. The
+                                       process is shut down after the
+                                       statement has been submitted to the
+                                       cluster and returns an appropriate
+                                       return code. Currently, this feature is
+                                       only supported for INSERT INTO
+                                       statements that declare the target sink
+                                       table. Please use the -f option to
+                                       submit update statements instead.
```
-{{< top >}}
+### SQL Client Configuration
-### Environment Files
-
-A SQL query needs a configuration environment in which it is executed. The
so-called *environment files* define available catalogs, table sources and
sinks, user-defined functions, and other properties required for execution and
deployment.
-
-Every environment file is a regular [YAML file](http://yaml.org/). An example
of such a file is presented below.
-
-```yaml
-# Define tables here such as sources, sinks, views, or temporal tables.
-
-tables:
- - name: MyTableSource
- type: source-table
- update-mode: append
- connector:
- type: filesystem
- path: "/path/to/something.csv"
- format:
- type: csv
- fields:
- - name: MyField1
- data-type: INT
- - name: MyField2
- data-type: VARCHAR
- line-delimiter: "\n"
- comment-prefix: "#"
- schema:
- - name: MyField1
- data-type: INT
- - name: MyField2
- data-type: VARCHAR
- - name: MyCustomView
- type: view
- query: "SELECT MyField2 FROM MyTableSource"
-
-# Define user-defined functions here.
-
-functions:
- - name: myUDF
- from: class
- class: foo.bar.AggregateUDF
- constructor:
- - 7.6
- - false
-
-# Define available catalogs
-
-catalogs:
- - name: catalog_1
- type: hive
- property-version: 1
- hive-conf-dir: ...
- - name: catalog_2
- type: hive
- property-version: 1
- default-database: mydb2
- hive-conf-dir: ...
-
-# Properties that change the fundamental execution behavior of a table program.
-
-execution:
- planner: blink # optional: either 'blink' (default) or
'old'
- type: streaming # required: execution mode either 'batch'
or 'streaming'
- result-mode: table # required: either 'table' or 'changelog'
- max-table-result-rows: 1000000 # optional: maximum number of maintained
rows in
- # 'table' mode (1000000 by default,
smaller 1 means unlimited)
- time-characteristic: event-time # optional: 'processing-time' or
'event-time' (default)
- parallelism: 1 # optional: Flink's parallelism (1 by
default)
- periodic-watermarks-interval: 200 # optional: interval for periodic
watermarks (200 ms by default)
- max-parallelism: 16 # optional: Flink's maximum parallelism
(128 by default)
- min-idle-state-retention: 0 # optional: table program's minimum idle
state time
- max-idle-state-retention: 0 # optional: table program's maximum idle
state time
- current-catalog: catalog_1 # optional: name of the current catalog of
the session ('default_catalog' by default)
- current-database: mydb1 # optional: name of the current database
of the current catalog
- # (default database of the current
catalog by default)
- restart-strategy: # optional: restart strategy
- type: fallback # "fallback" to global restart strategy
by default
-
-# Configuration options for adjusting and tuning table programs.
-
-# A full list of options and their default values can be found
-# on the dedicated "Configuration" page.
-configuration:
- table.optimizer.join-reorder-enabled: true
- table.exec.spill-compression.enabled: true
- table.exec.spill-compression.block-size: 128kb
-```
+{{< generated/sql_client_configuration >}}
-This configuration:
+### Initialize Session Using SQL Files
-- defines an environment with a table source `MyTableSource` that reads from a
CSV file,
-- defines a view `MyCustomView` that declares a virtual table using a SQL
query,
-- defines a user-defined function `myUDF` that can be instantiated using the
class name and two constructor parameters,
-- connects to two Hive catalogs and uses `catalog_1` as the current catalog
with `mydb1` as the current database of the catalog,
-- uses the blink planner in streaming mode for running statements with
event-time characteristic and a parallelism of 1,
-- runs exploratory queries in the `table` result mode,
-- and makes some planner adjustments around join reordering and spilling via
configuration options.
+A SQL query needs a configuration environment in which it is executed. The SQL
+Client supports the `-i` startup option to execute an initialization SQL file
+when starting up, in order to set up the environment. The so-called
+*initialization SQL file* can use DDL statements to define available catalogs,
+table sources and sinks, user-defined functions, and other properties required
+for execution and deployment.
-Depending on the use case, a configuration can be split into multiple files.
Therefore, environment files can be created for general purposes (*defaults
environment file* using `--defaults`) as well as on a per-session basis
(*session environment file* using `--environment`). Every CLI session is
initialized with the default properties followed by the session properties. For
example, the defaults environment file could specify all table sources that
should be available for querying in every session whereas the session
environment file only declares a specific state retention time and parallelism.
Both default and session environment files can be passed when starting the CLI
application. If no default environment file has been specified, the SQL Client
searches for `./conf/sql-client-defaults.yaml` in Flink's configuration
directory.
+An example of such a file is presented below.
-<span class="label label-danger">Attention</span> Properties that have been
set within a CLI session (e.g. using the `SET` command) have highest precedence:
+```sql
+-- Define available catalogs
-```text
-CLI commands > session environment file > defaults environment file
-```
+CREATE CATALOG MyCatalog
+ WITH (
+ 'type' = 'hive'
+ );
-#### Restart Strategies
+USE MyCatalog;
-Restart strategies control how Flink jobs are restarted in case of a failure.
Similar to `global restart strategies` for a Flink cluster, a more fine-grained
restart configuration can be declared in an environment file.
+-- Define available databases
-The following strategies are supported:
+CREATE DATABASE MyDatabase;
-```yaml
-execution:
- # falls back to the global strategy defined in flink-conf.yaml
- restart-strategy:
- type: fallback
+USE MyDatabase;
- # job fails directly and no restart is attempted
- restart-strategy:
- type: none
+-- Define TABLE
- # attempts a given number of times to restart the job
- restart-strategy:
- type: fixed-delay
- attempts: 3 # retries before job is declared as failed (default:
Integer.MAX_VALUE)
- delay: 10000 # delay in ms between retries (default: 10 s)
+CREATE TABLE MyTable(
+ MyField1 INT,
+ MyField2 STRING
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = '/path/to/something',
+ 'format' = 'csv'
+);
- # attempts as long as the maximum number of failures per time interval is
not exceeded
- restart-strategy:
- type: failure-rate
- max-failures-per-interval: 1 # retries in interval until failing
(default: 1)
- failure-rate-interval: 60000 # measuring interval in ms for failure rate
- delay: 10000 # delay in ms between retries (default: 10
s)
-```
+-- Define VIEW
-{{< top >}}
+CREATE VIEW MyCustomView AS SELECT MyField2 FROM MyTable;
-### Dependencies
+-- Define user-defined functions here.
-The SQL Client does not require to setup a Java project using Maven or SBT.
Instead, you can pass the dependencies as regular JAR files that get submitted
to the cluster. You can either specify each JAR file separately (using `--jar`)
or define entire library directories (using `--library`). For connectors to
external systems (such as Apache Kafka) and corresponding data formats (such as
JSON), Flink provides **ready-to-use JAR bundles**. These JAR files can be
downloaded for each release from the Maven central repository.
-
-The full list of offered SQL JARs and documentation about how to use them can
be found on the [connection to external systems page](connect.html).
-
-The following example shows an environment file that defines a table source
reading JSON data from Apache Kafka.
-
-```yaml
-tables:
- - name: TaxiRides
- type: source-table
- update-mode: append
- connector:
- property-version: 1
- type: kafka
- version: "0.11"
- topic: TaxiRides
- startup-mode: earliest-offset
- properties:
- bootstrap.servers: localhost:9092
- group.id: testGroup
- format:
- property-version: 1
- type: json
- schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
- schema:
- - name: rideId
- data-type: BIGINT
- - name: lon
- data-type: FLOAT
- - name: lat
- data-type: FLOAT
- - name: rowTime
- data-type: TIMESTAMP(3)
- rowtime:
- timestamps:
- type: "from-field"
- from: "rideTime"
- watermarks:
- type: "periodic-bounded"
- delay: "60000"
- - name: procTime
- data-type: TIMESTAMP(3)
- proctime: true
-```
+CREATE FUNCTION myUDF AS 'foo.bar.AggregateUDF';
-The resulting schema of the `TaxiRide` table contains most of the fields of
the JSON schema. Furthermore, it adds a rowtime attribute `rowTime` and
processing-time attribute `procTime`.
+-- Properties that change the fundamental execution behavior of a table program.
-Both `connector` and `format` allow to define a property version (which is
currently version `1`) for future backwards compatibility.
+SET table.planner = blink; -- planner: either 'blink' (default) or 'old'
+SET execution.runtime-mode = streaming; -- execution mode: either 'batch' or 'streaming'
+SET sql-client.execution.result-mode = table; -- available values: 'table', 'changelog' and 'tableau'
+SET sql-client.execution.max-table-result.rows = 10000; -- optional: maximum number of maintained rows
+SET parallelism.default = 1; -- optional: Flink's parallelism (1 by default)
+SET pipeline.auto-watermark-interval = 200; -- optional: interval for periodic watermarks
+SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism
+SET table.exec.state.ttl = 1000; -- optional: table program's idle state time
+SET restart-strategy = fixed-delay;
-{{< top >}}
+-- Configuration options for adjusting and tuning table programs.
-### User-defined Functions
+SET table.optimizer.join-reorder-enabled = true;
+SET table.exec.spill-compression.enabled = true;
+SET table.exec.spill-compression.block-size = 128kb;
+```
-The SQL Client allows users to create custom, user-defined functions to be
used in SQL queries. Currently, these functions are restricted to be defined
programmatically in Java/Scala classes or Python files.
+This configuration:
-In order to provide a Java/Scala user-defined function, you need to first
implement and compile a function class that extends `ScalarFunction`,
`AggregateFunction` or `TableFunction` (see [User-defined Functions]({{< ref
"docs/dev/table/functions/udfs" >}})). One or more functions can then be
packaged into a dependency JAR for the SQL Client.
+- connects to a Hive catalog and uses `MyCatalog` as the current catalog with
+  `MyDatabase` as the current database of the catalog,
+- defines a table `MyTable` that can read data from a CSV file,
+- defines a view `MyCustomView` that declares a virtual table using a SQL
+  query,
+- defines a user-defined function `myUDF` that can be instantiated using the
+  class name,
+- uses the blink planner in streaming mode for running statements and a
+  parallelism of 1,
+- runs exploratory queries in the `table` result mode,
+- and makes some planner adjustments around join reordering and spilling via
+  configuration options.
-In order to provide a Python user-defined function, you need to write a Python
function and decorate it with the `pyflink.table.udf.udf` or
`pyflink.table.udf.udtf` decorator (see [Python UDFs]({{< ref
"docs/dev/python/table/udfs/python_udfs" >}})). One or more functions can then
be placed into a Python file. The Python file and related dependencies need to
be specified via the configuration (see [Python Configuration]({{< ref
"docs/dev/python/python_config" >}})) in environment file or the command line
options (see [Command Line Usage]({{< ref "docs/deployment/cli" >}}#usage)).
+When using the `-i <init.sql>` option to initialize the SQL Client session, the
+following statement types are allowed in an initialization SQL file:
+- DDL (CREATE/DROP/ALTER),
+- USE CATALOG/DATABASE,
+- LOAD/UNLOAD MODULE,
+- SET command,
+- RESET command.
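+A minimal initialization file restricted to the statement types above might
+look as follows (the table name and connector options are illustrative):
+
+```sql
+-- DDL: define a table for the session
+CREATE TABLE Orders (
+  order_id BIGINT,
+  price DECIMAL(10, 2)
+) WITH (
+  'connector' = 'datagen'
+);
+
+-- SET: adjust session configuration
+SET sql-client.execution.result-mode = tableau;
+
+-- RESET: restore all previously set options to their defaults
+RESET;
+```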
-All functions must be declared in an environment file before being called. For
each item in the list of `functions`, one must specify
+To execute queries or INSERT statements, enter the interactive mode or use the
+`-f` option to submit the SQL statements.
-- a `name` under which the function is registered,
-- the source of the function using `from` (restricted to be `class`
(Java/Scala UDF) or `python` (Python UDF) for now),
+<span class="label label-danger">Attention</span> If the SQL Client encounters
+errors during initialization, it will exit with an error message.
-The Java/Scala UDF must specify:
+### Dependencies
-- the `class` which indicates the fully qualified class name of the function
and an optional list of `constructor` parameters for instantiation.
+The SQL Client does not require setting up a Java project using Maven or SBT.
+Instead, you can pass the dependencies as regular JAR files that get submitted
+to the cluster. You can either specify each JAR file separately (using `--jar`)
+or define entire library directories (using `--library`). For connectors to
+external systems (such as Apache Kafka) and corresponding data formats (such as
+JSON), Flink provides **ready-to-use JAR bundles**. These JAR files can be
+downloaded for each release from the Maven central repository.
-The Python UDF must specify:
+The full list of offered SQL JARs and documentation about how to use them can
+be found on the [connection to external systems page]({{< ref "docs/connectors/table/overview" >}}).
-- the `fully-qualified-name` which indicates the fully qualified name, i.e the
"[module name].[object name]" of the function.
+Use SQL Client to submit job
+----------------------------
-```yaml
-functions:
- - name: java_udf # required: name of the function
- from: class # required: source of the function
- class: ... # required: fully qualified class name of the
function
- constructor: # optional: constructor parameters of the
function class
- - ... # optional: a literal parameter with implicit
type
- - class: ... # optional: full class name of the parameter
- constructor: # optional: constructor parameters of the
parameter's class
- - type: ... # optional: type of the literal parameter
- value: ... # optional: value of the literal parameter
- - name: python_udf # required: name of the function
- from: python # required: source of the function
- fully-qualified-name: ... # required: fully qualified class name of the
function
-```
+The SQL Client allows users to submit jobs either within the interactive
+command line or by using the `-f` option to execute a SQL file.
-For Java/Scala UDF, make sure that the order and types of the specified
parameters strictly match one of the constructors of your function class.
+In both modes, the SQL Client can parse and execute all types of SQL statements
+supported by Flink.
-#### Constructor Parameters
+### Interactive Command Line
-Depending on the user-defined function, it might be necessary to parameterize
the implementation before using it in SQL statements.
+In the interactive command line, the SQL Client reads user input and executes
+a statement once it is terminated by a semicolon (`;`).
-As shown in the example before, when declaring a user-defined function, a
class can be configured using constructor parameters in one of the following
three ways:
+The SQL Client prints a success message if a statement executes successfully,
+and an error message otherwise. By default, the error message only contains the
+error cause. To print the full exception stack for debugging, set
+`sql-client.verbose` to true via the command `SET sql-client.verbose = true;`.
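+For example, the verbosity can be toggled within a session using the option
+named above:
+
+```sql
+-- print full exception stacks while debugging
+SET sql-client.verbose = true;
+
+-- switch back to concise error causes
+SET sql-client.verbose = false;
+```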
-**A literal value with implicit type:** The SQL Client will automatically
derive the type according to the literal value itself. Currently, only values
of `BOOLEAN`, `INT`, `DOUBLE` and `VARCHAR` are supported here.
-If the automatic derivation does not work as expected (e.g., you need a
VARCHAR `false`), use explicit types instead.
+### Execute SQL Files
-```yaml
-- true # -> BOOLEAN (case sensitive)
-- 42 # -> INT
-- 1234.222 # -> DOUBLE
-- foo # -> VARCHAR
-```
+The SQL Client supports executing a SQL script file with the `-f` option. It
+executes the statements in the script file one by one and prints an execution
+message for each executed statement.
+If a statement fails, the SQL Client exits and the remaining statements are not
+executed.
-**A literal value with explicit type:** Explicitly declare the parameter with
`type` and `value` properties for type-safety.
+An example of such a file is presented below.
-```yaml
-- type: DECIMAL
- value: 11111111111111111
+```sql
+CREATE TEMPORARY TABLE users (
+ user_id BIGINT,
+ user_name STRING,
+ user_level STRING,
+ region STRING,
+ PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'users',
+ 'properties.bootstrap.servers' = '...',
+ 'key.format' = 'csv',
+ 'value.format' = 'avro'
+);
+
+-- set sync mode
+SET table.dml-sync=true;
+
+-- set the job name
+SET pipeline.name=SqlJob;
+
+-- set the queue that the job is submitted to
+SET yarn.application.queue=root;
+
+-- set the job parallelism
+SET parallelism.default=100;
+
+-- restore from the specific savepoint path
+SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
+
+INSERT INTO pageviews_enriched
+SELECT *
+FROM pageviews AS p
+LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
+ON p.user_id = u.user_id;
```
+This configuration:
-The table below illustrates the supported Java parameter types and the
corresponding SQL type strings.
-
-| Java type | SQL type |
-| :---------------------- | :---------------- |
-| `java.math.BigDecimal` | `DECIMAL` |
-| `java.lang.Boolean` | `BOOLEAN` |
-| `java.lang.Byte` | `TINYINT` |
-| `java.lang.Double` | `DOUBLE` |
-| `java.lang.Float` | `REAL`, `FLOAT` |
-| `java.lang.Integer` | `INTEGER`, `INT` |
-| `java.lang.Long` | `BIGINT` |
-| `java.lang.Short` | `SMALLINT` |
-| `java.lang.String` | `VARCHAR` |
-
-More types (e.g., `TIMESTAMP` or `ARRAY`), primitive types, and `null` are not
supported yet.
-
-**A (nested) class instance:** Besides literal values, you can also create
(nested) class instances for constructor parameters by specifying the `class`
and `constructor` properties.
-This process can be recursively performed until all the constructor parameters
are represented with literal values.
-
-```yaml
-- class: foo.bar.paramClass
- constructor:
- - StarryName
- - class: java.lang.Integer
- constructor:
- - class: java.lang.String
- constructor:
- - type: VARCHAR
- value: 3
-```
+- defines a temporary table `users` backed by the `upsert-kafka` connector,
+- sets properties such as the job name,
+- sets the savepoint path to restore from,
+- submits a SQL job that restores state from the specified savepoint path.
-{{< top >}}
+<span class="label label-danger">Attention</span> Compared to the interactive
+mode, the SQL Client stops execution and exits when it encounters an error.
-Catalogs
---------
-
-Catalogs can be defined as a set of YAML properties and are automatically
registered to the environment upon starting SQL Client.
-
-Users can specify which catalog they want to use as the current catalog in SQL
CLI, and which database of the catalog they want to use as the current database.
-
-```yaml
-catalogs:
- - name: catalog_1
- type: hive
- property-version: 1
- default-database: mydb2
- hive-conf-dir: <path of Hive conf directory>
- - name: catalog_2
- type: hive
- property-version: 1
- hive-conf-dir: <path of Hive conf directory>
-
-execution:
- ...
- current-catalog: catalog_1
- current-database: mydb1
-```
+### Execute a Set of SQL Statements
-For more information about catalogs, see [Catalogs]({{< ref
"docs/dev/table/catalogs" >}}).
+The SQL Client executes each INSERT INTO statement as a single Flink job.
+However, this is sometimes not optimal because parts of the pipeline could be
+reused. The SQL Client supports the `STATEMENT SET` syntax to execute a set of
+SQL statements together. This is equivalent to the `StatementSet` feature in
+the Table API. The `STATEMENT SET` syntax encloses one or more `INSERT INTO`
+statements. All statements in a `STATEMENT SET` block are holistically
+optimized and executed as a single Flink job. Joint optimization and execution
+allows for reusing common intermediate results and can therefore significantly
+improve the efficiency of executing multiple queries.
-Detached SQL Queries
---------------------
+#### Syntax
+```sql
+BEGIN STATEMENT SET;
+ -- one or more INSERT INTO statements
+ { INSERT INTO|OVERWRITE <select_statement>; }+
+END;
+```
+
+<span class="label label-danger">Attention</span> The statements enclosed in a
+`STATEMENT SET` must be separated by semicolons (`;`).
-In order to define end-to-end SQL pipelines, SQL's `INSERT INTO` statement can
be used for submitting long-running, detached queries to a Flink cluster. These
queries produce their results into an external system instead of the SQL
Client. This allows for dealing with higher parallelism and larger amounts of
data. The CLI itself does not have any control over a detached query after
submission.
+{{< tabs "statement set" >}}
+{{< tab "SQL CLI" >}}
```sql
-INSERT INTO MyTableSink SELECT * FROM MyTableSource
+Flink SQL> CREATE TABLE pageviews (
+> user_id BIGINT,
+> page_id BIGINT,
+> viewtime TIMESTAMP,
+> proctime AS PROCTIME()
+> ) WITH (
+> 'connector' = 'kafka',
+> 'topic' = 'pageviews',
+> 'properties.bootstrap.servers' = '...',
+> 'format' = 'avro'
+> );
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE pageview (
+> page_id BIGINT,
+> cnt BIGINT
+> ) WITH (
+> 'connector' = 'jdbc',
+> 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
+> 'table-name' = 'pageview'
+> );
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE uniqueview (
+> page_id BIGINT,
+> cnt BIGINT
+> ) WITH (
+> 'connector' = 'jdbc',
+> 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
+> 'table-name' = 'uniqueview'
+> );
+[INFO] Execute statement succeed.
+
+Flink SQL> BEGIN STATEMENT SET;
+[INFO] Begin a statement set.
+
+Flink SQL> INSERT INTO pageview
+> SELECT page_id, count(1)
+> FROM pageviews
+> GROUP BY page_id;
+[INFO] Add SQL update statement to the statement set.
+
+Flink SQL> INSERT INTO uniqueview
+> SELECT page_id, count(distinct user_id)
+> FROM pageviews
+> GROUP BY page_id;
+[INFO] Add SQL update statement to the statement set.
+
+Flink SQL> END;
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
```
+{{< /tab >}}
-The table sink `MyTableSink` has to be declared in the environment file. See
the [connection page](connect.html) for more information about supported
external systems and their configuration. An example for an Apache Kafka table
sink is shown below.
-
-```yaml
-tables:
- - name: MyTableSink
- type: sink-table
- update-mode: append
- connector:
- property-version: 1
- type: kafka
- version: "0.11"
- topic: OutputTopic
- properties:
- bootstrap.servers: localhost:9092
- group.id: testGroup
- format:
- property-version: 1
- type: json
- derive-schema: true
- schema:
- - name: rideId
- data-type: BIGINT
- - name: lon
- data-type: FLOAT
- - name: lat
- data-type: FLOAT
- - name: rideTime
- data-type: TIMESTAMP(3)
+{{< tab "SQL File" >}}
+```sql
+CREATE TABLE pageviews (
+ user_id BIGINT,
+ page_id BIGINT,
+ viewtime TIMESTAMP,
+ proctime AS PROCTIME()
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'pageviews',
+ 'properties.bootstrap.servers' = '...',
+ 'format' = 'avro'
+);
+
+CREATE TABLE pageview (
+ page_id BIGINT,
+ cnt BIGINT
+) WITH (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
+ 'table-name' = 'pageview'
+);
+
+CREATE TABLE uniqueview (
+ page_id BIGINT,
+ cnt BIGINT
+) WITH (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
+ 'table-name' = 'uniqueview'
+);
+
+BEGIN STATEMENT SET;
+
+INSERT INTO pageview
+SELECT page_id, count(1)
+FROM pageviews
+GROUP BY page_id;
+
+INSERT INTO uniqueview
+SELECT page_id, count(distinct user_id)
+FROM pageviews
+GROUP BY page_id;
+
+END;
```
+{{< /tab >}}
+{{< /tabs >}}
-The SQL Client makes sure that a statement is successfully submitted to the cluster. Once the query is submitted, the CLI will show information about the Flink job.
+### Execute DML statements sync/async
-```text
+By default, SQL Client executes DML statements asynchronously: it submits a job for the DML
+statement to the Flink cluster and does not wait for the job to finish. This allows SQL Client
+to submit multiple jobs at the same time, which is useful for streaming jobs that are
+typically long-running.
+
+SQL Client makes sure that a statement is successfully submitted to the cluster. Once the
+statement is submitted, the CLI will show information about the Flink job.
+
+```sql
+Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 6f922fe5cba87406ff23ae4a7bb79044
-Web interface: http://localhost:8081
```
<span class="label label-danger">Attention</span> The SQL Client does not track the status of the running Flink job after submission. The CLI process can be shut down after the submission without affecting the detached query. Flink's `restart strategy` takes care of the fault-tolerance. A query can be cancelled using Flink's web interface, command-line, or REST API.
-{{< top >}}
+However, for batch users, it is more common that the next DML statement should wait until the
+previous one finishes. To execute DML statements synchronously, set the `table.dml-sync`
+option to `true` in SQL Client.
-SQL Views
----------
+```sql
+Flink SQL> SET table.dml-sync = true;
+[INFO] Session property has been set.
+
+Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] Execute statement in sync mode. Please wait for the execution finish...
+[INFO] Complete execution of the SQL update statement.
+```
-Views allow to define virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+<span class="label label-danger">Attention</span> If you want to terminate the job, just press `CTRL-C` to cancel the execution.
-Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+### Start a SQL Job from a savepoint
-The following example shows how to define multiple views in a file. The views are registered in the order in which they are defined in the environment file. Reference chains such as _view A depends on view B depends on view C_ are supported.
+Flink supports starting a job from a specified savepoint. In SQL Client, you can use the
+`SET` command to specify the path to the savepoint.
-```yaml
-tables:
- - name: MyTableSource
- # ...
- - name: MyRestrictedView
- type: view
- query: "SELECT MyField2 FROM MyTableSource"
- - name: MyComplexView
- type: view
- query: >
- SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
- FROM MyTableSource
- WHERE MyField2 > 200
+```sql
+Flink SQL> SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
+[INFO] Session property has been set.
Review comment:
apply this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]