This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2391a8830a9944adbc5aadbb24b43ad19340d90c
Author: Jark Wu <[email protected]>
AuthorDate: Mon Aug 26 13:33:19 2019 +0800

    [FLINK-13362][docs] Add DDL documentation for Kafka, ElasticSearch, FileSystem and formats
---
 docs/dev/table/connect.md    | 270 +++++++++++++++++++++++++++++++++++++++++
 docs/dev/table/connect.zh.md | 283 +++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 546 insertions(+), 7 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 14d2a3b..5378ae9 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -122,6 +122,12 @@ format: ...
 schema: ...
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)")
+{% endhighlight %}
+</div>
 </div>
 
 The table's type (`source`, `sink`, or `both`) determines how a table is 
registered. In case of table type `both`, both a table source and table sink 
are registered under the same name. Logically, this means that we can both read 
and write to such a table similarly to a table in a regular DBMS.
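For example (a sketch only; `MyTable` and `OtherTable` are placeholder names), a table created via DDL is registered once and can then appear on both sides of a query, with each statement passed separately to `sqlUpdate()` (or `sqlQuery()` for a `SELECT`):

{% highlight sql %}
-- registers MyTable as both a table source and a table sink
CREATE TABLE MyTable (...) WITH (...)

-- the registered name can be used for reading ...
SELECT * FROM MyTable

-- ... as well as for writing
INSERT INTO MyTable SELECT * FROM OtherTable
{% endhighlight %}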
@@ -276,6 +282,39 @@ tables:
         type: VARCHAR
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  `user` BIGINT,
+  message VARCHAR,
+  ts VARCHAR
+) WITH (
+  -- declare the external system to connect to
+  'connector.type' = 'kafka',
+  'connector.version' = '0.10',
+  'connector.topic' = 'topic_name',
+  'connector.startup-mode' = 'earliest-offset',
+  'connector.properties.0.key' = 'zookeeper.connect',
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'update-mode' = 'append',
+  -- declare a format for this system
+  'format.type' = 'avro',
+  'format.avro-schema' = '{
+                            "namespace": "org.myorganization",
+                            "type": "record",
+                            "name": "UserMessage",
+                            "fields": [
+                                {"name": "ts", "type": "string"},
+                                {"name": "user", "type": "long"},
+                                {"name": "message", "type": ["string", "null"]}
+                            ]
+                         }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 In both ways the desired connection properties are converted into normalized, 
string-based key-value pairs. So-called [table 
factories](sourceSinks.html#define-a-tablefactory) create configured table 
sources, table sinks, and corresponding formats from the key-value pairs. All 
table factories that can be found via Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken 
into account when searching for exactly-one matching table factory.
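As an illustration, the DDL example above boils down to (roughly) the following normalized string properties, which is what a matching table factory receives; this is an illustrative excerpt only, and further keys are derived from the column definitions:

{% highlight text %}
connector.type=kafka
connector.version=0.10
connector.topic=topic_name
connector.startup-mode=earliest-offset
update-mode=append
format.type=avro
format.avro-schema={"namespace": "org.myorganization", ... }
{% endhighlight %}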
@@ -603,6 +642,16 @@ tables:
     update-mode: append    # otherwise: "retract" or "upsert"
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+ ...
+) WITH (
+ 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
+)
+{% endhighlight %}
+</div>
 </div>
 
 See also the [general streaming concepts 
documentation](streaming/dynamic_tables.html#continuous-queries) for more 
information.
@@ -652,6 +701,17 @@ connector:
   path: "file:///path/to/whatever"    # required: path to a file or directory
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'filesystem',               -- required: specify the connector type
+  'connector.path' = 'file:///path/to/whatever'  -- required: path to a file or directory
+)
+{% endhighlight %}
+</div>
 </div>
 
 The file system connector itself is included in Flink and does not require an 
additional dependency. A corresponding format needs to be specified for reading 
and writing rows from and to a file system.
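For example, a complete file system table combines the connector with one of the formats described below; a minimal sketch (the path and field definitions are placeholders) could look like:

{% highlight sql %}
CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = 'file:///path/to/whatever',
  -- a format is required for reading and writing rows
  'format.type' = 'csv',
  'format.fields.0.name' = 'lon',
  'format.fields.0.type' = 'FLOAT',
  'format.fields.1.name' = 'rideTime',
  'format.fields.1.type' = 'TIMESTAMP'
)
{% endhighlight %}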
@@ -753,6 +813,49 @@ connector:
   sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in 
case of sink partitioner custom
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'kafka',       
+
+  'connector.version' = '0.11',     -- required: valid connector versions are
+                                    -- "0.8", "0.9", "0.10", "0.11", and 
"universal"
+
+  'connector.topic' = 'topic_name', -- required: topic name from which the 
table is read
+
+  'update-mode' = 'append',         -- required: update mode when used as a table sink;
+                                    -- only append mode is supported for now.
+
+  'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector 
specific properties
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'connector.properties.2.key' = 'group.id',
+  'connector.properties.2.value' = 'testGroup',
+  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes 
are "earliest-offset", 
+                                                   -- "latest-offset", 
"group-offsets", 
+                                                   -- or "specific-offsets"
+
+  -- optional: used in case of startup mode with specific offsets
+  'connector.specific-offsets.0.partition' = '0',
+  'connector.specific-offsets.0.offset' = '42',
+  'connector.specific-offsets.1.partition' = '1',
+  'connector.specific-offsets.1.offset' = '300',
+
+  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions
+                                         -- into Kafka's partitions; valid values are "fixed"
+                                         -- (each Flink partition ends up in at most one Kafka partition),
+                                         -- "round-robin" (a Flink partition is distributed to
+                                         -- Kafka partitions round-robin),
+                                         -- or "custom" (use a custom FlinkKafkaPartitioner subclass)
+  -- optional: used in case of sink partitioner custom
+  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner'
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Specify the start reading position:** By default, the Kafka source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions, which correspond to the 
configurations in section [Kafka Consumers Start Position Configuration]({{ 
site.baseurl 
}}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
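In DDL, the start position is controlled through the `connector.startup-mode` key shown above. For instance, a sketch of reading from specific offsets (the partition and offset values are placeholders):

{% highlight sql %}
CREATE TABLE MyUserTable (
  ...
) WITH (
  ...
  'connector.startup-mode' = 'specific-offsets',
  'connector.specific-offsets.0.partition' = '0',
  'connector.specific-offsets.0.offset' = '42'
)
{% endhighlight %}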
@@ -900,6 +1003,66 @@ connector:
     connection-path-prefix: "/v1"     # optional: prefix string to be added to 
every REST communication
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'elasticsearch', -- required: specify the Elasticsearch connector
+  
+  'connector.version' = '6',          -- required: valid connector versions 
are "6"
+  
+  'connector.hosts.0.hostname' = 'host_name',  -- required: one or more 
Elasticsearch hosts to connect to
+  'connector.hosts.0.port' = '9200',
+  'connector.hosts.0.protocol' = 'http',
+
+  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
+
+  'connector.document-type' = 'user',  -- required: Elasticsearch document type
+
+  'update-mode' = 'append',            -- optional: update mode when used as a table sink
+
+  'connector.key-delimiter' = '$',     -- optional: delimiter for composite 
keys ("_" by default)
+                                       -- e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
+
+  'connector.key-null-literal' = 'n/a',  -- optional: representation for null 
fields in keys ("null" by default)
+
+  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to
+                                         -- Elasticsearch fails ("fail" by default);
+                                         -- valid strategies are
+                                         -- "fail" (throws an exception if a request fails and
+                                         -- thus causes a job failure),
+                                         -- "ignore" (ignores failures and drops the request),
+                                         -- "retry-rejected" (re-adds requests that have failed due
+                                         -- to queue capacity saturation),
+                                         -- or "custom" for failure handling with an
+                                         -- ActionRequestFailureHandler subclass
+
+  -- optional: configure how to buffer elements before sending them in bulk to 
the cluster for efficiency
+  'connector.flush-on-checkpoint' = 'true',   -- optional: whether to flush on checkpoint (see notes below!)
+                                              -- ("true" by default; "false" disables flushing)
+  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of 
actions to buffer 
+                                              -- for each bulk request
+  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of 
buffered actions in bytes
+                                              -- per bulk request
+                                              -- (only MB granularity is 
supported)
+  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval 
(in milliseconds)
+  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff 
strategy ("disabled" by default)
+                                                      -- valid strategies are 
"disabled", "constant",
+                                                      -- or "exponential"
+  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum 
number of retries
+  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay 
between each backoff attempt
+                                                      -- (in milliseconds)
+
+  -- optional: connection properties to be used during REST communication to 
Elasticsearch
+  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum 
timeout (in milliseconds)
+                                                      -- between retries
+  'connector.connection-path-prefix' = '/v1'          -- optional: prefix 
string to be added to every
+                                                      -- REST communication
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Bulk flushing:** For more information about characteristics of the optional 
flushing parameters see the [corresponding low-level documentation]({{ 
site.baseurl }}/dev/connectors/elasticsearch.html).
@@ -1019,6 +1182,38 @@ format:
                                #   null value (disabled by default)
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema 
either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.derive-schema' = 'true',        -- or use the table's schema
+
+  'format.field-delimiter' = ';',         -- optional: field delimiter 
character (',' by default)
+  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by 
default; otherwise
+                                          -- "\r" or "\r\n" are allowed)
+  'format.quote-character' = '''',        -- optional: quote character for 
enclosing field values ('"' by default)
+  'format.allow-comments' = 'true',       -- optional: ignores comment lines that start with "#"
+                                          -- (disabled by default);
+                                          -- if enabled, make sure to also 
ignore parse errors to allow empty rows
+  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows 
with parse errors instead of failing;
+                                          -- fields are set to null in case of 
errors
+  'format.array-element-delimiter' = '|', -- optional: the array element 
delimiter string for separating
+                                          -- array and row element values (";" 
by default)
+  'format.escape-character' = '\\',       -- optional: escape character for 
escaping values (disabled by default)
+  'format.null-literal' = 'n/a'           -- optional: null literal string 
that is interpreted as a
+                                          -- null value (disabled by default)
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table lists supported types that can be read and written:
@@ -1171,6 +1366,38 @@ format:
   derive-schema: true
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'json',                   -- required: specify the format 
type
+  'format.fail-on-missing-field' = 'true',  -- optional: whether to fail if a field is missing ("false" by default)
+
+  'format.fields.0.name' = 'lon',           -- required: define the schema 
either by using a type string which parses numbers to corresponding types
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.json-schema' =                    -- or by using a JSON schema which 
parses to DECIMAL and TIMESTAMP
+    '{
+      "type": "object",
+      "properties": {
+        "lon": {
+          "type": "number"
+        },
+        "rideTime": {
+          "type": "string",
+          "format": "date-time"
+        }
+      }
+    }',
+
+  'format.derive-schema' = 'true'          -- or use the table's schema
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table shows the mapping of JSON schema types to Flink SQL types:
@@ -1318,6 +1545,27 @@ format:
     }
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'avro',                                 -- required: specify the format type
+  'format.record-class' = 'org.organization.types.User',  -- required: define 
the schema either by using an Avro specific record class
+
+  'format.avro-schema' =                                  -- or by using an 
Avro schema
+    '{
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 Avro types are mapped to the corresponding SQL data types. Union types are 
only supported for specifying nullability otherwise they are converted to an 
`ANY` type. The following table shows the mapping:
@@ -1410,6 +1658,28 @@ format:
   ignore-parse-errors: true  # optional: skip records with parse error instead 
of failing by default
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema 
either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+  
+  'format.field-delimiter' = ',',         -- optional: string delimiter "," by 
default
+  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" 
by default
+  'format.quote-character' = '"',         -- optional: single character for 
string values, empty by default
+  'format.comment-prefix' = '#',          -- optional: string to indicate 
comments, empty by default
+  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore 
the first line, by default it is not skipped
+  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse 
error instead of failing by default
+)
+{% endhighlight %}
+</div>
 </div>
 
 The old CSV format is included in Flink and does not require additional 
dependencies.
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 9aeacd9..6deb6af 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "连接外部系统"
+title: "Connect to External Systems"
 nav-parent_id: tableapi
 nav-pos: 19
 ---
@@ -122,6 +122,12 @@ format: ...
 schema: ...
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)")
+{% endhighlight %}
+</div>
 </div>
 
 The table's type (`source`, `sink`, or `both`) determines how a table is 
registered. In case of table type `both`, both a table source and table sink 
are registered under the same name. Logically, this means that we can both read 
and write to such a table similarly to a table in a regular DBMS.
@@ -276,6 +282,39 @@ tables:
         type: VARCHAR
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  `user` BIGINT,
+  message VARCHAR,
+  ts VARCHAR
+) WITH (
+  -- declare the external system to connect to
+  'connector.type' = 'kafka',
+  'connector.version' = '0.10',
+  'connector.topic' = 'topic_name',
+  'connector.startup-mode' = 'earliest-offset',
+  'connector.properties.0.key' = 'zookeeper.connect',
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'update-mode' = 'append',
+  -- declare a format for this system
+  'format.type' = 'avro',
+  'format.avro-schema' = '{
+                            "namespace": "org.myorganization",
+                            "type": "record",
+                            "name": "UserMessage",
+                            "fields": [
+                                {"name": "ts", "type": "string"},
+                                {"name": "user", "type": "long"},
+                                {"name": "message", "type": ["string", "null"]}
+                            ]
+                         }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 In both ways the desired connection properties are converted into normalized, 
string-based key-value pairs. So-called [table 
factories](sourceSinks.html#define-a-tablefactory) create configured table 
sources, table sinks, and corresponding formats from the key-value pairs. All 
table factories that can be found via Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken 
into account when searching for exactly-one matching table factory.
@@ -603,6 +642,16 @@ tables:
     update-mode: append    # otherwise: "retract" or "upsert"
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+ ...
+) WITH (
+ 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
+)
+{% endhighlight %}
+</div>
 </div>
 
 See also the [general streaming concepts 
documentation](streaming/dynamic_tables.html#continuous-queries) for more 
information.
@@ -652,6 +701,17 @@ connector:
   path: "file:///path/to/whatever"    # required: path to a file or directory
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'filesystem',               -- required: specify the connector type
+  'connector.path' = 'file:///path/to/whatever'  -- required: path to a file or directory
+)
+{% endhighlight %}
+</div>
 </div>
 
 The file system connector itself is included in Flink and does not require an 
additional dependency. A corresponding format needs to be specified for reading 
and writing rows from and to a file system.
@@ -703,7 +763,7 @@ The Kafka connector allows for reading and writing from and 
to an Apache Kafka t
     .version("0.11")  # required: valid connector versions are
                       # "0.8", "0.9", "0.10", "0.11", and "universal"
     .topic("...")     # required: topic name from which the table is read
-    
+
     # optional: connector specific properties
     .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
@@ -753,6 +813,49 @@ connector:
   sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in 
case of sink partitioner custom
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'kafka',
+
+  'connector.version' = '0.11',     -- required: valid connector versions are
+                                    -- "0.8", "0.9", "0.10", "0.11", and 
"universal"
+
+  'connector.topic' = 'topic_name', -- required: topic name from which the 
table is read
+
+  'update-mode' = 'append',         -- required: update mode when used as a table sink;
+                                    -- only append mode is supported for now.
+
+  'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector 
specific properties
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'connector.properties.2.key' = 'group.id',
+  'connector.properties.2.value' = 'testGroup',
+  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes 
are "earliest-offset",
+                                                   -- "latest-offset", 
"group-offsets",
+                                                   -- or "specific-offsets"
+
+  -- optional: used in case of startup mode with specific offsets
+  'connector.specific-offsets.0.partition' = '0',
+  'connector.specific-offsets.0.offset' = '42',
+  'connector.specific-offsets.1.partition' = '1',
+  'connector.specific-offsets.1.offset' = '300',
+
+  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions
+                                         -- into Kafka's partitions; valid values are "fixed"
+                                         -- (each Flink partition ends up in at most one Kafka partition),
+                                         -- "round-robin" (a Flink partition is distributed to
+                                         -- Kafka partitions round-robin),
+                                         -- or "custom" (use a custom FlinkKafkaPartitioner subclass)
+  -- optional: used in case of sink partitioner custom
+  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner'
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Specify the start reading position:** By default, the Kafka source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions, which correspond to the 
configurations in section [Kafka Consumers Start Position Configuration]({{ 
site.baseurl 
}}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
@@ -900,6 +1003,66 @@ connector:
     connection-path-prefix: "/v1"     # optional: prefix string to be added to 
every REST communication
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'elasticsearch', -- required: specify the Elasticsearch connector
+
+  'connector.version' = '6',          -- required: valid connector versions 
are "6"
+
+  'connector.hosts.0.hostname' = 'host_name',  -- required: one or more 
Elasticsearch hosts to connect to
+  'connector.hosts.0.port' = '9200',
+  'connector.hosts.0.protocol' = 'http',
+
+  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
+
+  'connector.document-type' = 'user',  -- required: Elasticsearch document type
+
+  'update-mode' = 'append',            -- optional: update mode when used as a table sink
+
+  'connector.key-delimiter' = '$',     -- optional: delimiter for composite 
keys ("_" by default)
+                                       -- e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
+
+  'connector.key-null-literal' = 'n/a',  -- optional: representation for null 
fields in keys ("null" by default)
+
+  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to
+                                         -- Elasticsearch fails ("fail" by default);
+                                         -- valid strategies are
+                                         -- "fail" (throws an exception if a request fails and
+                                         -- thus causes a job failure),
+                                         -- "ignore" (ignores failures and drops the request),
+                                         -- "retry-rejected" (re-adds requests that have failed due
+                                         -- to queue capacity saturation),
+                                         -- or "custom" for failure handling with an
+                                         -- ActionRequestFailureHandler subclass
+
+  -- optional: configure how to buffer elements before sending them in bulk to 
the cluster for efficiency
+  'connector.flush-on-checkpoint' = 'true',   -- optional: whether to flush on checkpoint (see notes below!)
+                                              -- ("true" by default; "false" disables flushing)
+  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of 
actions to buffer
+                                              -- for each bulk request
+  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of 
buffered actions in bytes
+                                              -- per bulk request
+                                              -- (only MB granularity is 
supported)
+  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval 
(in milliseconds)
+  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff 
strategy ("disabled" by default)
+                                                      -- valid strategies are 
"disabled", "constant",
+                                                      -- or "exponential"
+  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum 
number of retries
+  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay 
between each backoff attempt
+                                                      -- (in milliseconds)
+
+  -- optional: connection properties to be used during REST communication to 
Elasticsearch
+  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum 
timeout (in milliseconds)
+                                                      -- between retries
+  'connector.connection-path-prefix' = '/v1'          -- optional: prefix 
string to be added to every
+                                                      -- REST communication
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Bulk flushing:** For more information about characteristics of the optional 
flushing parameters see the [corresponding low-level documentation]({{ 
site.baseurl }}/dev/connectors/elasticsearch.html).
@@ -977,17 +1140,17 @@ The CSV format can be used as follows:
     # or use the table's schema
     .derive_schema()
 
-    .field_delimiter(";")          # optional: field delimiter character ("," 
by default)
+    .field_delimiter(';')          # optional: field delimiter character (',' 
by default)
     .line_delimiter("\r\n")        # optional: line delimiter ("\n" by default;
                                    #   otherwise "\r" or "\r\n" are allowed)
-    .quote_character("'")         # optional: quote character for enclosing 
field values ('"' by default)
-    .allow_comments()              # optional: ignores comment lines that 
start with "#" (disabled by default);
+    .quote_character('\'')         # optional: quote character for enclosing 
field values ('"' by default)
+    .allow_comments()              # optional: ignores comment lines that 
start with '#' (disabled by default);
                                    #   if enabled, make sure to also ignore 
parse errors to allow empty rows
     .ignore_parse_errors()         # optional: skip fields and rows with parse 
errors instead of failing;
                                    #   fields are set to null in case of errors
     .array_element_delimiter("|")  # optional: the array element delimiter 
string for separating
                                    #   array and row element values (";" by 
default)
-    .escape_character("\\")        # optional: escape character for escaping 
values (disabled by default)
+    .escape_character('\\')        # optional: escape character for escaping 
values (disabled by default)
     .null_literal("n/a")           # optional: null literal string that is 
interpreted as a
                                    #   null value (disabled by default)
 )
@@ -1019,6 +1182,38 @@ format:
                                #   null value (disabled by default)
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema 
either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.derive-schema' = 'true',        -- or use the table's schema
+
+  'format.field-delimiter' = ';',         -- optional: field delimiter 
character (',' by default)
+  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by 
default; otherwise
+                                          -- "\r" or "\r\n" are allowed)
+  'format.quote-character' = '''',        -- optional: quote character for 
enclosing field values ('"' by default)
+  'format.allow-comments' = 'true',       -- optional: ignores comment lines that start with "#"
+                                          -- (disabled by default);
+                                          -- if enabled, make sure to also 
ignore parse errors to allow empty rows
+  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows 
with parse errors instead of failing;
+                                          -- fields are set to null in case of 
errors
+  'format.array-element-delimiter' = '|', -- optional: the array element 
delimiter string for separating
+                                          -- array and row element values (";" 
by default)
+  'format.escape-character' = '\\',       -- optional: escape character for 
escaping values (disabled by default)
+  'format.null-literal' = 'n/a'           -- optional: null literal string 
that is interpreted as a
+                                          -- null value (disabled by default)
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table lists supported types that can be read and written:
@@ -1171,6 +1366,38 @@ format:
   derive-schema: true
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'json',                   -- required: specify the format 
type
+  'format.fail-on-missing-field' = 'true',  -- optional: whether to fail if a field is missing ("false" by default)
+
+  'format.fields.0.name' = 'lon',           -- required: define the schema 
either by using a type string which parses numbers to corresponding types
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.json-schema' =                    -- or by using a JSON schema which 
parses to DECIMAL and TIMESTAMP
+    '{
+      "type": "object",
+      "properties": {
+        "lon": {
+          "type": "number"
+        },
+        "rideTime": {
+          "type": "string",
+          "format": "date-time"
+        }
+      }
+    }',
+
+  'format.derive-schema' = 'true'          -- or use the table's schema
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table shows the mapping of JSON schema types to Flink SQL types:
@@ -1318,6 +1545,27 @@ format:
     }
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'avro',                                 -- required: specify the format type
+  'format.record-class' = 'org.organization.types.User',  -- required: define 
the schema either by using an Avro specific record class
+
+  'format.avro-schema' =                                  -- or by using an 
Avro schema
+    '{
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 Avro types are mapped to the corresponding SQL data types. Union types are 
only supported for specifying nullability otherwise they are converted to an 
`ANY` type. The following table shows the mapping:
@@ -1410,6 +1658,28 @@ format:
   ignore-parse-errors: true  # optional: skip records with parse error instead 
of failing by default
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema 
either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.field-delimiter' = ',',         -- optional: string delimiter "," by 
default
+  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" 
by default
+  'format.quote-character' = '"',         -- optional: single character for 
string values, empty by default
+  'format.comment-prefix' = '#',          -- optional: string to indicate 
comments, empty by default
+  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore 
the first line, by default it is not skipped
+  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse 
error instead of failing by default
+)
+{% endhighlight %}
+</div>
 </div>
 
 The old CSV format is included in Flink and does not require additional 
dependencies.
@@ -1532,7 +1802,6 @@ table.insertInto("csvOutputTable")
 {% endhighlight %}
 </div>
 
-
 <div data-lang="python" markdown="1">
 {% highlight python %}
 
