[ 
https://issues.apache.org/jira/browse/FLINK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563752#comment-16563752
 ] 

ASF GitHub Bot commented on FLINK-9947:
---------------------------------------

fhueske commented on a change in pull request #6456: [FLINK-9947] [docs] 
Document unified table sources/sinks/formats
URL: https://github.com/apache/flink/pull/6456#discussion_r206546032
 
 

 ##########
 File path: docs/dev/table/connect.md
 ##########
 @@ -0,0 +1,1033 @@
+---
+title: "Connect to External Systems"
+nav-parent_id: tableapi
+nav-pos: 19
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's Table API & SQL programs can be connected to external systems for reading and writing both batch and streaming tables. A table source provides access to data stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, different formats such as CSV, Parquet, or ORC are supported.
+
+This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source, sink, or both have been registered, they can be accessed by Table API & SQL queries.
+
+<span class="label label-danger">Attention</span> If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Dependencies
+------------
+
+The following tables list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for [table connectors](connect.html#table-connectors) and [table formats](connect.html#table-formats). The tables provide dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+{% if site.is_stable %}
+
+### Connectors
+
+| Name              | Version       | Maven dependency             | SQL Client JAR          |
+| :---------------- | :------------ | :--------------------------- | :---------------------- |
+| Filesystem        |               | Built-in                     | Built-in                |
+| Apache Kafka      | 0.8           | `flink-connector-kafka-0.8`  | Not available           |
+| Apache Kafka      | 0.9           | `flink-connector-kafka-0.9`  | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka      | 0.10          | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka      | 0.11          | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+
+### Formats
+
+| Name              | Maven dependency             | SQL Client JAR         |
+| :---------------- | :--------------------------- | :--------------------- |
+| CSV               | Built-in                     | Built-in               |
+| JSON              | `flink-json`                 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
+| Apache Avro       | `flink-avro`                 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
+
+{% else %}
+
+This table is only available for stable releases.
+
+{% endif %}
+
+{% top %}
+
+Overview
+--------
+
+Beginning with Flink 1.6, the declaration of a connection to an external system is separated from the actual implementation. Connections can be specified either
+
+- **programmatically** using a `Descriptor` under `org.apache.flink.table.descriptors` for the Table & SQL API
+- or **declaratively** via [YAML configuration files](http://yaml.org/) for the SQL Client.
+
+This allows not only for a better unification of the APIs and the SQL Client, but also for better extensibility in case of [custom implementations](sourceSinks.html), without changing the declaration.
+
+Similar to a SQL `CREATE TABLE` statement, the name of the table, its final schema, a connector, and a data format can be defined upfront for connecting to an external system. Additionally, the table's type (source, sink, or both) and an update mode for streaming queries can be specified:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+tableEnvironment
+  .connect(...)
+  .withFormat(...)
+  .withSchema(...)
+  .inAppendMode()
+  .registerTableSource(...)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+name: MyTable
+type: source
+update-mode: append
+schema: ...
+format: ...
+connector: ...
+{% endhighlight %}
+</div>
+</div>
+
+The subsequent sections will cover each definition part ([schema](connect.html#table-schema), [connector](connect.html#table-connectors), [format](connect.html#table-formats), and [update mode](connect.html#update-modes)) in more detail.
+
+The following code shows a full example of how to connect to Kafka for reading Avro records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+tableEnvironment
+  // declare the external system to connect to
+  .connect(
+    new Kafka()
+      .version("0.10")
+      .topic("test-input")
+      .startFromEarliest()
+      .property("zookeeper.connect", "localhost:2181")
+      .property("bootstrap.servers", "localhost:9092")
+  )
+
+  // declare a format for this system
+  .withFormat(
+    new Avro()
+      .avroSchema(
+        "{" +
+        "  \"namespace\": \"org.myorganization\"," +
+        "  \"type\": \"record\"," +
+        "  \"name\": \"UserMessage\"," +
+        "    \"fields\": [" +
+        "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
+        "      {\"name\": \"user\", \"type\": \"long\"}," +
+        "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
+        "    ]" +
+        "}" +
+      )
+  )
+
+  // declare the final schema of the table
+  .withSchema(
+    new Schema()
+      .field("rowtime", Types.SQL_TIMESTAMP)
+        .rowtime(new Rowtime()
+          .timestampsFromField("ts")
+          .watermarksPeriodicBounded(60000)
+        )
+      .field("user", Types.LONG)
+      .field("message", Types.STRING)
+  )
+
+  // specify the update-mode for streaming tables
+  .inAppendMode()
+
+  // register as source, sink, or both and under a name
+  .registerTableSource("MyUserTable");
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+tables:
+  - name: MyUserTable      # name the new table
+    type: source           # declare if the table should be "source", "sink", or "both"
+    update-mode: append    # specify the update-mode for streaming tables
+
+    # declare the final schema of the table
+    schema:
+      - name: rowtime
+        type: TIMESTAMP
+        rowtime:
+          timestamps:
+            type: from-field
+            from: ts
+          watermarks:
+            type: periodic-bounded
+            delay: "60000"
+      - name: user
+        type: BIGINT
+      - name: message
+        type: VARCHAR
+
+    # declare a format for this system
+    format:
+      type: avro
+      avro-schema: >
+        {
+          "namespace": "org.myorganization",
+          "type": "record",
+          "name": "UserMessage",
+            "fields": [
+              {"name": "ts", "type": "string"},
+              {"name": "user", "type": "long"},
+              {"name": "message", "type": ["string", "null"]}
+            ]
+        }
+
+    # declare the external system to connect to
+    connector:
+      type: kafka
+      version: "0.10"
+      topic: test-input
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+{% endhighlight %}
+</div>
+</div>
+
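+Once registered, a table can be referenced by its name from Table API & SQL queries. As a minimal sketch (assuming the example above was executed on `tableEnvironment`), the registered table could be queried as follows:
+
+{% highlight java %}
+// scan the registered table with the Table API
+Table messages = tableEnvironment.scan("MyUserTable");
+
+// or reference it by name in a SQL query
+// (`user` is quoted with backticks because USER is a reserved SQL keyword)
+Table users = tableEnvironment.sqlQuery("SELECT `user`, message FROM MyUserTable");
+{% endhighlight %}
+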
+In both ways, the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly one matching table factory.
+
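+As an illustration only (the exact key names are generated internally by the descriptors and are assumed here), the Kafka/Avro example above roughly corresponds to flat properties such as the following:
+
+{% highlight java %}
+// hypothetical sketch: descriptors are normalized into string-based key-value pairs
+// that table factories can match against
+Map<String, String> properties = new HashMap<>();
+properties.put("connector.type", "kafka");      // assumed key, set by new Kafka()
+properties.put("connector.version", "0.10");    // assumed key, set by .version("0.10")
+properties.put("format.type", "avro");          // assumed key, set by new Avro()
+properties.put("update-mode", "append");        // assumed key, set by .inAppendMode()
+{% endhighlight %}
+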
+If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
+
+Table Schema
+------------
+
+The table schema describes the final appearance of a table. It specifies the final name, final type, and the origin of a field. The origin of a field might be important if the name of the field should differ from the name in the input/output format. For instance, a field `name_field` in the table schema could reference a field `nameField` of an Avro format. Additionally, the schema is needed to map column names and types from an external system to Flink's representation. In case of a table sink, it ensures that only data with a valid schema is written to the external system.
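+
+As a brief sketch of such an origin mapping (using the field names from the sentence above; the `from` property is explained below), the schema declaration could look like this:
+
+{% highlight java %}
+.withSchema(
+  new Schema()
+    .field("name_field", Types.STRING)
+      .from("nameField")   // the table column "name_field" originates from the format field "nameField"
+)
+{% endhighlight %}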
+
+The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.withSchema(
+  new Schema()
+    .field("MyField1", Types.INT)     // required: specify the fields of the 
table (in this order)
+    .field("MyField2", Types.STRING)
+    .field("MyField3", Types.BOOLEAN)
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+schema:
+  - name: MyField1    # required: specify the fields of the table (in this order)
+    type: INT
+  - name: MyField2
+    type: VARCHAR
+  - name: MyField3
+    type: BOOLEAN
+{% endhighlight %}
+</div>
+</div>
+
+For *each field*, the following properties can be declared in addition to the column's name and type:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.withSchema(
+  new Schema()
+    .field("MyField1", Types.SQL_TIMESTAMP)
+      .proctime()      // optional: declares this field as a processing-time 
attribute
+    .field("MyField2", Types.SQL_TIMESTAMP)
+      .rowtime(...)    // optional: declares this field as a event-time 
attribute
+    .field("MyField3", Types.BOOLEAN)
+      .from("mf3")     // optional: original field in the input that is 
referenced/aliased by this field
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+schema:
+  - name: MyField1
+    type: TIMESTAMP
+    proctime: true    # optional: boolean flag whether this field should be a processing-time attribute
+  - name: MyField2
+    type: TIMESTAMP
+    rowtime: ...      # optional: whether this field should be an event-time attribute
+  - name: MyField3
+    type: BOOLEAN
+    from: mf3         # optional: original field in the input that is referenced/aliased by this field
+{% endhighlight %}
+</div>
+</div>
+
+Time attributes are essential when working with unbounded streaming tables. Therefore, both processing-time and event-time (also known as "rowtime") attributes can be defined as part of the schema.
+
+For more information about time handling in Flink, and especially about event-time, we recommend the general [event-time section](streaming.html#time-attributes).
+
+### Rowtime Attributes
+
+In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.
+
+The following timestamp extractors are supported:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
+.rowtime(
+  new Rowtime()
+    .timestampsFromField("ts_field")    // required: original field name in 
the input
+)
+
+// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
+// and thus preserves the assigned timestamps from the source.
+// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
+.rowtime(
+  new Rowtime()
+    .timestampsFromSource()
+)
+
+// Sets a custom timestamp extractor to be used for the rowtime attribute.
+// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
+.rowtime(
+  new Rowtime()
+    .timestampsFromExtractor(...)
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
+rowtime:
+  timestamps:
+    type: from-field
+    from: "ts_field"                 # required: original field name in the 
input
+
+# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
+# and thus preserves the assigned timestamps from the source.
+rowtime:
+  timestamps:
+    type: from-source
+{% endhighlight %}
+</div>
+</div>
+
+The following watermark strategies are supported:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+// are not late.
+.rowtime(
+  new Rowtime()
+    .watermarksPeriodicAscending()
+)
+
+// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+// Emits watermarks which are the maximum observed timestamp minus the specified delay.
+.rowtime(
+  new Rowtime()
+    .watermarksPeriodicBounded(2000)    // delay in milliseconds
+)
+
+// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+// underlying DataStream API and thus preserves the assigned watermarks from the source.
+.rowtime(
+  new Rowtime()
+    .watermarksFromSource()
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+# are not late.
+rowtime:
+  watermarks:
+    type: periodic-ascending
+
+# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+# Emits watermarks which are the maximum observed timestamp minus the specified delay.
+rowtime:
+  watermarks:
+    type: periodic-bounded
+    delay: ...                # required: delay in milliseconds
+
+# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+# underlying DataStream API and thus preserves the assigned watermarks from the source.
+rowtime:
+  watermarks:
+    type: from-source
+{% endhighlight %}
+</div>
+</div>
+
+Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
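+
+For example, a complete rowtime declaration that combines a timestamp extractor with a watermark strategy (the same combination of strategies as in the Kafka example above) could look like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.rowtime(
+  new Rowtime()
+    .timestampsFromField("ts_field")     // extract the rowtime from the input field "ts_field"
+    .watermarksPeriodicBounded(60000)    // emit watermarks with a delay of 60 seconds
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+rowtime:
+  timestamps:
+    type: from-field
+    from: "ts_field"
+  watermarks:
+    type: periodic-bounded
+    delay: "60000"
+{% endhighlight %}
+</div>
+</div>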
+
+### Type Strings
+
+Because type information is only available in a programming language, the following type strings are supported for defining types in a YAML file:
+
+{% highlight yaml %}
+VARCHAR
+BOOLEAN
+TINYINT
+SMALLINT
+INT
+BIGINT
+FLOAT
+DOUBLE
+DECIMAL
+DATE
+TIME
+TIMESTAMP
+ROW(fieldtype, ...)              # unnamed row; e.g., ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
+                                 # with indexed field names f0, f1, ...
+ROW(fieldname fieldtype, ...)    # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
+                                 # is mapped to Flink's RowTypeInfo
+POJO(class)                      # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
+ANY(class)                       # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
+ANY(class, serialized)           # used for type information that is not supported by Flink's Table & SQL API
+{% endhighlight %}
+
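+For example (with made-up field names), these type strings can be used in the schema part of a YAML table declaration:
+
+{% highlight yaml %}
+schema:
+  - name: MyComplexField
+    type: ROW(myField VARCHAR, myOtherField INT)
+  - name: MyRawField
+    type: ANY(org.mycompany.MyClass)
+{% endhighlight %}
+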
+{% top %}
+
+Update Modes
+------------
+
+For streaming queries, it is required to declare how to perform the [conversion between a dynamic table and an external connector](streaming.html#dynamic-tables--continuous-queries). The *update mode* specifies which kind of messages should be exchanged with the external system:
+
+**Append Mode:** In append mode, a dynamic table and an external connector only exchange INSERT messages.
+
+**Retract Mode:** In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for the updating (new) row. In contrast to upsert mode, this mode does not require a unique key. However, every update consists of two messages, which is less efficient.
+
+**Upsert Mode:** In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. This mode requires a (possibly composite) unique key by which updates can be propagated. The external connector needs to be aware of the unique key attribute in order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages, DELETE changes as DELETE messages. The main difference to a retract stream is that UPDATE changes are encoded with a single message and are therefore processed more efficiently.
+
+<span class="label label-danger">Attention</span> The documentation of each connector states which update modes are supported.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(...)
+  .inAppendMode()    // otherwise: inUpsertMode() or inRetractMode()
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+tables:
+  - name: ...
+    update-mode: append    # otherwise: "retract" or "upsert"
+{% endhighlight %}
+</div>
+</div>
+
+See also the [general streaming concepts documentation](streaming.html#dynamic-tables--continuous-queries) for more information.
+
+{% top %}
+
+Table Connectors
+----------------
+
+Flink provides a set of connectors for connecting to external systems.
+
+Please note that not all connectors are available for both batch and streaming yet. Furthermore, not every streaming connector supports every streaming mode. Therefore, each connector is tagged accordingly. A format tag indicates that the connector requires a certain type of format.
+
+### File System Connector
 
 Review comment:
   I would add that the file system source and sink support for streaming is only experimental. We do not support actual streaming use cases, i.e., directory monitoring and bucket output.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document unified table sources/sinks/formats
> --------------------------------------------
>
>                 Key: FLINK-9947
>                 URL: https://issues.apache.org/jira/browse/FLINK-9947
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> The recent unification of table sources/sinks/formats needs documentation. I 
> propose a new page that explains the built-in sources, sinks, and formats as 
> well as a page for customization of public interfaces.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
