twalthr commented on a change in pull request #15837: URL: https://github.com/apache/flink/pull/15837#discussion_r627266914
##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -0,0 +1,2106 @@
+---
+title: "DataStream API Integration"
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataStream API Integration
+
+{{< hint info >}}
+This page only discusses the integration with DataStream API in JVM languages such as Java or Scala.
+For Python, see the [Python API]({{< ref "docs/dev/python/overview" >}}) area.
+{{< /hint >}}
+
+Both Table & SQL API and DataStream API are equally important when it comes to defining a data
+processing pipeline.
+
+The DataStream API offers the primitives of stream processing (namely time, state, and dataflow
+management) in a rather low-level imperative programming API. The Table & SQL API abstracts many
+internals and provides a structured and declarative API.
+
+Both APIs can work with bounded *and* unbounded streams.
+
+Bounded streams need to be managed when processing historical data. Unbounded streams occur
+in real-time processing scenarios that might be initialized with historical data first.
+
+For efficient execution, both APIs can process bounded streams in an optimized batch execution
+mode. However, since batch is just a special case of streaming, it is also possible to run pipelines
+of bounded streams in regular streaming execution mode.
+
+{{< hint warning >}}
+Both DataStream API and Table API currently provide their own way of enabling the batch execution
+mode. In the near future, this will be further unified.
+{{< /hint >}}
+
+Pipelines in one API can be defined end-to-end without dependencies on the other API. However, it
+might be useful to mix both APIs for various reasons:
+
+- Use the table ecosystem for accessing catalogs or connecting to external systems easily, before
+implementing the main pipeline in DataStream API.
+- Access some of the SQL functions for stateless data normalization and cleansing, before
+implementing the main pipeline in DataStream API.
+- Switch to DataStream API whenever a lower-level operation (e.g. custom timer
+handling) is not available in Table API.
+
+Flink provides special bridging functionalities to make the integration with DataStream API as smooth
+as possible.
+
+{{< hint info >}}
+Switching between DataStream and Table API adds some conversion overhead. For example, internal data
+structures of the table runtime (i.e. `RowData`) that partially work on binary data need to be converted
+to more user-friendly data structures (i.e. `Row`). Usually, this overhead is negligible but is
+mentioned here for completeness.
+{{< /hint >}}
+
+{{< top >}}
+
+Converting between DataStream and Table
+---------------------------------------
+
+Flink provides a specialized `StreamTableEnvironment` in Java and Scala for integrating with the
+DataStream API. Those environments extend the regular `TableEnvironment` with additional methods
+and take the `StreamExecutionEnvironment` used in the DataStream API as a parameter.
+
+{{< hint warning >}}
+Currently, the `StreamTableEnvironment` does not support batch execution mode. Use the regular `TableEnvironment`
+for this. Nevertheless, both bounded and unbounded streams can also be processed using streaming
+execution mode.
+{{< /hint >}}
+
+The following code shows an example of how to go back and forth between the two APIs. Column names
+and types of the `Table` are automatically derived from the `TypeInformation` of the `DataStream`.
+Since the DataStream API does not support changelog processing natively, the code assumes
+append-only/insert-only semantics during the stream-to-table and table-to-stream conversion.
+
+{{< tabs "6ec84aa4-d91d-4c47-9fa2-b1aae1e3cdb5" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+// create environments of both APIs
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// create a DataStream
+DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
+
+// interpret the insert-only DataStream as a Table
+Table inputTable = tableEnv.fromDataStream(dataStream);
+
+// register the Table object as a view and query it
+tableEnv.createTemporaryView("InputTable", inputTable);
+Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
+
+// interpret the insert-only Table as a DataStream again
+DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
+
+// add a printing sink and execute in DataStream API
+resultStream.print();
+env.execute();
+
+// prints:
+// +I[Alice]
+// +I[Bob]
+// +I[John]
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+
+// create environments of both APIs
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// create a DataStream
+val dataStream = env.fromElements("Alice", "Bob", "John")
+
+// interpret the insert-only DataStream as a Table
+val inputTable = tableEnv.fromDataStream(dataStream)
+
+// register the Table object as a view and query it
+tableEnv.createTemporaryView("InputTable", inputTable)
+val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")
+
+// interpret the insert-only Table as a DataStream again
+val resultStream = tableEnv.toDataStream(resultTable)
+
+// add a printing sink and execute in DataStream API
+resultStream.print()
+env.execute()
+
+// prints:
+// +I[Alice]
+// +I[Bob]
+// +I[John]
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The complete semantics of `fromDataStream` and `toDataStream` can be found in the [dedicated section below](#handling-of-insert-only-streams).
+In particular, the section discusses how to influence the schema derivation with more complex
+and nested types. It also covers working with event-time and watermarks.
+
+Depending on the kind of query, the resulting dynamic table is in many cases a pipeline that does not
+only produce insert-only changes when converting the `Table` to a `DataStream`, but also produces
+retractions and other kinds of updates.
+
+The following example shows how updating tables can be converted. Every result row represents
+an entry in a changelog with a change flag that can be queried by calling `row.getKind()` on it. In
+the example, the second score for `Alice` creates an _update before_ (`-U`) and _update after_ (`+U`)
+change.
+
+{{< tabs "f45d1374-61a0-40c0-9280-702ed87d2ed0" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+// create environments of both APIs
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// create a DataStream
+DataStream<Row> dataStream = env.fromElements(
+    Row.of("Alice", 12),
+    Row.of("Bob", 10),
+    Row.of("Alice", 100));
+
+// interpret the insert-only DataStream as a Table
+Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
+
+// register the Table object as a view and query it
+// the query contains an aggregation that produces updates
+tableEnv.createTemporaryView("InputTable", inputTable);
+Table resultTable = tableEnv.sqlQuery(
+    "SELECT name, SUM(score) FROM InputTable GROUP BY name");
+
+// interpret the updating Table as a changelog DataStream
+DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
+
+// add a printing sink and execute in DataStream API
+resultStream.print();
+env.execute();
+
+// prints:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+// create environments of both APIs
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// create a DataStream
+val dataStream = env.fromElements(
+  Row.of("Alice", Int.box(12)),
+  Row.of("Bob", Int.box(10)),
+  Row.of("Alice", Int.box(100))
+)(Types.ROW(Types.STRING, Types.INT))
+
+// interpret the insert-only DataStream as a Table
+val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")
+
+// register the Table object as a view and query it
+// the query contains an aggregation that produces updates
+tableEnv.createTemporaryView("InputTable", inputTable)
+val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable GROUP BY name")
+
+// interpret the updating Table as a changelog DataStream
+val resultStream = tableEnv.toChangelogStream(resultTable)
+
+// add a printing sink and execute in DataStream API
+resultStream.print()
+env.execute()
+
+// prints:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
+```
+{{< /tab >}}
+{{< /tabs >}}
+
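*(Editor's note, not part of the quoted patch.)* As a small illustration of the change flag mentioned above, the sketch below filters the changelog stream by `RowKind` in the DataStream API. It assumes the `resultStream` variable from the Java example above; `Row.getKind()` and the `RowKind` constants are existing Flink API.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// resultStream is the changelog stream produced by toChangelogStream(...) above;
// every Row carries its change flag, which can be inspected via Row#getKind()
DataStream<Row> upsertsOnly = resultStream
    .filter(row -> row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER);

// keep only +I and +U changes, dropping -U (and -D) rows
upsertsOnly.print();
```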
+The complete semantics of `fromChangelogStream` and `toChangelogStream` can be found in the [dedicated section below](#handling-of-changelog-streams).
+In particular, the section discusses how to influence the schema derivation with more complex and nested
+types. It also covers working with event-time and watermarks, and discusses how to declare a primary key
+and a changelog mode for the input and output streams.
+
+### Dependencies and Imports
+
+Projects that combine Table API with DataStream API need to add one of the following bridging modules.
+They include transitive dependencies on `flink-table-api-java` or `flink-table-api-scala` and the
+corresponding language-specific DataStream API module.
+
+{{< tabs "0d2da52a-ee43-4d06-afde-b165517c0617" >}}
+{{< tab "Java" >}}
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+  <scope>provided</scope>
+</dependency>
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-scala-bridge{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+  <scope>provided</scope>
+</dependency>
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The following imports are required to declare common pipelines using either the Java or Scala version
+of both DataStream API and Table API.
+
+{{< tabs "19a47e2d-168b-4f73-a966-abfcc8a6baca" >}}
+{{< tab "Java" >}}
+```java
+// imports for Java DataStream API
+import org.apache.flink.streaming.api.*;
+import org.apache.flink.streaming.api.environment.*;
+
+// imports for Table API with bridging to Java DataStream API
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.bridge.java.*;
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+// imports for Scala DataStream API
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala._
+
+// imports for Table API with bridging to Scala DataStream API
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Configuration
+
+The `TableEnvironment` will adopt all configuration options from the passed `StreamExecutionEnvironment`.
+However, it cannot be guaranteed that further changes to the configuration of `StreamExecutionEnvironment`
+are propagated to the `StreamTableEnvironment` after its instantiation. Also, the reverse propagation
+of options from Table API to DataStream API is not supported.
+
+We recommend setting all configuration options in DataStream API early, before switching to Table API.
+
+{{< tabs "47a32814-abea-11eb-8529-0242ac130003" >}}
+{{< tab "Java" >}}
+```java
+import java.time.ZoneId;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+// create Java DataStream API
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// set various configuration early
+
+env.setMaxParallelism(256);
+
+env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
+
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// then switch to Java Table API
+
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// set configuration early
+
+tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+
+// start defining your pipelines in both APIs...
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import java.time.ZoneId
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.table.api.bridge.scala._
+
+// create Scala DataStream API
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// set various configuration early
+
+env.setMaxParallelism(256)
+
+env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], classOf[CustomKryoSerializer])
+
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// then switch to Scala Table API
+
+val tableEnv = StreamTableEnvironment.create(env)
+
+// set configuration early
+
+tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"))
+
+// start defining your pipelines in both APIs...
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Execution Behavior
+
+Both APIs provide methods to execute pipelines. In other words: if requested, they compile a job
+graph that will be submitted to the cluster and triggered for execution. Results will be streamed to
+the declared sinks.
+
+Usually, both APIs mark such behavior with the term `execute` in method names. However, the execution
+behavior is slightly different between Table API and DataStream API.
+
+**DataStream API**
+
+The DataStream API's `StreamExecutionEnvironment` acts as a kind of _builder_ that constructs
+a complex pipeline. The pipeline can split into multiple branches that might or might not end with
+a sink.
+
+At least one sink must be defined, otherwise the following exception is thrown:
+```
+java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
+```
+
+`StreamExecutionEnvironment.execute()` submits the entire constructed pipeline and clears the builder
+afterwards. In other words: no sources and sinks are declared anymore, and a new pipeline can be
+appended. Thus, every DataStream program usually ends with a call to `StreamExecutionEnvironment.execute()`.
+Alternatively, `DataStream.executeAndCollect()` implicitly defines a sink for streaming the results to
+the local client and only executes the current branch.
+
+**Table API**
+
+In Table API, branching pipelines are only supported within a `StatementSet` where each branch must
+declare a final sink. Neither `TableEnvironment` nor `StreamTableEnvironment` offers a dedicated
+general `execute()` method.
+Instead, they offer methods for submitting a single source-to-sink
+pipeline or a statement set:
+
+```java
+// execute with explicit sink
+tableEnv.from("InputTable").executeInsert("OutputTable");
+
+tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
+
+tableEnv.createStatementSet()
+    .addInsert("OutputTable", tableEnv.from("InputTable"))
+    .addInsert("OutputTable2", tableEnv.from("InputTable"))
+    .execute();
+
+tableEnv.createStatementSet()
+    .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
+    .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
+    .execute();
+
+// execute with implicit local sink
+
+tableEnv.from("InputTable").execute().print();
+
+tableEnv.executeSql("SELECT * FROM InputTable").print();
+```
+
+In order to combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
+or `StreamTableEnvironment.toChangelogStream` will materialize (i.e. compile) the Table API sub-pipeline
+and insert it into the DataStream API pipeline builder. This means that `StreamExecutionEnvironment.execute()`
+or `DataStream.executeAndCollect` must be called afterwards. An execution in Table API will not trigger
+these "external parts".
+
+```java
+// (1) only adds a branch with a sink to the StreamExecutionEnvironment
+tableEnv.toDataStream(table).print();
+
+// (2) executes a Table API end-to-end pipeline as a Flink job and prints locally,
+// (1) is still not executed
+table.execute().print();
+
+// executes the DataStream API pipeline with the sink defined in (1) as a Flink job,
+// (2) has no effect anymore
+env.execute();
+```
+
+{{< top >}}
+
+Handling of (Insert-Only) Streams
+---------------------------------
+
+A `StreamTableEnvironment` offers the following methods to convert from and to DataStream API:
+
+- `fromDataStream(DataStream)`: Interprets a stream of insert-only changes and of arbitrary type as
+a table. Event-time and watermarks are not propagated by default.
+
+- `fromDataStream(DataStream, Schema)`: Interprets a stream of insert-only changes and of arbitrary
+type as a table. The optional schema allows enriching column data types and adding time attributes,
+watermark strategies, other computed columns, or primary keys.
+
+- `createTemporaryView(String, DataStream)`: Registers the stream under a name to access it in SQL.
+It is a shortcut for `createTemporaryView(String, fromDataStream(DataStream))`.
+
+- `createTemporaryView(String, DataStream, Schema)`: Registers the stream under a name to access it in SQL.
+It is a shortcut for `createTemporaryView(String, fromDataStream(DataStream, Schema))`.
+
+- `toDataStream(Table)`: Converts a table into a stream of insert-only changes. The default
+stream record type is `org.apache.flink.types.Row`. A single rowtime attribute column is written
+back into the DataStream API's record. Watermarks are propagated as well.
+
+- `toDataStream(Table, AbstractDataType)`: Converts a table into a stream of insert-only changes.
+This method accepts a data type to express the desired stream record type. The planner might insert
+implicit casts and reorder columns to map them to fields of the (possibly nested) data type.
+
+- `toDataStream(Table, Class)`: A shortcut for `toDataStream(Table, DataTypes.of(Class))`
+to quickly create the desired data type reflectively.
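*(Editor's note, not part of the quoted patch.)* To complement the list above, here is a minimal sketch of the last variant, `toDataStream(Table, Class)`, converting a `Table` back into a typed `DataStream`. The `User` POJO and the `fromValues` table are made up purely for illustration; the bridging call itself is the method described above.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// hypothetical POJO used as the target type of the conversion
public static class User {
    public String name;
    public Integer score;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// an example table whose columns match the POJO fields by name
Table table = tableEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("score", DataTypes.INT())),
    Row.of("Alice", 12),
    Row.of("Bob", 10));

// shortcut for toDataStream(table, DataTypes.of(User.class));
// the planner maps the columns `name` and `score` to the POJO fields
DataStream<User> userStream = tableEnv.toDataStream(table, User.class);

userStream.print();
env.execute();
```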
+
+From a Table API's perspective, converting from and to DataStream API is similar to reading from or
+writing to a virtual table connector that has been defined using a [`CREATE TABLE` DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table)
+in SQL.
+
+The schema part in the virtual `CREATE TABLE name (schema) WITH (options)` statement can be automatically
+derived from the DataStream's type information, enriched, or entirely defined manually using
+`org.apache.flink.table.api.Schema`.
+
+The virtual DataStream table connector exposes the following metadata for every row:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+      <th class="text-center" style="width: 5%">R/W</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><code>rowtime</code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>Stream record's timestamp.</td>
+      <td><code>R/W</code></td>
+    </tr>
+  </tbody>
+</table>
+
+The virtual DataStream table source implements [`SupportsSourceWatermark`]({{< ref "docs/dev/table/sourcesSinks" >}}#source-abilities),
+thus enabling the use of the `SOURCE_WATERMARK()` built-in function as a watermark strategy to adopt
+watermarks from the DataStream API.
+
+### Examples for `fromDataStream`
+
+The following code shows how to use `fromDataStream` for different scenarios.
+
+{{< tabs "079cdf25-21ef-4393-ad69-623510027a1b" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import java.time.Instant;
+
+// some example POJO
+public static class User {
+    public String name;
+
+    public Integer score;
+
+    public Instant event_time;
+
+    // default constructor for DataStream API
+    public User() {}
+
+    // fully assigning constructor for Table API
+    public User(String name, Integer score, Instant event_time) {
+        this.name = name;
+        this.score = score;
+        this.event_time = event_time;
+    }
+}
+
+// create a DataStream
+DataStream<User> dataStream =
+    env.fromElements(
+        new User("Alice", 4, Instant.ofEpochMilli(1000)),
+        new User("Bob", 6, Instant.ofEpochMilli(1001)),
+        new User("Alice", 10, Instant.ofEpochMilli(1002)));
+
+
+// === EXAMPLE 1 ===
+
+// derive all physical columns automatically
+
+Table table = tableEnv.fromDataStream(dataStream);
+table.printSchema();
+// prints:
+// (
+// `name` STRING,
+// `score` INT,
+// `event_time` TIMESTAMP_LTZ(9)
+// )
+
+
+// === EXAMPLE 2 ===
+
+// derive all physical columns automatically
+// but add computed columns (in this case for creating a proctime attribute column)
+
+Table table = tableEnv.fromDataStream(
+    dataStream,
+    Schema.newBuilder()
+        .columnByExpression("proc_time", "PROCTIME()")
+        .build());
+table.printSchema();
+// prints:
+// (
+// `name` STRING,
+// `score` INT NOT NULL,
+// `event_time` TIMESTAMP_LTZ(9),
+// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
+// )
+
+
+// === EXAMPLE 3 ===
+
+// derive all physical columns automatically
+// but add computed columns (in this case for creating a rowtime attribute column)
+// and a custom watermark strategy
+
+Table table =
+    tableEnv.fromDataStream(
+        dataStream,
+        Schema.newBuilder()
+            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
+            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
+            .build());
+table.printSchema();
+// prints:
+// (
+// `name` STRING,
+// `score` INT,
+// `event_time` TIMESTAMP_LTZ(9),
+// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
+// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
+// )
+
+
+// === EXAMPLE 4 ===
+
+// derive all physical columns automatically
+// but access the stream record's timestamp for creating a rowtime attribute column
+// also rely on the watermarks generated in the DataStream API
+
+// we assume that a watermark strategy has been defined for `dataStream` before
+// (not part of this example)
+Table table =
+    tableEnv.fromDataStream(
+        dataStream,
+        Schema.newBuilder()
+            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
+            .watermark("rowtime", "SOURCE_WATERMARK()")
+            .build());
+table.printSchema();
+// prints:
+// (
+// `name` STRING,
+// `score` INT,
+// `event_time` TIMESTAMP_LTZ(9),
+// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
+// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
+// )
+
+
+// === EXAMPLE 5 ===
+
+// define physical columns manually
+// in this example,
+// - we can reduce the default precision of timestamps from 9 to 3
+// - we also project the columns and put `event_time` to the beginning
+
+Table table =
+    tableEnv.fromDataStream(
+        dataStream,
+        Schema.newBuilder()
+            .column("event_time", "TIMESTAMP_LTZ(3)")
+            .column("name", "STRING")
+            .column("score", "INT")
+            .watermark("event_time", "SOURCE_WATERMARK()")
+            .build());
+table.printSchema();
+// prints:
+// (
+// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
+// `name` STRING,
+// `score` INT
+// )
+// note: the watermark strategy is not shown due to the inserted column reordering projection
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala._
+import java.time.Instant
+
+// some example case class
+case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)
+
+// create a DataStream
+val dataStream = env.fromElements(
+  User("Alice", 4, Instant.ofEpochMilli(1000)),
+  User("Bob", 6, Instant.ofEpochMilli(1001)),
+  User("Alice", 10, Instant.ofEpochMilli(1002)))
+
+
+// === EXAMPLE 1 ===
+
+// derive all physical columns automatically
+
+val table = tableEnv.fromDataStream(dataStream)
+table.printSchema()
+// prints:
+// (
+// `name` STRING,
+// `score` INT,
+// `event_time` TIMESTAMP_LTZ(9)
+// )
+
+
+// === EXAMPLE 2 ===
+
+// derive all physical columns automatically
+// but add computed columns (in this case for creating a proctime attribute column)
+
+val table = tableEnv.fromDataStream(
+  dataStream,
+  Schema.newBuilder()
+    .columnByExpression("proc_time", "PROCTIME()")
+    .build())
+table.printSchema()
+// prints:
+// (
+// `name` STRING,
+// `score` INT NOT NULL,
+// `event_time` TIMESTAMP_LTZ(9),
+// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
+// )
+
+
+// === EXAMPLE 3 ===
+
+// derive all physical columns automatically
+// but add computed columns (in this case for creating a rowtime attribute column)
+// and a custom watermark strategy
+
+val table =
+  tableEnv.fromDataStream(
+    dataStream,
+    Schema.newBuilder()
+      .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
+      .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
+      .build())
+table.printSchema()
+// prints:
+// (
+// `name` STRING,
+// `score` INT,
+// `event_time` TIMESTAMP_LTZ(9),
+// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
+// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
+// )
+
+
+// === EXAMPLE 4 ===
+
+// derive all physical columns automatically
+// but access the stream record's timestamp for creating a rowtime attribute column
+// also rely on the watermarks generated in the DataStream API
+
+// we assume that a watermark strategy has been defined for `dataStream` before
+// (not part of this example)
+val table =
+  tableEnv.fromDataStream(
+    dataStream,
+    Schema.newBuilder()
+      .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
+      .watermark("rowtime", "SOURCE_WATERMARK()")
+      .build())
+table.printSchema()
+// prints:
+// (
+// `name` STRING,
+// `score` INT,
+// `event_time` TIMESTAMP_LTZ(9),
+// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
+// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
+// )
+
+
+// === EXAMPLE 5 ===
+
+// define physical columns manually
+// in this example,
+// - we can reduce the default precision of timestamps from 9 to 3
+// - we also project the columns and put `event_time` to the beginning
+
+val table =
+  tableEnv.fromDataStream(
+    dataStream,
+    Schema.newBuilder()
+      .column("event_time", "TIMESTAMP_LTZ(3)")
+      .column("name", "STRING")
+      .column("score", "INT")
+      .watermark("event_time", "SOURCE_WATERMARK()")
+      .build())
+table.printSchema()
+// prints:
+// (
+// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
+// `name` STRING,
+// `score` INT
+// )
+// note: the watermark strategy is not shown due to the inserted column reordering projection
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Example 1 illustrates a simple use case when no time-based operations are needed.
+
+Example 4 is the most common use case when time-based operations such as windows or interval
+joins should be part of the pipeline. Example 2 is the most common use case when these time-based
+operations should work in processing time.
+
+Example 5 relies entirely on the user's declaration. This can be useful to replace generic types
+from DataStream API (which would be `RAW` in Table API) with proper data types. Since `DataType` is
+richer than `TypeInformation`, we can easily enable immutable POJOs and other complex data structures.
+The following example in Java shows what is possible:
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+
+// DataStream API does not support immutable POJOs yet
+// they will result in a generic type that is a RAW type in Table API by default

Review comment:
       Actually, this is why the `TypeInformation` section is in this document. I will try to improve this part again.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
