This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2391a8830a9944adbc5aadbb24b43ad19340d90c Author: Jark Wu <[email protected]> AuthorDate: Mon Aug 26 13:33:19 2019 +0800 [FLINK-13362][docs] Add DDL documentation for Kafka, ElasticSearch, FileSystem and formats --- docs/dev/table/connect.md | 270 +++++++++++++++++++++++++++++++++++++++++ docs/dev/table/connect.zh.md | 283 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 546 insertions(+), 7 deletions(-) diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 14d2a3b..5378ae9 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -122,6 +122,12 @@ format: ... schema: ... {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)") +{% endhighlight %} +</div> </div> The table's type (`source`, `sink`, or `both`) determines how a table is registered. In case of table type `both`, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS. @@ -276,6 +282,39 @@ tables: type: VARCHAR {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + `user` BIGINT, + message VARCHAR, + ts VARCHAR +) WITH ( + -- declare the external system to connect to + 'connector.type' = 'kafka', + 'connector.version' = '0.10', + 'connector.topic' = 'topic_name', + 'connector.startup-mode' = 'earliest-offset', + 'connector.properties.0.key' = 'zookeeper.connect', + 'connector.properties.0.value' = 'localhost:2181', + 'connector.properties.1.key' = 'bootstrap.servers', + 'connector.properties.1.value' = 'localhost:9092', + 'update-mode' = 'append', + -- declare a format for this system + 'format.type' = 'avro', + 'format.avro-schema' = '{ + "namespace": "org.myorganization", + "type": "record", + "name": "UserMessage", + "fields": [ + {"name": "ts", "type": "string"}, + {"name": "user", "type": "long"}, + {"name": "message", "type": ["string", "null"]} + ] + }' +) +{% endhighlight %} +</div> </div> In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory. @@ -603,6 +642,16 @@ tables: update-mode: append # otherwise: "retract" or "upsert" {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyTable ( + ... +) WITH ( + 'update-mode' = 'append' -- otherwise: 'retract' or 'upsert' +) +{% endhighlight %} +</div> </div> See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information. @@ -652,6 +701,17 @@ connector: path: "file:///path/to/whatever" # required: path to a file or directory {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... 
+) WITH ( + 'connector.type' = 'filesystem', -- required: specify the connector type + 'connector.path' = 'file:///path/to/whatever' -- required: path to a file or directory +) +{% endhighlight %} +</div> </div> The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system. @@ -753,6 +813,49 @@ connector: sink-partitioner-class: org.mycompany.MyPartitioner # optional: used in case of sink partitioner custom {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'connector.type' = 'kafka', + + 'connector.version' = '0.11', -- required: valid connector versions are + -- "0.8", "0.9", "0.10", "0.11", and "universal" + + 'connector.topic' = 'topic_name', -- required: topic name from which the table is read + + 'update-mode' = 'append', -- required: update mode when used as table sink; + -- only append mode is supported for now. + + 'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties + 'connector.properties.0.value' = 'localhost:2181', + 'connector.properties.1.key' = 'bootstrap.servers', + 'connector.properties.1.value' = 'localhost:9092', + 'connector.properties.2.key' = 'group.id', + 'connector.properties.2.value' = 'testGroup', + 'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset", + -- "latest-offset", "group-offsets", + -- or "specific-offsets" + + -- optional: used in case of startup mode with specific offsets + 'connector.specific-offsets.0.partition' = '0', + 'connector.specific-offsets.0.offset' = '42', + 'connector.specific-offsets.1.partition' = '1', + 'connector.specific-offsets.1.offset' = '300', + + 'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions + -- into Kafka's partitions. Valid values are "fixed" + -- (each Flink partition ends up in at most one Kafka partition), + -- "round-robin" (a Flink partition is distributed to + -- Kafka partitions round-robin), + -- or "custom" (use a custom FlinkKafkaPartitioner subclass) + -- optional: used in case of sink partitioner custom + 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner' +) +{% endhighlight %} +</div> </div> **Specify the start reading position:** By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section [Kafka Consumers Start Position Configuration]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration). @@ -900,6 +1003,66 @@ connector: connection-path-prefix: "/v1" # optional: prefix string to be added to every REST communication {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ...
+) WITH ( + 'connector.type' = 'elasticsearch', -- required: specify the connector type as elasticsearch + + 'connector.version' = '6', -- required: valid connector versions are "6" + + 'connector.hosts.0.hostname' = 'host_name', -- required: one or more Elasticsearch hosts to connect to + 'connector.hosts.0.port' = '9200', + 'connector.hosts.0.protocol' = 'http', + + 'connector.index' = 'MyUsers', -- required: Elasticsearch index + + 'connector.document-type' = 'user', -- required: Elasticsearch document type + + 'update-mode' = 'append', -- optional: update mode when used as table sink. + + 'connector.key-delimiter' = '$', -- optional: delimiter for composite keys ("_" by default) + -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3" + + 'connector.key-null-literal' = 'n/a', -- optional: representation for null fields in keys ("null" by default) + + 'connector.failure-handler' = '...', -- optional: failure handling strategy in case a request to + -- Elasticsearch fails ("fail" by default). + -- valid strategies are + -- "fail" (throws an exception if a request fails and + -- thus causes a job failure), + -- "ignore" (ignores failures and drops the request), + -- "retry-rejected" (re-adds requests that have failed due + -- to queue capacity saturation), + -- or "custom" for failure handling with an + -- ActionRequestFailureHandler subclass + + -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency + 'connector.flush-on-checkpoint' = 'true', -- optional: whether to flush on checkpoint; set to "false" + -- to disable it (see notes below!) ("true" by default) + 'connector.bulk-flush.max-actions' = '42', -- optional: maximum number of actions to buffer + -- for each bulk request + 'connector.bulk-flush.max-size' = '42 mb', -- optional: maximum size of buffered actions in bytes + -- per bulk request + -- (only MB granularity is supported) + 'connector.bulk-flush.interval' = '60000', -- optional: bulk flush interval (in milliseconds) + 'connector.bulk-flush.back-off.type' = '...', -- optional: backoff strategy ("disabled" by default) + -- valid strategies are "disabled", "constant", + -- or "exponential" + 'connector.bulk-flush.back-off.max-retries' = '3', -- optional: maximum number of retries + 'connector.bulk-flush.back-off.delay' = '30000', -- optional: delay between each backoff attempt + -- (in milliseconds) + + -- optional: connection properties to be used during REST communication to Elasticsearch + 'connector.connection-max-retry-timeout' = '3', -- optional: maximum timeout (in milliseconds) + -- between retries + 'connector.connection-path-prefix' = '/v1' -- optional: prefix string to be added to every + -- REST communication +) +{% endhighlight %} +</div> </div> **Bulk flushing:** For more information about characteristics of the optional flushing parameters see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html). @@ -1019,6 +1182,38 @@ format: # null value (disabled by default) {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ...
+) WITH ( + 'format.type' = 'csv', -- required: specify the schema type + + 'format.fields.0.name' = 'lon', -- required: define the schema either by using type information + 'format.fields.0.type' = 'FLOAT', + 'format.fields.1.name' = 'rideTime', + 'format.fields.1.type' = 'TIMESTAMP', + + 'format.derive-schema' = 'true', -- or use the table's schema + + 'format.field-delimiter' = ';', -- optional: field delimiter character (',' by default) + 'format.line-delimiter' = '\r\n', -- optional: line delimiter ("\n" by default; otherwise + -- "\r" or "\r\n" are allowed) + 'format.quote-character' = '''', -- optional: quote character for enclosing field values ('"' by default) + 'format.allow-comments' = 'true', -- optional: ignores comment lines that start with "#" + -- (disabled by default); + -- if enabled, make sure to also ignore parse errors to allow empty rows + 'format.ignore-parse-errors' = 'true', -- optional: skip fields and rows with parse errors instead of failing; + -- fields are set to null in case of errors + 'format.array-element-delimiter' = '|', -- optional: the array element delimiter string for separating + -- array and row element values (";" by default) + 'format.escape-character' = '\\', -- optional: escape character for escaping values (disabled by default) + 'format.null-literal' = 'n/a' -- optional: null literal string that is interpreted as a + -- null value (disabled by default) +) +{% endhighlight %} +</div> </div> The following table lists supported types that can be read and written: @@ -1171,6 +1366,38 @@ format: derive-schema: true {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'format.type' = 'json', -- required: specify the format type + 'format.fail-on-missing-field' = 'true', -- optional: whether to fail if a field is missing ("false" by default) + + 'format.fields.0.name' = 'lon', -- required: define the schema either by using type information which parses numbers to corresponding types + 'format.fields.0.type' = 'FLOAT', + 'format.fields.1.name' = 'rideTime', + 'format.fields.1.type' = 'TIMESTAMP', + + 'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP + '{ + "type": "object", + "properties": { + "lon": { + "type": "number" + }, + "rideTime": { + "type": "string", + "format": "date-time" + } + } + }', + + 'format.derive-schema' = 'true' -- or use the table's schema +) +{% endhighlight %} +</div> </div> The following table shows the mapping of JSON schema types to Flink SQL types: @@ -1318,6 +1545,27 @@ format: } {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'format.type' = 'avro', -- required: specify the schema type + 'format.record-class' = 'org.organization.types.User', -- required: define the schema either by using an Avro-specific record class + + 'format.avro-schema' = -- or by using an Avro schema + '{ + "type": "record", + "name": "test", + "fields" : [ + {"name": "a", "type": "long"}, + {"name": "b", "type": "string"} + ] + }' +) +{% endhighlight %} +</div> </div> Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability; otherwise they are converted to an `ANY` type.
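As an illustration of that nullability rule, the following is a hypothetical sketch (not part of this patch) that reuses the Kafka/Avro example from earlier on this page: because the Avro field uses the union `["string", "null"]`, the corresponding column can simply be declared as a nullable `VARCHAR`.

{% highlight sql %}
-- Illustrative only: the table and field names are made up for this sketch.
CREATE TABLE NullableUserMessage (
  `user` BIGINT,
  message VARCHAR   -- nullable, because of the ["string", "null"] union below
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'topic_name',
  'update-mode' = 'append',
  'format.type' = 'avro',
  'format.avro-schema' = '{
    "type": "record",
    "name": "UserMessage",
    "fields": [
      {"name": "user", "type": "long"},
      {"name": "message", "type": ["string", "null"]}
    ]
  }'
)
{% endhighlight %}

A union of two concrete (non-null) types, by contrast, falls back to the `ANY` type.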
The following table shows the mapping: @@ -1410,6 +1658,28 @@ format: ignore-parse-errors: true # optional: skip records with parse error instead of failing by default {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'format.type' = 'csv', -- required: specify the schema type + + 'format.fields.0.name' = 'lon', -- required: define the schema either by using type information + 'format.fields.0.type' = 'FLOAT', + 'format.fields.1.name' = 'rideTime', + 'format.fields.1.type' = 'TIMESTAMP', + + 'format.field-delimiter' = ',', -- optional: string delimiter "," by default + 'format.line-delimiter' = '\n', -- optional: string delimiter "\n" by default + 'format.quote-character' = '"', -- optional: single character for string values, empty by default + 'format.comment-prefix' = '#', -- optional: string to indicate comments, empty by default + 'format.ignore-first-line' = 'false', -- optional: boolean flag to ignore the first line, by default it is not skipped + 'format.ignore-parse-errors' = 'true' -- optional: skip records with parse error instead of failing by default +) +{% endhighlight %} +</div> </div> The old CSV format is included in Flink and does not require additional dependencies. diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md index 9aeacd9..6deb6af 100644 --- a/docs/dev/table/connect.zh.md +++ b/docs/dev/table/connect.zh.md @@ -1,5 +1,5 @@ --- -title: "连接外部系统" +title: "Connect to External Systems" nav-parent_id: tableapi nav-pos: 19 --- @@ -122,6 +122,12 @@ format: ... schema: ... {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)") +{% endhighlight %} +</div> </div> The table's type (`source`, `sink`, or `both`) determines how a table is registered. In case of table type `both`, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS. @@ -276,6 +282,39 @@ tables: type: VARCHAR {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + `user` BIGINT, + message VARCHAR, + ts VARCHAR +) WITH ( + -- declare the external system to connect to + 'connector.type' = 'kafka', + 'connector.version' = '0.10', + 'connector.topic' = 'topic_name', + 'connector.startup-mode' = 'earliest-offset', + 'connector.properties.0.key' = 'zookeeper.connect', + 'connector.properties.0.value' = 'localhost:2181', + 'connector.properties.1.key' = 'bootstrap.servers', + 'connector.properties.1.value' = 'localhost:9092', + 'update-mode' = 'append', + -- declare a format for this system + 'format.type' = 'avro', + 'format.avro-schema' = '{ + "namespace": "org.myorganization", + "type": "record", + "name": "UserMessage", + "fields": [ + {"name": "ts", "type": "string"}, + {"name": "user", "type": "long"}, + {"name": "message", "type": ["string", "null"]} + ] + }' +) +{% endhighlight %} +</div> </div> In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. 
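For example, in a DDL statement every entry of the `WITH` clause is exactly one such normalized string property. The following is an illustrative sketch (not part of this patch) that combines the file system connector and the old CSV format shown on this page; the table name, column, and path are made up:

{% highlight sql %}
-- Illustrative only: each WITH entry below is one normalized key-value pair.
CREATE TABLE CsvFileTable (
  content VARCHAR
) WITH (
  'connector.type' = 'filesystem',               -- consumed by the file system connector factory
  'connector.path' = 'file:///path/to/whatever', -- hypothetical path
  'format.type' = 'csv',                         -- consumed by the CSV format factory
  'format.fields.0.name' = 'content',
  'format.fields.0.type' = 'VARCHAR',
  'update-mode' = 'append'
)
{% endhighlight %}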
All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory. @@ -603,6 +642,16 @@ tables: update-mode: append # otherwise: "retract" or "upsert" {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyTable ( + ... +) WITH ( + 'update-mode' = 'append' -- otherwise: 'retract' or 'upsert' +) +{% endhighlight %} +</div> </div> See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information. @@ -652,6 +701,17 @@ connector: path: "file:///path/to/whatever" # required: path to a file or directory {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'connector.type' = 'filesystem', -- required: specify the connector type + 'connector.path' = 'file:///path/to/whatever' -- required: path to a file or directory +) +{% endhighlight %} +</div> </div> The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system. @@ -703,7 +763,7 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t .version("0.11") # required: valid connector versions are # "0.8", "0.9", "0.10", "0.11", and "universal" .topic("...") # required: topic name from which the table is read - + # optional: connector specific properties .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") @@ -753,6 +813,49 @@ connector: sink-partitioner-class: org.mycompany.MyPartitioner # optional: used in case of sink partitioner custom {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'connector.type' = 'kafka', + + 'connector.version' = '0.11', -- required: valid connector versions are + -- "0.8", "0.9", "0.10", "0.11", and "universal" + + 'connector.topic' = 'topic_name', -- required: topic name from which the table is read + + 'update-mode' = 'append', -- required: update mode when used as table sink; + -- only append mode is supported for now.
+ + 'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties + 'connector.properties.0.value' = 'localhost:2181', + 'connector.properties.1.key' = 'bootstrap.servers', + 'connector.properties.1.value' = 'localhost:9092', + 'connector.properties.2.key' = 'group.id', + 'connector.properties.2.value' = 'testGroup', + 'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset", + -- "latest-offset", "group-offsets", + -- or "specific-offsets" + + -- optional: used in case of startup mode with specific offsets + 'connector.specific-offsets.0.partition' = '0', + 'connector.specific-offsets.0.offset' = '42', + 'connector.specific-offsets.1.partition' = '1', + 'connector.specific-offsets.1.offset' = '300', + + 'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions + -- into Kafka's partitions. Valid values are "fixed" + -- (each Flink partition ends up in at most one Kafka partition), + -- "round-robin" (a Flink partition is distributed to + -- Kafka partitions round-robin), + -- or "custom" (use a custom FlinkKafkaPartitioner subclass) + -- optional: used in case of sink partitioner custom + 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner' +) +{% endhighlight %} +</div> </div> **Specify the start reading position:** By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section [Kafka Consumers Start Position Configuration]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration). @@ -900,6 +1003,66 @@ connector: connection-path-prefix: "/v1" # optional: prefix string to be added to every REST communication {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'connector.type' = 'elasticsearch', -- required: specify the connector type as elasticsearch + + 'connector.version' = '6', -- required: valid connector versions are "6" + + 'connector.hosts.0.hostname' = 'host_name', -- required: one or more Elasticsearch hosts to connect to + 'connector.hosts.0.port' = '9200', + 'connector.hosts.0.protocol' = 'http', + + 'connector.index' = 'MyUsers', -- required: Elasticsearch index + + 'connector.document-type' = 'user', -- required: Elasticsearch document type + + 'update-mode' = 'append', -- optional: update mode when used as table sink. + + 'connector.key-delimiter' = '$', -- optional: delimiter for composite keys ("_" by default) + -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3" + + 'connector.key-null-literal' = 'n/a', -- optional: representation for null fields in keys ("null" by default) + + 'connector.failure-handler' = '...', -- optional: failure handling strategy in case a request to + -- Elasticsearch fails ("fail" by default). + -- valid strategies are + -- "fail" (throws an exception if a request fails and + -- thus causes a job failure), + -- "ignore" (ignores failures and drops the request), + -- "retry-rejected" (re-adds requests that have failed due + -- to queue capacity saturation), + -- or "custom" for failure handling with an + -- ActionRequestFailureHandler subclass + + -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency + 'connector.flush-on-checkpoint' = 'true', -- optional: whether to flush on checkpoint; set to "false" to disable it (see notes below!)
+ -- ("true" by default) + 'connector.bulk-flush.max-actions' = '42', -- optional: maximum number of actions to buffer + -- for each bulk request + 'connector.bulk-flush.max-size' = '42 mb', -- optional: maximum size of buffered actions in bytes + -- per bulk request + -- (only MB granularity is supported) + 'connector.bulk-flush.interval' = '60000', -- optional: bulk flush interval (in milliseconds) + 'connector.bulk-flush.back-off.type' = '...', -- optional: backoff strategy ("disabled" by default) + -- valid strategies are "disabled", "constant", + -- or "exponential" + 'connector.bulk-flush.back-off.max-retries' = '3', -- optional: maximum number of retries + 'connector.bulk-flush.back-off.delay' = '30000', -- optional: delay between each backoff attempt + -- (in milliseconds) + + -- optional: connection properties to be used during REST communication to Elasticsearch + 'connector.connection-max-retry-timeout' = '3', -- optional: maximum timeout (in milliseconds) + -- between retries + 'connector.connection-path-prefix' = '/v1' -- optional: prefix string to be added to every + -- REST communication +) +{% endhighlight %} +</div> </div> **Bulk flushing:** For more information about characteristics of the optional flushing parameters see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html). @@ -977,17 +1140,17 @@ The CSV format can be used as follows: # or use the table's schema .derive_schema() - .field_delimiter(";") # optional: field delimiter character ("," by default) + .field_delimiter(';') # optional: field delimiter character (',' by default) .line_delimiter("\r\n") # optional: line delimiter ("\n" by default; # otherwise "\r" or "\r\n" are allowed) - .quote_character("'") # optional: quote character for enclosing field values ('"' by default) - .allow_comments() # optional: ignores comment lines that start with "#" (disabled by default); + .quote_character('\'') # optional: quote character for enclosing field values ('"' by default) + .allow_comments() # optional: ignores comment lines that start with '#' (disabled by default); # if enabled, make sure to also ignore parse errors to allow empty rows .ignore_parse_errors() # optional: skip fields and rows with parse errors instead of failing; # fields are set to null in case of errors .array_element_delimiter("|") # optional: the array element delimiter string for separating # array and row element values (";" by default) - .escape_character("\\") # optional: escape character for escaping values (disabled by default) + .escape_character('\\') # optional: escape character for escaping values (disabled by default) .null_literal("n/a") # optional: null literal string that is interpreted as a # null value (disabled by default) ) @@ -1019,6 +1182,38 @@ format: # null value (disabled by default) {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... 
+) WITH ( + 'format.type' = 'csv', -- required: specify the schema type + + 'format.fields.0.name' = 'lon', -- required: define the schema either by using type information + 'format.fields.0.type' = 'FLOAT', + 'format.fields.1.name' = 'rideTime', + 'format.fields.1.type' = 'TIMESTAMP', + + 'format.derive-schema' = 'true', -- or use the table's schema + + 'format.field-delimiter' = ';', -- optional: field delimiter character (',' by default) + 'format.line-delimiter' = '\r\n', -- optional: line delimiter ("\n" by default; otherwise + -- "\r" or "\r\n" are allowed) + 'format.quote-character' = '''', -- optional: quote character for enclosing field values ('"' by default) + 'format.allow-comments' = 'true', -- optional: ignores comment lines that start with "#" + -- (disabled by default); + -- if enabled, make sure to also ignore parse errors to allow empty rows + 'format.ignore-parse-errors' = 'true', -- optional: skip fields and rows with parse errors instead of failing; + -- fields are set to null in case of errors + 'format.array-element-delimiter' = '|', -- optional: the array element delimiter string for separating + -- array and row element values (";" by default) + 'format.escape-character' = '\\', -- optional: escape character for escaping values (disabled by default) + 'format.null-literal' = 'n/a' -- optional: null literal string that is interpreted as a + -- null value (disabled by default) +) +{% endhighlight %} +</div> </div> The following table lists supported types that can be read and written: @@ -1171,6 +1366,38 @@ format: derive-schema: true {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'format.type' = 'json', -- required: specify the format type + 'format.fail-on-missing-field' = 'true', -- optional: whether to fail if a field is missing ("false" by default) + + 'format.fields.0.name' = 'lon', -- required: define the schema either by using type information which parses numbers to corresponding types + 'format.fields.0.type' = 'FLOAT', + 'format.fields.1.name' = 'rideTime', + 'format.fields.1.type' = 'TIMESTAMP', + + 'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP + '{ + "type": "object", + "properties": { + "lon": { + "type": "number" + }, + "rideTime": { + "type": "string", + "format": "date-time" + } + } + }', + + 'format.derive-schema' = 'true' -- or use the table's schema +) +{% endhighlight %} +</div> </div> The following table shows the mapping of JSON schema types to Flink SQL types: @@ -1318,6 +1545,27 @@ format: } {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'format.type' = 'avro', -- required: specify the schema type + 'format.record-class' = 'org.organization.types.User', -- required: define the schema either by using an Avro-specific record class + + 'format.avro-schema' = -- or by using an Avro schema + '{ + "type": "record", + "name": "test", + "fields" : [ + {"name": "a", "type": "long"}, + {"name": "b", "type": "string"} + ] + }' +) +{% endhighlight %} +</div> </div> Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability; otherwise they are converted to an `ANY` type.
The following table shows the mapping: @@ -1410,6 +1658,28 @@ format: ignore-parse-errors: true # optional: skip records with parse error instead of failing by default {% endhighlight %} </div> + +<div data-lang="DDL" markdown="1"> +{% highlight sql %} +CREATE TABLE MyUserTable ( + ... +) WITH ( + 'format.type' = 'csv', -- required: specify the schema type + + 'format.fields.0.name' = 'lon', -- required: define the schema either by using type information + 'format.fields.0.type' = 'FLOAT', + 'format.fields.1.name' = 'rideTime', + 'format.fields.1.type' = 'TIMESTAMP', + + 'format.field-delimiter' = ',', -- optional: string delimiter "," by default + 'format.line-delimiter' = '\n', -- optional: string delimiter "\n" by default + 'format.quote-character' = '"', -- optional: single character for string values, empty by default + 'format.comment-prefix' = '#', -- optional: string to indicate comments, empty by default + 'format.ignore-first-line' = 'false', -- optional: boolean flag to ignore the first line, by default it is not skipped + 'format.ignore-parse-errors' = 'true' -- optional: skip records with parse error instead of failing by default +) +{% endhighlight %} +</div> </div> The old CSV format is included in Flink and does not require additional dependencies. @@ -1532,7 +1802,6 @@ table.insertInto("csvOutputTable") {% endhighlight %} </div> - <div data-lang="python" markdown="1"> {% highlight python %}
