This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b5ca9eeb3ac5502c50f990bf6a2f8530a7fb0232 Author: huangxingbo <[email protected]> AuthorDate: Wed Jul 28 00:16:28 2021 +0800 [FLINK-22911][python][doc] Add documentation how to interact with DataStream API in PyFlink This closes #16611. --- .../docs/dev/python/datastream/data_types.md | 1 + .../dev/python/table/conversion_of_data_stream.md | 25 + docs/content.zh/docs/dev/table/data_stream_api.md | 612 ++++++++++++++++++- .../docs/dev/python/datastream/data_types.md | 1 + .../dev/python/table/conversion_of_data_stream.md | 25 + docs/content/docs/dev/table/data_stream_api.md | 651 ++++++++++++++++++++- 6 files changed, 1285 insertions(+), 30 deletions(-) diff --git a/docs/content.zh/docs/dev/python/datastream/data_types.md b/docs/content.zh/docs/dev/python/datastream/data_types.md index 6a53650..0beef71 100644 --- a/docs/content.zh/docs/dev/python/datastream/data_types.md +++ b/docs/content.zh/docs/dev/python/datastream/data_types.md @@ -110,6 +110,7 @@ The table below shows the types supported now and how to define them: |`Types.STRING()` | `str` | `java.lang.String` | |`Types.BIG_INT()` | `int` | `java.math.BigInteger` | |`Types.BIG_DEC()` | `decimal.Decimal` | `java.math.BigDecimal` | +|`Types.INSTANT()` | `pyflink.common.time.Instant` | `java.time.Instant` | |`Types.TUPLE()` | `tuple` | `org.apache.flink.api.java.tuple.Tuple0` ~ `org.apache.flink.api.java.tuple.Tuple25` | |`Types.ROW()` | `pyflink.common.Row` | `org.apache.flink.types.Row` | |`Types.ROW_NAMED()` | `pyflink.common.Row` | `org.apache.flink.types.Row` | diff --git a/docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md b/docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md new file mode 100644 index 0000000..8e5813e --- /dev/null +++ b/docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md @@ -0,0 +1,25 @@ +--- +title: "Table 和 DataStream 互转" +weight: 42 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/table/data_stream_api" >}} "/> diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md index dfd00aa..1bbcb6c 100644 --- a/docs/content.zh/docs/dev/table/data_stream_api.md +++ b/docs/content.zh/docs/dev/table/data_stream_api.md @@ -75,7 +75,7 @@ mentioned here for completeness. Converting between DataStream and Table --------------------------------------- -Flink provides a specialized `StreamTableEnvironment` in Java and Scala for integrating with the +Flink provides a specialized `StreamTableEnvironment` for integrating with the DataStream API. Those environments extend the regular `TableEnvironment` with additional methods and take the `StreamExecutionEnvironment` used in the DataStream API as a parameter. 
@@ -123,9 +123,9 @@ resultStream.print(); env.execute(); // prints: -// +I[Alice] -// +I[Bob] -// +I[John] +// +I[ALICE] +// +I[BOB] +// +I[JOHN] ``` {{< /tab >}} {{< tab "Scala" >}} @@ -156,9 +156,42 @@ resultStream.print() env.execute() // prints: -// +I[Alice] -// +I[Bob] -// +I[John] +// +I[ALICE] +// +I[BOB] +// +I[JOHN] +``` +{{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment +from pyflink.common.typeinfo import Types + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# create a DataStream +ds = env.from_collection(["Alice", "Bob", "John"], Types.STRING()) + +# interpret the insert-only DataStream as a Table +t = t_env.from_data_stream(ds) + +# register the Table object as a view and query it +t_env.create_temporary_view("InputTable", t) +res_table = t_env.sql_query("SELECT UPPER(f0) FROM InputTable") + +# interpret the insert-only Table as a DataStream again +res_ds = t_env.to_data_stream(res_table) + +# add a printing sink and execute in DataStream API +res_ds.print() + +env.execute() + +# prints: +# +I[ALICE] +# +I[BOB] +# +I[JOHN] ``` {{< /tab >}} {{< /tabs >}} @@ -264,6 +297,43 @@ env.execute() // +U[Alice, 112] ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment +from pyflink.common.typeinfo import Types + +#create environments of both APIs +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# create a DataStream +ds = env.from_collection([("Alice", 12), ("Bob", 10), ("Alice", 100)], + type_info=Types.ROW_NAMED( + ["a", "b"], + [Types.STRING(), Types.INT()])) + +input_table = t_env.from_data_stream(ds).alias("name", "score") + +# register the Table object as a view and query it +# the query contains an aggregation that produces updates +t_env.create_temporary_view("InputTable", input_table) +res_table = t_env.sql_query("SELECT name, SUM(score) FROM InputTable GROUP BY name") + +# interpret the updating Table as a changelog DataStream +res_stream = t_env.to_changelog_stream(res_table) + +# add a printing sink and execute in DataStream API +res_stream.print() +env.execute() + +# prints: +# +I[Alice, 12] +# +I[Bob, 10] +# -U[Alice, 12] +# +U[Alice, 112] +``` +{{< /tab >}} {{< /tabs >}} The complete semantics of `fromChangelogStream` and `toChangelogStream` can be found in the [dedicated section below](#handling-of-insert-only-streams). @@ -399,6 +469,32 @@ tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin")) // start defining your pipelines in both APIs... ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment +from pyflink.datastream.checkpointing_mode import CheckpointingMode + + +# create Python DataStream API +env = StreamExecutionEnvironment.get_execution_environment() + +# set various configuration early +env.set_max_parallelism(256) + +env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name") + +env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE) + +# then switch to Python Table API +t_env = StreamTableEnvironment.create(env) + +# set configuration early +t_env.get_config().set_local_timezone("Europe/Berlin") + +# start defining your pipelines in both APIs... 
+``` +{{< /tab >}} {{< /tabs >}} ### Execution Behavior @@ -818,6 +914,133 @@ table.printSchema() // note: the watermark strategy is not shown due to the inserted column reordering projection ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common.time import Instant +from pyflink.common.types import Row +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment, Schema + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) +ds = env.from_collection([ + Row("Alice", 12, Instant.of_epoch_milli(1000)), + Row("Bob", 5, Instant.of_epoch_milli(1001)), + Row("Alice", 10, Instant.of_epoch_milli(1002))], + type_info=Types.ROW_NAMED(['name', 'score', 'event_time'], [Types.STRING(), Types.INT(), Types.INSTANT()])) + +# === EXAMPLE 1 === + +# derive all physical columns automatically + +table = t_env.from_data_stream(ds) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9) +# ) + + +# === EXAMPLE 2 === + +# derive all physical columns automatically +# but add computed columns (in this case for creating a proctime attribute column) + +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column_by_expression("proc_time", "PROCTIME()") + .build()) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9), +# `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME() +# ) + +# === EXAMPLE 3 === + +# derive all physical columns automatically +# but add computed columns (in this case for creating a rowtime attribute column) +# and a custom watermark strategy + +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column_by_expression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))") + .watermark("rowtime", "rowtime - INTERVAL '10' SECOND") + .build()) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9), +# `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)), +# WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND +# ) + +# === EXAMPLE 4 === + +# derive all physical columns automatically +# but access the stream record's timestamp for creating a rowtime attribute column +# also rely on the watermarks generated in the DataStream API + +# we assume that a watermark strategy has been defined for `dataStream` before +# (not part of this example) +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") + .watermark("rowtime", "SOURCE_WATERMARK()") + .build()) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9), +# `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA, +# WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK() +# ) + + + +# === EXAMPLE 5 === + +# define physical columns manually +# in this example, +# - we can reduce the default precision of timestamps from 9 to 3 +# - we also project the columns and put `event_time` to the beginning + +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column("event_time", "TIMESTAMP_LTZ(3)") + .column("name", "STRING") + .column("score", "INT") + .watermark("event_time", "SOURCE_WATERMARK()") + .build()) +table.print_schema() + +# prints: +# ( +# `event_time` TIMESTAMP_LTZ(3) *ROWTIME*, +# `name` STRING, +# `score` INT +# ) +# note: the watermark 
strategy is not shown due to the inserted column reordering projection +``` +{{< /tab >}} {{< /tabs >}} Example 1 illustrates a simple use case when no time-based operations are needed. @@ -1052,6 +1275,72 @@ tableEnv.from("MyView").printSchema() // ) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import DataTypes, StreamTableEnvironment, Schema + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) +ds = env.from_collection([(12, "Alice"), (0, "Bob")], type_info=Types.TUPLE([Types.LONG(), Types.STRING()])) + +# === EXAMPLE 1 === + +# register the DataStream as view "MyView" in the current session +# all columns are derived automatically + +t_env.create_temporary_view("MyView", ds) + +t_env.from_path("MyView").print_schema() + +# prints: +# ( +# `f0` BIGINT NOT NULL, +# `f1` STRING +# ) + +# === EXAMPLE 2 === + +# register the DataStream as view "MyView" in the current session, +# provide a schema to adjust the columns similar to `fromDataStream` + +# in this example, the derived NOT NULL information has been removed + +t_env.create_temporary_view( + "MyView", + ds, + Schema.new_builder() + .column("f0", "BIGINT") + .column("f1", "STRING") + .build()) + +t_env.from_path("MyView").print_schema() + +# prints: +# ( +# `f0` BIGINT, +# `f1` STRING +# ) + + +# === EXAMPLE 3 === + +# use the Table API before creating the view if it is only about renaming columns + +t_env.create_temporary_view( + "MyView", + t_env.from_data_stream(ds).alias("id", "name")) + +t_env.from_path("MyView").print_schema() + +# prints: +# ( +# `id` BIGINT NOT NULL, +# `name` STRING +# ) +``` +{{< /tab >}} {{< /tabs >}} {{< top >}} @@ -1182,6 +1471,31 @@ val dataStream: DataStream[User] = DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3)))) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +t_env.execute_sql( + "CREATE TABLE GeneratedTable " + + "(" + + " name STRING," + + " score INT," + + " event_time TIMESTAMP_LTZ(3)," + + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" + + ")" + + "WITH ('connector'='datagen')"); + +table = t_env.from_path("GeneratedTable"); + + +# === EXAMPLE 1 === + +# use the default conversion to instances of Row + +# since `event_time` is a single rowtime attribute, it is inserted into the DataStream +# metadata and watermarks are propagated + +ds = t_env.to_data_stream(table) +``` +{{< /tab >}} {{< /tabs >}} Note that only non-updating tables are supported by `toDataStream`. 
Usually, time-based operations @@ -1395,6 +1709,77 @@ tableEnv // +----+--------------------------------+-------------+ ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common import Row, RowKind +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import DataTypes, StreamTableEnvironment, Schema + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# === EXAMPLE 1 === + +# create a changelog DataStream + +ds = env.from_collection([ + Row.of_kind(RowKind.INSERT, "Alice", 12), + Row.of_kind(RowKind.INSERT, "Bob", 5), + Row.of_kind(RowKind.UPDATE_BEFORE, "Alice", 12), + Row.of_kind(RowKind.UPDATE_AFTER, "Alice", 100)], + type_info=Types.ROW([Types.STRING(),Types.INT()])) + +# interpret the DataStream as a Table +table = t_env.from_changelog_stream(ds) + + +# register the table under a name and perform an aggregation +t_env.create_temporary_view("InputTable", table) +t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print() + +# prints: +# +----+--------------------------------+-------------+ +# | op | name | score | +# +----+--------------------------------+-------------+ +# | +I | Bob | 5 | +# | +I | Alice | 12 | +# | -D | Alice | 12 | +# | +I | Alice | 100 | +# +----+--------------------------------+-------------+ + +# === EXAMPLE 2 === + +# interpret the stream as an upsert stream (without a need for UPDATE_BEFORE) + +# create a changelog DataStream +ds = env.from_collection([ + Row.of_kind(RowKind.INSERT, "Alice", 12), + Row.of_kind(RowKind.INSERT, "Bob", 5), + Row.of_kind(RowKind.UPDATE_AFTER, "Alice", 100)], + type_info=Types.ROW([Types.STRING(),Types.INT()])) + +# interpret the DataStream as a Table +table = t_env.from_changelog_stream( + ds, + Schema.new_builder().primary_key("f0").build(), + ChangelogMode.upsert()) + +# register the table under a name and perform an aggregation +t_env.create_temporary_view("InputTable", table) +t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print() + +# prints: +# +----+--------------------------------+-------------+ +# | op | name | score | +# +----+--------------------------------+-------------+ +# | +I | Bob | 5 | +# | +I | Alice | 12 | +# | -D | Alice | 12 | +# | +I | Alice | 100 | +# +----+--------------------------------+-------------+ +``` +{{< /tab >}} {{< /tabs >}} The default `ChangelogMode` shown in example 1 should be sufficient for most use cases as it accepts @@ -1669,6 +2054,90 @@ val dataStream: DataStream[Row] = tableEnv.toChangelogStream( // leads to a stream of Row(name: StringData, score: Integer, event_time: Long) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common import Row +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.functions import ProcessFunction +from pyflink.table import DataTypes, StreamTableEnvironment, Schema +from pyflink.table.expressions import col + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# create Table with event-time +t_env.execute_sql( + "CREATE TABLE GeneratedTable " + + "(" + + " name STRING," + + " score INT," + + " event_time TIMESTAMP_LTZ(3)," + + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" + + ")" + + "WITH ('connector'='datagen')") + +table = t_env.from_path("GeneratedTable") + +# === EXAMPLE 1 === + +# convert to DataStream in the simplest and most 
general way possible (no event-time)
+simple_table = t_env.from_elements([Row("Alice", 12), Row("Alice", 2), Row("Bob", 12)],
+                                   DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
+                                                  DataTypes.FIELD("score", DataTypes.INT())]))
+
+simple_table = simple_table.group_by(col('name')).select(col('name'), col('score').sum)
+
+t_env.to_changelog_stream(simple_table).print()
+
+env.execute()
+
+# prints:
+# +I[Bob, 12]
+# +I[Alice, 12]
+# -U[Alice, 12]
+# +U[Alice, 14]
+
+# === EXAMPLE 2 ===
+
+# convert to DataStream in the simplest and most general way possible (with event-time)
+
+ds = t_env.to_changelog_stream(table)
+
+# since `event_time` is a single time attribute in the schema, it is set as the
+# stream record's timestamp by default; however, at the same time, it remains part of the Row
+
+class MyProcessFunction(ProcessFunction):
+    def process_element(self, row, ctx):
+        print(row)
+        assert ctx.timestamp() == row.event_time.to_epoch_milli()
+
+ds.process(MyProcessFunction())
+
+env.execute()
+
+# === EXAMPLE 3 ===
+
+# convert to DataStream but write out the time attribute as a metadata column which means
+# it is not part of the physical schema anymore
+
+ds = t_env.to_changelog_stream(
+    table,
+    Schema.new_builder()
+          .column("name", "STRING")
+          .column("score", "INT")
+          .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)")
+          .build())
+
+class MyProcessFunction(ProcessFunction):
+    def process_element(self, row, ctx):
+        print(row)
+        print(ctx.timestamp())
+
+ds.process(MyProcessFunction())
+
+env.execute()
+```
+{{< /tab >}}
{{< /tabs >}}

For more information about which conversions are supported for data types in Example 4, see the
@@ -1813,6 +2282,15 @@ val stream: DataStream[(Long, String)] = ???
val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+t_env = ...  # type: StreamTableEnvironment
+
+stream = ...  # type: DataStream of Types.TUPLE([Types.LONG(), Types.STRING()])
+
+table2 = t_env.from_data_stream(stream, col('my_long'), col('my_string'))
+```
+{{< /tab >}}
{{< /tabs >}}

{{< top >}}
@@ -1912,13 +2390,13 @@ val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](ta

### 数据类型到 Table Schema 的映射

Flink 的 DataStream API 支持多样的数据类型。
-例如 Tuple(Scala 内置以及Flink Java tuple)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。其他类型被视为原子类型。下面,我们讨论 Table API 如何将这些数据类型类型转换为内部 row 表示形式,并提供将 `DataStream` 转换成 `Table` 的样例。
+例如 Tuple(Scala 内置、Flink Java tuple 和 Python tuple)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。其他类型被视为原子类型。下面,我们讨论 Table API 如何将这些数据类型转换为内部 row 表示形式,并提供将 `DataStream` 转换成 `Table` 的样例。

数据类型到 table schema 的映射有两种方式:**基于字段位置**或**基于字段名称**。

**基于位置映射**

-基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于*具有特定的字段顺序*的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射(参见下一章)。可以将字段投影出来,但不能使用`as`重命名。
+基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于*具有特定的字段顺序*的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射(参见下一章)。可以将字段投影出来,但不能使用 `as`(Java 和 Scala)或者 `alias`(Python)重命名。

定义基于位置的映射时,输入数据类型中一定不能存在指定的名称,否则 API 会假定应该基于字段名称进行映射。如果未指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 `f0` 表示原子类型。

@@ -1950,6 +2428,22 @@ val table: Table = tableEnv.fromDataStream(stream, $"myLong")
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table.expressions import col
+
+# get a TableEnvironment
+t_env = ...
# see "Create a TableEnvironment" section + +stream = ... # type: DataStream of Types.Tuple([Types.LONG(), Types.INT()]) + +# convert DataStream into Table with field "my_long" only +table = t_env.from_data_stream(stream, col('my_long')) + +# convert DataStream into Table with field names "my_long" and "my_int" +table = t_env.from_data_stream(stream, col('my_long'), col('my_int')) +``` +{{< /tab >}} {{< /tabs >}} **基于名称的映射** @@ -1992,6 +2486,25 @@ val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1") val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong") ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.table.expressions import col + +# get a TableEnvironment +t_env = ... # see "Create a TableEnvironment" section + +stream = ... # type: DataStream of Types.Tuple([Types.LONG(), Types.INT()]) + +# convert DataStream into Table with field "f1" only +table = t_env.from_data_stream(stream, col('f1')) + +# convert DataStream into Table with swapped fields +table = t_env.from_data_stream(stream, col('f1'), col('f0')) + +# convert DataStream into Table with swapped fields and field names "my_int" and "my_long" +table = t_env.from_data_stream(stream, col('f1').alias('my_int'), col('f0').alias('my_long')) +``` +{{< /tab >}} {{< /tabs >}} <a name="atomic-types"></a> @@ -2026,11 +2539,26 @@ val table: Table = tableEnv.fromDataStream(stream) val table: Table = tableEnv.fromDataStream(stream, $"myLong") ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.table.expressions import col + +t_env = ... + +stream = ... # types: DataStream of Types.Long() + +# Convert DataStream into Table with default field name "f0" +table = t_env.from_data_stream(stream) + +# Convert DataStream into Table with field name "my_long" +table = t_env.from_data_stream(stream, col('my_long')) +``` +{{< /tab >}} {{< /tabs >}} <a name="tuples-scala-and-java-and-case-classes-scala-only"></a> -#### Tuple类型(Scala 和 Java)和 Case Class类型(仅 Scala) +#### Tuple类型(Scala , Java 和 Python)和 Case Class类型(仅 Scala) Flink 支持 Scala 的内置 tuple 类型并给 Java 提供自己的 tuple 类型。 两种 tuple 的 DataStream 都能被转换成表。 @@ -2041,6 +2569,12 @@ Flink 支持 Scala 的内置 tuple 类型并给 Java 提供自己的 tuple 类 {{< tabs "130f44c6-7432-465a-ae8a-b4c436888361" >}} {{< tab "Java" >}} +Flink 给 Java 提供自己的 tuple 类型。 +tuple 的 DataStream 都能被转换成表。 +可以通过提供所有字段名称来重命名字段(基于位置映射)。 +如果没有指明任何字段名称,则会使用默认的字段名称。 +如果引用了原始字段名称(对于 Flink tuple 为`f0`、`f1` ... ...),则 API 会假定映射是基于名称的而不是基于位置的。 +基于名称的映射可以通过 `as` 对字段和投影进行重新排序。 ```java StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section @@ -2060,6 +2594,12 @@ Table table = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as ``` {{< /tab >}} {{< tab "Scala" >}} +Flink 支持 Scala 的内置 tuple 类型。 +tuple 的 DataStream 都能被转换成表。 +可以通过提供所有字段名称来重命名字段(基于位置映射)。 +如果没有指明任何字段名称,则会使用默认的字段名称。 +如果引用了原始字段名称(对于 Scala tuple 为`_1`、`_2` ... ...),则 API 会假定映射是基于名称的而不是基于位置的。 +基于名称的映射可以通过 `as` 对字段和投影进行重新排序。 ```scala // get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section @@ -2090,6 +2630,33 @@ val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as ``` {{< /tab >}} +{{< tab "Python" >}} +Flink 支持 Python 的内置 tuple 类型。 +tuple 的 DataStream 都能被转换成表。 +可以通过提供所有字段名称来重命名字段(基于位置映射)。 +如果没有指明任何字段名称,则会使用默认的字段名称。 +如果引用了原始字段名称(对于 Python tuple 为`f0`、`f1` ... ...),则 API 会假定映射是基于名称的而不是基于位置的。 +基于名称的映射可以通过 `alias` 对字段和投影进行重新排序。 + +```python +from pyflink.table.expressions import col + +stream = ... 
# type: DataStream of Types.TUPLE([Types.LONG(), Types.STRING()]) + +# convert DataStream into Table with renamed field names "my_long", "my_string" (position-based) +table = t_env.from_data_stream(stream, col('my_long'), col('my_string')) + +# convert DataStream into Table with reordered fields "f1", "f0" (name-based) +table = t_env.from_data_stream(stream, col('f1'), col('f0')) + +# convert DataStream into Table with projected field "f1" (name-based) +table = t_env.from_data_stream(stream, col('f1')) + +# convert DataStream into Table with reordered and aliased fields "my_string", "my_long" (name-based) +table = t_env.from_data_stream(stream, col('f1').alias('my_string'), col('f0').alias('my_long')) + +``` +{{< /tab >}} {{< /tabs >}} <a name="pojo-java-and-scala"></a> @@ -2136,6 +2703,9 @@ val table: Table = tableEnv.fromDataStream(stream, $"name") val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName") ``` {{< /tab >}} +{{< tab "Python" >}} +PyFlink 暂时还不支持自定义 PoJo 类型 +{{< /tab >}} {{< /tabs >}} <a name="row"></a> @@ -2187,6 +2757,28 @@ val table: Table = tableEnv.fromDataStream(stream, $"name") val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName") ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.table.expressions import col + +t_env = ...; + +# DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` +stream = ... + +# Convert DataStream into Table with renamed field names "my_name", "my_age" (position-based) +table = t_env.from_data_stream(stream, col('my_name'), col('my_age')) + +# Convert DataStream into Table with renamed fields "my_name", "my_age" (name-based) +table = t_env.from_data_stream(stream, col('name').alias('my_name'), col('age').alias('my_age')) + +# Convert DataStream into Table with projected field "name" (name-based) +table = t_env.from_data_stream(stream, col('name')) + +# Convert DataStream into Table with projected and renamed field "my_name" (name-based) +table = t_env.from_data_stream(stream, col('name').alias("my_name")) +``` +{{< /tab >}} {{< /tabs >}} {{< top >}} diff --git a/docs/content/docs/dev/python/datastream/data_types.md b/docs/content/docs/dev/python/datastream/data_types.md index 17a6e7ae..87a357a 100644 --- a/docs/content/docs/dev/python/datastream/data_types.md +++ b/docs/content/docs/dev/python/datastream/data_types.md @@ -110,6 +110,7 @@ The table below shows the types supported now and how to define them: |`Types.STRING()` | `str` | `java.lang.String` | |`Types.BIG_INT()` | `int` | `java.math.BigInteger` | |`Types.BIG_DEC()` | `decimal.Decimal` | `java.math.BigDecimal` | +|`Types.INSTANT()` | `pyflink.common.time.Instant` | `java.time.Instant` | |`Types.TUPLE()` | `tuple` | `org.apache.flink.api.java.tuple.Tuple0` ~ `org.apache.flink.api.java.tuple.Tuple25` | |`Types.ROW()` | `pyflink.common.Row` | `org.apache.flink.types.Row` | |`Types.ROW_NAMED()` | `pyflink.common.Row` | `org.apache.flink.types.Row` | diff --git a/docs/content/docs/dev/python/table/conversion_of_data_stream.md b/docs/content/docs/dev/python/table/conversion_of_data_stream.md new file mode 100644 index 0000000..25fbfe8 --- /dev/null +++ b/docs/content/docs/dev/python/table/conversion_of_data_stream.md @@ -0,0 +1,25 @@ +--- +title: "Conversions between Table and DataStream" +weight: 42 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/table/data_stream_api" >}} "/> diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index fd39165..f2969b4 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -24,11 +24,6 @@ under the License. # DataStream API Integration -{{< hint info >}} -This page only discusses the integration with DataStream API in JVM languages such as Java or Scala. -For Python, see the [Python API]({{< ref "docs/dev/python/overview" >}}) area. -{{< /hint >}} - Both Table API and DataStream API are equally important when it comes to defining a data processing pipeline. @@ -75,7 +70,7 @@ mentioned here for completeness. Converting between DataStream and Table --------------------------------------- -Flink provides a specialized `StreamTableEnvironment` in Java and Scala for integrating with the +Flink provides a specialized `StreamTableEnvironment` for integrating with the DataStream API. Those environments extend the regular `TableEnvironment` with additional methods and take the `StreamExecutionEnvironment` used in the DataStream API as a parameter. 
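For readers following along in Python, a minimal sketch of this wiring — using only the PyFlink entry points that appear in the examples below — looks like:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# the DataStream API entry point
env = StreamExecutionEnvironment.get_execution_environment()

# the specialized environment wraps `env` and adds the conversion methods
# (from_data_stream, to_data_stream, ...) on top of the regular TableEnvironment
t_env = StreamTableEnvironment.create(env)
```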
@@ -123,9 +118,9 @@ resultStream.print(); env.execute(); // prints: -// +I[Alice] -// +I[Bob] -// +I[John] +// +I[ALICE] +// +I[BOB] +// +I[JOHN] ``` {{< /tab >}} {{< tab "Scala" >}} @@ -156,9 +151,42 @@ resultStream.print() env.execute() // prints: -// +I[Alice] -// +I[Bob] -// +I[John] +// +I[ALICE] +// +I[BOB] +// +I[JOHN] +``` +{{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment +from pyflink.common.typeinfo import Types + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# create a DataStream +ds = env.from_collection(["Alice", "Bob", "John"], Types.STRING()) + +# interpret the insert-only DataStream as a Table +t = t_env.from_data_stream(ds) + +# register the Table object as a view and query it +t_env.create_temporary_view("InputTable", t) +res_table = t_env.sql_query("SELECT UPPER(f0) FROM InputTable") + +# interpret the insert-only Table as a DataStream again +res_ds = t_env.to_data_stream(res_table) + +# add a printing sink and execute in DataStream API +res_ds.print() + +env.execute() + +# prints: +# +I[ALICE] +# +I[BOB] +# +I[JOHN] ``` {{< /tab >}} {{< /tabs >}} @@ -264,6 +292,44 @@ env.execute() // +U[Alice, 112] ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment +from pyflink.common.typeinfo import Types +from pyflink.common import Row + +#create environments of both APIs +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# create a DataStream +ds = env.from_collection([("Alice", 12), ("Bob", 10), ("Alice", 100)], + type_info=Types.ROW_NAMED( + ["a", "b"], + [Types.STRING(), Types.INT()])) + +input_table = t_env.from_data_stream(ds).alias("name", "score") + +# register the Table object as a view and query it +# the query contains an aggregation that produces updates +t_env.create_temporary_view("InputTable", input_table) +res_table = t_env.sql_query("SELECT name, SUM(score) FROM InputTable GROUP BY name") + +# interpret the updating Table as a changelog DataStream +res_stream = t_env.to_changelog_stream(res_table) + +# add a printing sink and execute in DataStream API +res_stream.print() +env.execute() + +# prints: +# +I[Alice, 12] +# +I[Bob, 10] +# -U[Alice, 12] +# +U[Alice, 112] +``` +{{< /tab >}} {{< /tabs >}} The complete semantics of `fromChangelogStream` and `toChangelogStream` can be found in the [dedicated section below](#handling-of-insert-only-streams). @@ -399,6 +465,32 @@ tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin")) // start defining your pipelines in both APIs... 
``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment +from pyflink.datastream.checkpointing_mode import CheckpointingMode + + +# create Python DataStream API +env = StreamExecutionEnvironment.get_execution_environment() + +# set various configuration early +env.set_max_parallelism(256) + +env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name") + +env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE) + +# then switch to Python Table API +t_env = StreamTableEnvironment.create(env) + +# set configuration early +t_env.get_config().set_local_timezone("Europe/Berlin") + +# start defining your pipelines in both APIs... +``` +{{< /tab >}} {{< /tabs >}} ### Execution Behavior @@ -818,6 +910,136 @@ table.printSchema() // note: the watermark strategy is not shown due to the inserted column reordering projection ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common.time import Instant +from pyflink.common.types import Row, RowKind +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.functions import ProcessFunction +from pyflink.table import DataTypes, StreamTableEnvironment, Schema +from pyflink.table.connector import ChangelogMode +from pyflink.table.expressions import col + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) +ds = env.from_collection([ + Row("Alice", 12, Instant.of_epoch_milli(1000)), + Row("Bob", 5, Instant.of_epoch_milli(1001)), + Row("Alice", 10, Instant.of_epoch_milli(1002))], + type_info=Types.ROW_NAMED(['name', 'score', 'event_time'], [Types.STRING(), Types.INT(), Types.INSTANT()])) + +# === EXAMPLE 1 === + +# derive all physical columns automatically + +table = t_env.from_data_stream(ds) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9) +# ) + + +# === EXAMPLE 2 === + +# derive all physical columns automatically +# but add computed columns (in this case for creating a proctime attribute column) + +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column_by_expression("proc_time", "PROCTIME()") + .build()) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9), +# `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME() +# ) + +# === EXAMPLE 3 === + +# derive all physical columns automatically +# but add computed columns (in this case for creating a rowtime attribute column) +# and a custom watermark strategy + +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column_by_expression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))") + .watermark("rowtime", "rowtime - INTERVAL '10' SECOND") + .build()) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9), +# `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)), +# WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND +# ) + +# === EXAMPLE 4 === + +# derive all physical columns automatically +# but access the stream record's timestamp for creating a rowtime attribute column +# also rely on the watermarks generated in the DataStream API + +# we assume that a watermark strategy has been defined for `dataStream` before +# (not part of this example) +table = t_env.from_data_stream( + ds, 
+ Schema.new_builder() + .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") + .watermark("rowtime", "SOURCE_WATERMARK()") + .build()) +table.print_schema() + +# prints: +# ( +# `name` STRING, +# `score` INT, +# `event_time` TIMESTAMP_LTZ(9), +# `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA, +# WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK() +# ) + + + +# === EXAMPLE 5 === + +# define physical columns manually +# in this example, +# - we can reduce the default precision of timestamps from 9 to 3 +# - we also project the columns and put `event_time` to the beginning + +table = t_env.from_data_stream( + ds, + Schema.new_builder() + .column("event_time", "TIMESTAMP_LTZ(3)") + .column("name", "STRING") + .column("score", "INT") + .watermark("event_time", "SOURCE_WATERMARK()") + .build()) +table.print_schema() + +# prints: +# ( +# `event_time` TIMESTAMP_LTZ(3) *ROWTIME*, +# `name` STRING, +# `score` INT +# ) +# note: the watermark strategy is not shown due to the inserted column reordering projection +``` +{{< /tab >}} {{< /tabs >}} Example 1 illustrates a simple use case when no time-based operations are needed. @@ -1052,6 +1274,72 @@ tableEnv.from("MyView").printSchema() // ) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import DataTypes, StreamTableEnvironment, Schema + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) +ds = env.from_collection([(12, "Alice"), (0, "Bob")], type_info=Types.TUPLE([Types.LONG(), Types.STRING()])) + +# === EXAMPLE 1 === + +# register the DataStream as view "MyView" in the current session +# all columns are derived automatically + +t_env.create_temporary_view("MyView", ds) + +t_env.from_path("MyView").print_schema() + +# prints: +# ( +# `f0` BIGINT NOT NULL, +# `f1` STRING +# ) + +# === EXAMPLE 2 === + +# register the DataStream as view "MyView" in the current session, +# provide a schema to adjust the columns similar to `fromDataStream` + +# in this example, the derived NOT NULL information has been removed + +t_env.create_temporary_view( + "MyView", + ds, + Schema.new_builder() + .column("f0", "BIGINT") + .column("f1", "STRING") + .build()) + +t_env.from_path("MyView").print_schema() + +# prints: +# ( +# `f0` BIGINT, +# `f1` STRING +# ) + + +# === EXAMPLE 3 === + +# use the Table API before creating the view if it is only about renaming columns + +t_env.create_temporary_view( + "MyView", + t_env.from_data_stream(ds).alias("id", "name")) + +t_env.from_path("MyView").print_schema() + +# prints: +# ( +# `id` BIGINT NOT NULL, +# `name` STRING +# ) +``` +{{< /tab >}} {{< /tabs >}} {{< top >}} @@ -1182,6 +1470,31 @@ val dataStream: DataStream[User] = DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3)))) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +t_env.execute_sql( + "CREATE TABLE GeneratedTable " + + "(" + + " name STRING," + + " score INT," + + " event_time TIMESTAMP_LTZ(3)," + + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" + + ")" + + "WITH ('connector'='datagen')"); + +table = t_env.from_path("GeneratedTable"); + + +# === EXAMPLE 1 === + +# use the default conversion to instances of Row + +# since `event_time` is a single rowtime attribute, it is inserted into the DataStream +# metadata and watermarks are propagated + +ds = t_env.to_data_stream(table) +``` +{{< /tab >}} {{< /tabs >}} Note that only non-updating tables are supported by 
`toDataStream`. Usually, time-based operations @@ -1395,6 +1708,77 @@ tableEnv // +----+--------------------------------+-------------+ ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common import Row, RowKind +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import DataTypes, StreamTableEnvironment, Schema + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# === EXAMPLE 1 === + +# create a changelog DataStream + +ds = env.from_collection([ + Row.of_kind(RowKind.INSERT, "Alice", 12), + Row.of_kind(RowKind.INSERT, "Bob", 5), + Row.of_kind(RowKind.UPDATE_BEFORE, "Alice", 12), + Row.of_kind(RowKind.UPDATE_AFTER, "Alice", 100)], + type_info=Types.ROW([Types.STRING(),Types.INT()])) + +# interpret the DataStream as a Table +table = t_env.from_changelog_stream(ds) + + +# register the table under a name and perform an aggregation +t_env.create_temporary_view("InputTable", table) +t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print() + +# prints: +# +----+--------------------------------+-------------+ +# | op | name | score | +# +----+--------------------------------+-------------+ +# | +I | Bob | 5 | +# | +I | Alice | 12 | +# | -D | Alice | 12 | +# | +I | Alice | 100 | +# +----+--------------------------------+-------------+ + +# === EXAMPLE 2 === + +# interpret the stream as an upsert stream (without a need for UPDATE_BEFORE) + +# create a changelog DataStream +ds = env.from_collection([ + Row.of_kind(RowKind.INSERT, "Alice", 12), + Row.of_kind(RowKind.INSERT, "Bob", 5), + Row.of_kind(RowKind.UPDATE_AFTER, "Alice", 100)], + type_info=Types.ROW([Types.STRING(),Types.INT()])) + +# interpret the DataStream as a Table +table = t_env.from_changelog_stream( + ds, + Schema.new_builder().primary_key("f0").build(), + ChangelogMode.upsert()) + +# register the table under a name and perform an aggregation +t_env.create_temporary_view("InputTable", table) +t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print() + +# prints: +# +----+--------------------------------+-------------+ +# | op | name | score | +# +----+--------------------------------+-------------+ +# | +I | Bob | 5 | +# | +I | Alice | 12 | +# | -D | Alice | 12 | +# | +I | Alice | 100 | +# +----+--------------------------------+-------------+ +``` +{{< /tab >}} {{< /tabs >}} The default `ChangelogMode` shown in example 1 should be sufficient for most use cases as it accepts @@ -1669,6 +2053,90 @@ val dataStream: DataStream[Row] = tableEnv.toChangelogStream( // leads to a stream of Row(name: StringData, score: Integer, event_time: Long) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.common import Row +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.functions import ProcessFunction +from pyflink.table import DataTypes, StreamTableEnvironment, Schema +from pyflink.table.expressions import col + +env = StreamExecutionEnvironment.get_execution_environment() +t_env = StreamTableEnvironment.create(env) + +# create Table with event-time +t_env.execute_sql( + "CREATE TABLE GeneratedTable " + + "(" + + " name STRING," + + " score INT," + + " event_time TIMESTAMP_LTZ(3)," + + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" + + ")" + + "WITH ('connector'='datagen')") + +table = t_env.from_path("GeneratedTable") + +# === EXAMPLE 1 === + +# convert to DataStream in the 
simplest and most general way possible (no event-time)
+simple_table = t_env.from_elements([Row("Alice", 12), Row("Alice", 2), Row("Bob", 12)],
+                                   DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
+                                                  DataTypes.FIELD("score", DataTypes.INT())]))
+
+simple_table = simple_table.group_by(col('name')).select(col('name'), col('score').sum)
+
+t_env.to_changelog_stream(simple_table).print()
+
+env.execute()
+
+# prints:
+# +I[Bob, 12]
+# +I[Alice, 12]
+# -U[Alice, 12]
+# +U[Alice, 14]
+
+# === EXAMPLE 2 ===
+
+# convert to DataStream in the simplest and most general way possible (with event-time)
+
+ds = t_env.to_changelog_stream(table)
+
+# since `event_time` is a single time attribute in the schema, it is set as the
+# stream record's timestamp by default; however, at the same time, it remains part of the Row
+
+class MyProcessFunction(ProcessFunction):
+    def process_element(self, row, ctx):
+        print(row)
+        assert ctx.timestamp() == row.event_time.to_epoch_milli()
+
+ds.process(MyProcessFunction())
+
+env.execute()
+
+# === EXAMPLE 3 ===
+
+# convert to DataStream but write out the time attribute as a metadata column which means
+# it is not part of the physical schema anymore
+
+ds = t_env.to_changelog_stream(
+    table,
+    Schema.new_builder()
+          .column("name", "STRING")
+          .column("score", "INT")
+          .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)")
+          .build())
+
+class MyProcessFunction(ProcessFunction):
+    def process_element(self, row, ctx):
+        print(row)
+        print(ctx.timestamp())
+
+ds.process(MyProcessFunction())
+
+env.execute()
+```
+{{< /tab >}}
{{< /tabs >}}

For more information about which conversions are supported for data types in Example 4, see the
@@ -1811,6 +2279,15 @@ val stream: DataStream[(Long, String)] = ???
val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+t_env = ...  # type: StreamTableEnvironment
+
+stream = ...  # type: DataStream of Types.TUPLE([Types.LONG(), Types.STRING()])
+
+table2 = t_env.from_data_stream(stream, col('my_long'), col('my_string'))
+```
+{{< /tab >}}
{{< /tabs >}}

{{< top >}}
@@ -1892,6 +2369,30 @@ val dsTuple: DataStream[(String, Int)] dsTuple =
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table import DataTypes
+from pyflink.common.typeinfo import Types
+
+t_env = ...
+
+table = t_env.from_elements([("john", 35), ("sarah", 32)],
+                            DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
+                                           DataTypes.FIELD("age", DataTypes.INT())]))
+
+# Convert the Table into an append DataStream of Row by specifying the type information
+ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(), Types.INT()]))
+
+# Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation
+ds_tuple = t_env.to_append_stream(table, Types.TUPLE([Types.STRING(), Types.INT()]))
+
+# Convert the Table into a retract DataStream of Row by specifying the type information
+# A retract stream of type X is a DataStream of Tuple[bool, X].
+# The boolean field indicates the type of the change.
+# True is INSERT, False is DELETE.
+retract_stream = t_env.to_retract_stream(table, Types.ROW([Types.STRING(), Types.INT()]))
+```
+{{< /tab >}}
{{< /tabs >}}

**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document.
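As a hedged follow-up to the retract-stream example above — assuming the `Tuple[bool, X]` elements arrive in Python as plain `(bool, Row)` tuples — the change flag can be evaluated directly in the DataStream API:

```python
# keep only the INSERT messages (flag is True) and drop the DELETEs;
# `retract_stream` refers to the stream produced by to_retract_stream above
inserts_only = retract_stream.filter(lambda change: change[0])

# strip the flag and continue with the payload Row
rows_only = inserts_only.map(lambda change: change[1])

rows_only.print()
```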
@@ -1905,13 +2406,13 @@ Once the Table is converted to a DataStream, please use the `StreamExecutionEnvi

### Mapping of Data Types to Table Schema

Flink's DataStream API supports many diverse types.
-Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink's Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a `DataStream` into a `Table`.
+Composite types such as Tuples (built-in Scala tuples, Flink Java tuples, and Python tuples), POJOs, Scala case classes, and Flink's Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a `DataStream` into a `Table`.

The mapping of a data type to a table schema can happen in two ways: **based on the field positions** or **based on the field names**.

**Position-based Mapping**

-Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types *with a defined field order* and atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). Fields can be projected out but can't be renamed using an alias `as`.
+Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types *with a defined field order* and atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). Fields can be projected out but can't be renamed using an alias: `as` (Java and Scala) or `alias` (Python).

When defining a position-based mapping, the specified names must not exist in the input data type, otherwise, the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used or `f0` for atomic types.

@@ -1943,6 +2444,22 @@ val table: Table = tableEnv.fromDataStream(stream, $"myLong")
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table.expressions import col
+
+# get a TableEnvironment
+t_env = ...  # see "Create a TableEnvironment" section
+
+stream = ...  # type: DataStream of Types.TUPLE([Types.LONG(), Types.INT()])
+
+# convert DataStream into Table with field "my_long" only
+table = t_env.from_data_stream(stream, col('my_long'))
+
+# convert DataStream into Table with field names "my_long" and "my_int"
+table = t_env.from_data_stream(stream, col('my_long'), col('my_int'))
+```
+{{< /tab >}}
{{< /tabs >}}

**Name-based Mapping**

@@ -1985,6 +2502,25 @@ val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table.expressions import col
+
+# get a TableEnvironment
+t_env = ...
# see "Create a TableEnvironment" section + +stream = ... # type: DataStream of Types.Tuple([Types.LONG(), Types.INT()]) + +# convert DataStream into Table with field "f1" only +table = t_env.from_data_stream(stream, col('f1')) + +# convert DataStream into Table with swapped fields +table = t_env.from_data_stream(stream, col('f1'), col('f0')) + +# convert DataStream into Table with swapped fields and field names "my_int" and "my_long" +table = t_env.from_data_stream(stream, col('f1').alias('my_int'), col('f0').alias('my_long')) +``` +{{< /tab >}} {{< /tabs >}} #### Atomic Types @@ -2017,19 +2553,34 @@ val table: Table = tableEnv.fromDataStream(stream) val table: Table = tableEnv.fromDataStream(stream, $"myLong") ``` {{< /tab >}} +{{< tab "Python" >}} +```python +from pyflink.table.expressions import col + +t_env = ... + +stream = ... # types: DataStream of Types.Long() + +# Convert DataStream into Table with default field name "f0" +table = t_env.from_data_stream(stream) + +# Convert DataStream into Table with field name "my_long" +table = t_env.from_data_stream(stream, col('my_long')) +``` +{{< /tab >}} {{< /tabs >}} -#### Tuples (Scala and Java) and Case Classes (Scala only) +#### Tuples (Scala, Java, Python) and Case Classes (Scala only) -Flink supports Scala's built-in tuples and provides its own tuple classes for Java. -DataStreams of both kinds of tuples can be converted into tables. +{{< tabs "27c312af-7427-401c-a1b1-de9fe39f7b59" >}} +{{< tab "Java" >}} +Flink provides its own tuple classes for Java. +DataStreams of the the Java tuple classes can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. -If the original field names (`f0`, `f1`, ... for Flink Tuples and `_1`, `_2`, ... for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. +If the original field names (`f0`, `f1`, ... for Flink Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (`as`). -{{< tabs "27c312af-7427-401c-a1b1-de9fe39f7b59" >}} -{{< tab "Java" >}} ```java StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section @@ -2049,6 +2600,13 @@ Table table = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as ``` {{< /tab >}} {{< tab "Scala" >}} +Flink supports Scala's built-in tuples. +DataStreams of Scala's built-in tuples can be converted into tables. +Fields can be renamed by providing names for all fields (mapping based on position). +If no field names are specified, the default field names are used. +If the original field names (`_1`, `_2`, ... for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. +Name-based mapping allows for reordering fields and projection with alias (`as`). + ```scala // get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section @@ -2079,6 +2637,33 @@ val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as ``` {{< /tab >}} +{{< tab "Python" >}} +Flink supports Python's built-in Tuples. +DataStreams of tuples can be converted into tables. +Fields can be renamed by providing names for all fields (mapping based on position). +If no field names are specified, the default field names are used. +If the original field names (`f0`, `f1`, ... 
) are referenced, the API assumes that the mapping is name-based instead of position-based.
+Name-based mapping allows for reordering fields and projection with alias (`alias`).
+
+```python
+from pyflink.table.expressions import col
+
+stream = ...  # type: DataStream of Types.TUPLE([Types.LONG(), Types.STRING()])
+
+# convert DataStream into Table with renamed field names "my_long", "my_string" (position-based)
+table = t_env.from_data_stream(stream, col('my_long'), col('my_string'))
+
+# convert DataStream into Table with reordered fields "f1", "f0" (name-based)
+table = t_env.from_data_stream(stream, col('f1'), col('f0'))
+
+# convert DataStream into Table with projected field "f1" (name-based)
+table = t_env.from_data_stream(stream, col('f1'))
+
+# convert DataStream into Table with reordered and aliased fields "my_string", "my_long" (name-based)
+table = t_env.from_data_stream(stream, col('f1').alias('my_string'), col('f0').alias('my_long'))
+```
+{{< /tab >}}
{{< /tabs >}}

#### POJO (Java and Scala)

@@ -2123,6 +2708,9 @@ val table: Table = tableEnv.fromDataStream(stream, $"name")
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
```
{{< /tab >}}
+{{< tab "Python" >}}
+Custom POJO classes are not supported in PyFlink yet.
+{{< /tab >}}
{{< /tabs >}}

#### Row

@@ -2172,6 +2760,29 @@ val table: Table = tableEnv.fromDataStream(stream, $"name")
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
```
{{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+from pyflink.table.expressions import col
+
+t_env = ...
+
+# DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
+stream = ...
+
+# Convert DataStream into Table with renamed field names "my_name", "my_age" (position-based)
+table = t_env.from_data_stream(stream, col('my_name'), col('my_age'))
+
+# Convert DataStream into Table with renamed fields "my_name", "my_age" (name-based)
+table = t_env.from_data_stream(stream, col('name').alias('my_name'), col('age').alias('my_age'))
+
+# Convert DataStream into Table with projected field "name" (name-based)
+table = t_env.from_data_stream(stream, col('name'))
+
+# Convert DataStream into Table with projected and renamed field "my_name" (name-based)
+table = t_env.from_data_stream(stream, col('name').alias('my_name'))
+```
+{{< /tab >}}
{{< /tabs >}}

{{< top >}}
