twalthr commented on a change in pull request #15837:
URL: https://github.com/apache/flink/pull/15837#discussion_r627243022



##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -0,0 +1,2106 @@
+---
+title: "DataStream API Integration"
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataStream API Integration
+
+{{< hint info >}}
+Tthis page only discusses the integration with DataStream API in JVM languages 
such as Java or Scala.
+For Python, see the [Python API]({{< ref "docs/dev/python/overview" >}}) area.
+{{< /hint >}}
+
+Both Table & SQL API and DataStream API are equality important when it comes 
to defining a data
+processing pipeline.
+
+The DataStream API offers the primitives of stream processing (namely time, 
state, and dataflow
+management) in a rather low-level imperative programming API. The Table & SQL 
API abstracts many
+internals and provides a structured and declarative API.
+
+Both APIs can work with bounded *and* unbounded streams.
+
+Bounded streams need to be managed when processing historical data. Unbounded 
streams occur
+in real-time processing scenarios that might be initialized with historical 
data first.
+
+For efficient execution, both APIs offer processing bounded streams in an 
optimized batch execution
+mode. However, since batch is just a special case of streaming, it is also 
possible to run pipelines
+of bounded streams in regular streaming execution mode.
+
+{{< hint warning >}}
+Both DataStream API and Table API provide their own way for enabling the batch 
execution mode at the
+moment. In the near future, this will be further unified.
+{{< /hint >}}
+
+Pipelines in one API can be defined end-to-end without dependencies to the 
other API. However, it
+might be useful to mix both APIs for various reasons:
+
+- Use the table ecosystem for accessing catalogs or connecting to external 
systems easily, before
+implementing the main pipeline in DataStream API.
+- Access some of the SQL functions for stateless data normalization and 
cleansing, before
+implementing the main pipeline in DataStream API.
+- Switch to DataStream API every now and then if a more low-level operation 
(e.g. custom timer
+handling) is not present in Table API.
+
+Flink provides special bridging functionalities to make the integration with 
DataStream API as smooth
+as possible.
+
+{{< hint info >}}
+Switching between DataStream and Table API adds some conversion overhead. For 
example, internal data
+structures of the table runtime (i.e. `RowData`) that partially work on binary 
data need to be converted
+to more user-friendly data structures (i.e. `Row`). Usually, this overhead can 
be neglected but is
+mentioned here for completeness.
+{{< /hint >}}
+
+{{< top >}}
+
+Converting between DataStream and Table
+---------------------------------------
+
+Flink provides a specialized `StreamTableEnvironment` in Java and Scala for 
integrating with the
+DataStream API. Those environments extend the regular `TableEnvironment` with 
additional methods
+and take the `StreamExecutionEnvironment` used in the DataStream API as a 
parameter.
+
+{{< hint warning >}}
+Currently, the `StreamTableEnvironment` does not support batch execution mode. 
Use the regular `TableEnvironment`
+for this. Nevertheless, both bounded and unbounded streams can also be 
processed using streaming
+execution mode.
+{{< /hint >}}
+
+The following code shows an example of how to go back and forth between the 
two APIs. Column names
+and types of the `Table` are automatically derived from the `TypeInformation` 
of the `DataStream`.
+Since the DataStream API does not support changelog processing natively, the 
code assumes
+append-only/insert-only semantics during the stream-to-table and 
table-to-stream conversion.
+
+{{< tabs "6ec84aa4-d91d-4c47-9fa2-b1aae1e3cdb5" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+// create environments of both APIs
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// create a DataStream
+DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
+
+// interpret the insert-only DataStream as a Table
+Table inputTable = tableEnv.fromDataStream(dataStream);
+
+// register the Table object as a view and query it
+tableEnv.createTemporaryView("InputTable", inputTable);
+Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
+
+// interpret the insert-only Table as a DataStream again
+DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
+
+// add a printing sink and execute in DataStream API
+resultStream.print();
+env.execute();
+
+// prints:
+// +I[Alice]
+// +I[Bob]
+// +I[John]
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+
+// create environments of both APIs
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// create a DataStream
+val dataStream = env.fromElements("Alice", "Bob", "John")
+
+// interpret the insert-only DataStream as a Table
+val inputTable = tableEnv.fromDataStream(dataStream)
+
+// register the Table object as a view and query it
+tableEnv.createTemporaryView("InputTable", inputTable)
+val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")
+
+// interpret the insert-only Table as a DataStream again
+val resultStream = tableEnv.toDataStream(resultTable)
+
+// add a printing sink and execute in DataStream API
+resultStream.print()
+env.execute()
+
+// prints:
+// +I[Alice]
+// +I[Bob]
+// +I[John]
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The complete semantics of `fromDataStream` and `toDataStream` can be found in 
the [dedicated section below](#handling-of-insert-only-streams).
+In particular, the section discusses how to influence the schema derivation 
with more complex
+and nested types. It also covers working with event-time and watermarks.
+
+Depending on the kind of query, in many cases the resulting dynamic table is a 
pipeline that does not
+only produce insert-only changes when coverting the `Table` to a `DataStream` 
but also produces retractions
+and other kinds of updates.
+
+The following example shows how updating tables can be converted. Every result 
row represents
+an entry in a changelog with a change flag that can be queried by calling 
`row.getKind()` on it. In
+the example, the second score for `Alice` creates an _update before_ (`-U`) 
and _update after_ (`+U`)
+change.
+
+{{< tabs "f45d1374-61a0-40c0-9280-702ed87d2ed0" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+// create environments of both APIs
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// create a DataStream
+DataStream<Row> dataStream = env.fromElements(
+    Row.of("Alice", 12),
+    Row.of("Bob", 10),
+    Row.of("Alice", 100));
+
+// interpret the insert-only DataStream as a Table
+Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
+
+// register the Table object as a view and query it
+// the query contains an aggregation that produces updates
+tableEnv.createTemporaryView("InputTable", inputTable);
+Table resultTable = tableEnv.sqlQuery(
+    "SELECT name, SUM(score) FROM InputTable GROUP BY name");
+
+// interpret the updating Table as a changelog DataStream
+DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
+
+// add a printing sink and execute in DataStream API
+resultStream.print();
+env.execute();
+
+// prints:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+// create environments of both APIs
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// create a DataStream
+val dataStream = env.fromElements(
+  Row.of("Alice", Int.box(12)),
+  Row.of("Bob", Int.box(10)),
+  Row.of("Alice", Int.box(100))
+)(Types.ROW(Types.STRING, Types.INT))
+
+// interpret the insert-only DataStream as a Table
+val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")
+
+// register the Table object as a view and query it
+// the query contains an aggregation that produces updates
+tableEnv.createTemporaryView("InputTable", inputTable)
+val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable 
GROUP BY name")
+
+// interpret the updating Table as a changelog DataStream
+val resultStream = tableEnv.toChangelogStream(resultTable)
+
+// add a printing sink and execute in DataStream API
+resultStream.print()
+env.execute()
+
+// prints:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The complete semantics of `fromChangelogStream` and `toChangelogStream` can be 
found in the [dedicated section below](#handling-of-insert-only-streams).
+In particular, the section discusses how to influence the schema derivation 
with more complex and nested
+types. It covers working with event-time and watermarks. It discusses how to 
declare a primary key and
+changelog mode for the input and output streams.
+
+### Dependencies and Imports
+
+Projects that combine Table API with DataStream API need to add one of the 
following bridging modules.
+They include transitive dependencies to `flink-table-api-java` or 
`flink-table-api-scala` and the
+corresponding language-specific DataStream API module.
+
+{{< tabs "0d2da52a-ee43-4d06-afde-b165517c0617" >}}
+{{< tab "Java" >}}
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+  <scope>provided</scope>
+</dependency>
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-scala-bridge{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+  <scope>provided</scope>
+</dependency>
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The following imports are required to declare common pipelines using either 
the Java or Scala version
+of both DataStream API and Table API.
+
+{{< tabs "19a47e2d-168b-4f73-a966-abfcc8a6baca" >}}
+{{< tab "Java" >}}
+```java
+// imports for Java DataStream API
+import org.apache.flink.streaming.api.*;
+import org.apache.flink.streaming.api.environment.*;
+
+// imports for Table API with bridging to Java DataStream API
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.bridge.java.*;
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+// imports for Scala DataStream API
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala._
+
+// imports for Table API with bridging to Scala DataStream API
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Configuration
+
+The `TableEnvironment` will adopt all configuration options from the passed 
`StreamExecutionEnvironment`.
+However, it cannot be guaranteed that further changes to the configuration of 
`StreamExecutionEnvironment`
+are propagated to the `StreamTableEnvironment` after its instantiation. Also, 
the reverse propagation
+of options from Table API to DataStream API is not supported.
+
+We recommend to set all configuration options in DataStream API early before 
switching to Table API.
+
+{{< tabs "47a32814-abea-11eb-8529-0242ac130003" >}}
+{{< tab "Java" >}}
+```java
+import java.time.ZoneId;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+// create Java DataStream API
+
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// set various configuration early
+
+env.setMaxParallelism(256);
+
+env.getConfig().addDefaultKryoSerializer(MyCustomType.class, 
CustomKryoSerializer.class);
+
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// then switch to Java Table API
+
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// set configuration early
+
+tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+
+// start defining your pipelines in both APIs...
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import java.time.ZoneId
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.table.api.bridge.scala._
+
+// create Scala DataStream API
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// set various configuration early
+
+env.setMaxParallelism(256)
+
+env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], 
classOf[CustomKryoSerializer])
+
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// then switch to Scala Table API
+
+val tableEnv = StreamTableEnvironment.create(env)
+
+// set configuration early
+
+tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"))
+
+// start defining your pipelines in both APIs...
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Execution Behavior
+
+Both APIs provide methods to execute pipelines. In other words: if requested, 
they compile a job
+graph that will be submitted to the cluster and triggered for execution. 
Results will be streamed to
+the declared sinks.
+
+Usually, both APIs mark such behavior with the term `execute` in method names. 
However, the execution
+behavior is slightly different between Table API and DataStream API.
+
+**DataStream API**
+
+The DataStream API's `StreamExecutionEnvironment` acts as kind of a _builder 
pattern_ that can construct
+a complex pipeline. The pipeline possibly splits into multiple branches that 
might or might not end with
+a sink.
+
+At least one sink must be defined otherwise the following exception is thrown:
+```
+java.lang.IllegalStateException: No operators defined in streaming topology. 
Cannot execute.
+```
+
+`StreamExecutionEnvironment.execute()` submits the entire constructed pipeline 
and clears the builder
+afterwards. In other words: no sources and sinks are declared anymore and a 
new pipeline can be
+appended. Thus, every DataStream program usually ends with a call to 
`StreamExecutionEnvironment.execute()`.

Review comment:
       I added `added to the builder`. I find this "builder-like" concept 
important to understand for the user.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to