This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8b4a5bf124db58ebe1e3b4787b7dfea91290ca29 Author: huangxingbo <[email protected]> AuthorDate: Fri Jul 23 18:09:27 2021 +0800 [FLINK-22911][python] Add to_changelog_stream/from_changelog_stream to StreamTableEnvironment in Python Table API This closes #16611. --- flink-python/pyflink/table/__init__.py | 4 +- flink-python/pyflink/table/changelog_mode.py | 47 +++++++ flink-python/pyflink/table/expressions.py | 2 +- flink-python/pyflink/table/table_environment.py | 150 ++++++++++++++++++++- .../table/tests/test_table_environment_api.py | 52 ++++++- 5 files changed, 251 insertions(+), 4 deletions(-) diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index bbba3cf..325cef1 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -68,6 +68,7 @@ Important classes of Flink Table API: """ from __future__ import absolute_import +from pyflink.table.changelog_mode import ChangelogMode from pyflink.table.data_view import DataView, ListView, MapView from pyflink.table.environment_settings import EnvironmentSettings from pyflink.table.explain_detail import ExplainDetail @@ -125,5 +126,6 @@ __all__ = [ 'TableAggregateFunction', 'UserDefinedType', 'WindowGroupedTable', - 'WriteMode' + 'WriteMode', + 'ChangelogMode' ] diff --git a/flink-python/pyflink/table/changelog_mode.py b/flink-python/pyflink/table/changelog_mode.py new file mode 100644 index 0000000..bd63e1d --- /dev/null +++ b/flink-python/pyflink/table/changelog_mode.py @@ -0,0 +1,47 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.java_gateway import get_gateway + +__all__ = ['ChangelogMode'] + + +class ChangelogMode(object): + """ + The set of changes contained in a changelog. + """ + + def __init__(self, j_changelog_mode): + self._j_changelog_mode = j_changelog_mode + + @staticmethod + def insert_only(): + gateway = get_gateway() + return ChangelogMode( + gateway.jvm.org.apache.flink.table.connector.ChangelogMode.insertOnly()) + + @staticmethod + def upsert(): + gateway = get_gateway() + return ChangelogMode( + gateway.jvm.org.apache.flink.table.connector.ChangelogMode.upsert()) + + @staticmethod + def all(): + gateway = get_gateway() + return ChangelogMode( + gateway.jvm.org.apache.flink.table.connector.ChangelogMode.all()) diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index c169f31..69d9d7d 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -30,7 +30,7 @@ __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'UNBOUNDED_ROW 'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_', 'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'call', - 'call_sql'] + 'call_sql', 'source_watermark'] def 
_leaf_op(op_name: str) -> Expression: diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index fcb9116..88afa5c 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -33,7 +33,7 @@ from pyflink.common import JobExecutionResult from pyflink.java_gateway import get_gateway from pyflink.serializers import BatchedSerializer, PickleSerializer from pyflink.table import Table, EnvironmentSettings, Expression, ExplainDetail, \ - Module, ModuleEntry, TableSink, Schema + Module, ModuleEntry, TableSink, Schema, ChangelogMode from pyflink.table.catalog import Catalog from pyflink.table.serializers import ArrowSerializer from pyflink.table.statement_set import StatementSet @@ -1967,6 +1967,60 @@ class StreamTableEnvironment(TableEnvironment): j_data_stream, fields_or_schema[0]._j_schema), t_env=self) raise ValueError("Invalid arguments for 'fields': %r" % fields_or_schema) + def from_changelog_stream(self, + data_stream: DataStream, + schema: Schema = None, + changelog_mode: ChangelogMode = None) -> Table: + """ + Converts the given DataStream of changelog entries into a Table. + + Compared to :func:`from_data_stream`, this method consumes instances of Row and evaluates + the RowKind flag that is contained in every record during runtime. The runtime behavior is + similar to that of a DynamicTableSource. + + If you don't specify the changelog_mode, the changelog containing all kinds of changes + (enumerated in RowKind) is used as the default ChangelogMode. + + Column names and types of the Table are automatically derived from the TypeInformation of + the DataStream. If the outermost record's TypeInformation is a CompositeType, it will be + flattened in the first level. Composite nested fields will not be accessible. + + By default, the stream record's timestamp and watermarks are not propagated unless + explicitly declared. 
+ + This method allows to declare a Schema for the resulting table. The declaration is similar + to a {@code CREATE TABLE} DDL in SQL and allows to: + + 1. enrich or overwrite automatically derived columns with a custom DataType + 2. reorder columns + 3. add computed or metadata columns next to the physical columns + 4. access a stream record's timestamp + 5. declare a watermark strategy or propagate the DataStream watermarks + 6. declare a primary key + + See :func:`from_data_stream` for more information and examples of how to declare a Schema. + + :param data_stream: The changelog stream of Row. + :param schema: The customized schema for the final table. + :param changelog_mode: The expected kinds of changes in the incoming changelog. + :return: The converted Table. + """ + j_data_stream = data_stream._j_data_stream + JPythonConfigUtil = get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil + JPythonConfigUtil.configPythonOperator(j_data_stream.getExecutionEnvironment()) + if schema is None: + return Table(self._j_tenv.fromChangelogStream(j_data_stream), t_env=self) + elif changelog_mode is None: + return Table( + self._j_tenv.fromChangelogStream(j_data_stream, schema._j_schema), t_env=self) + else: + return Table( + self._j_tenv.fromChangelogStream( + j_data_stream, + schema._j_schema, + changelog_mode._j_changelog_mode), + t_env=self) + def to_data_stream(self, table: Table) -> DataStream: """ Converts the given Table into a DataStream. @@ -1989,6 +2043,100 @@ class StreamTableEnvironment(TableEnvironment): """ return DataStream(self._j_tenv.toDataStream(table._j_table)) + def to_changelog_stream(self, + table: Table, + target_schema: Schema = None, + changelog_mode: ChangelogMode = None) -> DataStream: + """ + Converts the given Table into a DataStream of changelog entries. + + Compared to :func:`to_data_stream`, this method produces instances of Row and sets the + RowKind flag that is contained in every record during runtime. 
The runtime behavior is + similar to that of a DynamicTableSink. + + If you don't specify the changelog_mode, the changelog containing all kinds of changes + (enumerated in RowKind) is used as the default ChangelogMode. + + The given Schema is used to configure the table runtime to convert columns and internal data + structures to the desired representation. The following example shows how to + convert a table column into a Row type. + + Example: + :: + + >>> table_env.to_changelog_stream( + ... table, + ... Schema.new_builder() \ + ... .column("id", DataTypes.BIGINT()) + ... .column("payload", DataTypes.ROW( + ... [DataTypes.FIELD("name", DataTypes.STRING()), + ... DataTypes.FIELD("age", DataTypes.INT())])) + ... .build()) + + Note that the type system of the table ecosystem is richer than the one of the DataStream + API. The table runtime will make sure to properly serialize the output records to the first + operator of the DataStream API. Afterwards, the Types semantics of the DataStream API need + to be considered. + + If the input table contains a single rowtime column, it will be propagated into a stream + record's timestamp. Watermarks will be propagated as well. + + If the rowtime should not be a concrete field in the final Row anymore, or the schema should + be symmetrical for both :func:`from_changelog_stream` and :func:`to_changelog_stream`, the + rowtime can also be declared as a metadata column that will be propagated into a stream + record's timestamp. It is possible to declare a schema without physical/regular columns. + In this case, those columns will be automatically derived and implicitly put at the + beginning of the schema declaration. 
+ + The following examples illustrate common schema declarations and their semantics: + + Example: + :: + + given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3)) + + === EXAMPLE 1 === + + no physical columns defined, they will be derived automatically, + the last derived physical column will be skipped in favor of the metadata column + + >>> Schema.new_builder() \ + ... .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \ + ... .build() + + equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA) + + === EXAMPLE 2 === + + physical columns defined, all columns must be defined + + >>> Schema.new_builder() \ + ... .column("id", "INT") \ + ... .column("name", "STRING") \ + ... .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \ + ... .build() + + equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA) + + :param table: The Table to convert. It can be updating or insert-only. + :param target_schema: The Schema that decides about the final external representation in + DataStream records. + :param changelog_mode: The required kinds of changes in the result changelog. An exception + will be thrown if the given updating table cannot be represented in this changelog mode. + :return: The converted changelog stream of Row. + """ + if target_schema is None: + return DataStream(self._j_tenv.toChangelogStream(table._j_table)) + elif changelog_mode is None: + return DataStream( + self._j_tenv.toChangelogStream(table._j_table, target_schema._j_schema)) + else: + return DataStream( + self._j_tenv.toChangelogStream( + table._j_table, + target_schema._j_schema, + changelog_mode._j_changelog_mode)) + def to_append_stream(self, table: Table, type_info: TypeInformation) -> DataStream: """ Converts the given Table into a DataStream of a specified type. 
The Table must only have diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index eaec37d..075df72 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -34,7 +34,7 @@ from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, Envir Module, ResultKind, ModuleEntry from pyflink.table.catalog import ObjectPath, CatalogBaseTable from pyflink.table.explain_detail import ExplainDetail -from pyflink.table.expressions import col +from pyflink.table.expressions import col, source_watermark from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.types import RowType, Row from pyflink.table.udf import udf @@ -381,6 +381,56 @@ class DataStreamConversionTestCases(PyFlinkTestCase): actual_results.sort() self.assertEqual(expected_results, actual_results) + def test_from_and_to_changelog_stream_event_time(self): + from pyflink.table import Schema + + self.env.set_parallelism(1) + ds = self.env.from_collection([(1, 42, "a"), (2, 5, "a"), (3, 1000, "c"), (100, 1000, "c")], + Types.ROW([Types.LONG(), Types.INT(), Types.STRING()])) + ds = ds.assign_timestamps_and_watermarks( + WatermarkStrategy.for_monotonous_timestamps() + .with_timestamp_assigner(MyTimestampAssigner())) + + changelog_stream = ds.map(lambda t: Row(t.f1, t.f2), + Types.ROW([Types.INT(), Types.STRING()])) + + # derive physical columns and add a rowtime + table = self.t_env.from_changelog_stream( + changelog_stream, + Schema.new_builder() + .column_by_metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + .column_by_expression("computed", str(col("f1").upper_case)) + .watermark("rowtime", str(source_watermark())) + .build()) + + self.t_env.create_temporary_view("t", table) + + # access and reorder columns + reordered = self.t_env.sql_query("SELECT computed, rowtime, f0 FROM t") + + # write out the rowtime 
column with fully declared schema + result = self.t_env.to_changelog_stream( + reordered, + Schema.new_builder() + .column("f1", DataTypes.STRING()) + .column_by_metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + .column_by_expression("ignored", str(col("f1").upper_case)) + .column("f0", DataTypes.INT()) + .build() + ) + + # test event time window and field access + result.key_by(lambda k: k.f1) \ + .window(MyTumblingEventTimeWindow()) \ + .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \ + .add_sink(self.test_sink) + self.env.execute() + expected_results = ['(A,47)', '(C,1000)', '(C,1000)'] + actual_results = self.test_sink.get_results(False) + expected_results.sort() + actual_results.sort() + self.assertEqual(expected_results, actual_results) + def test_to_append_stream(self): self.env.set_parallelism(1) t_env = StreamTableEnvironment.create(
