This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b4a5bf124db58ebe1e3b4787b7dfea91290ca29
Author: huangxingbo <[email protected]>
AuthorDate: Fri Jul 23 18:09:27 2021 +0800

    [FLINK-22911][python] Add to_changelog_stream/from_changelog_stream to 
StreamTableEnvironment in Python Table API
    
    This closes #16611.
---
 flink-python/pyflink/table/__init__.py             |   4 +-
 flink-python/pyflink/table/changelog_mode.py       |  47 +++++++
 flink-python/pyflink/table/expressions.py          |   2 +-
 flink-python/pyflink/table/table_environment.py    | 150 ++++++++++++++++++++-
 .../table/tests/test_table_environment_api.py      |  52 ++++++-
 5 files changed, 251 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/table/__init__.py 
b/flink-python/pyflink/table/__init__.py
index bbba3cf..325cef1 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -68,6 +68,7 @@ Important classes of Flink Table API:
 """
 from __future__ import absolute_import
 
+from pyflink.table.changelog_mode import ChangelogMode
 from pyflink.table.data_view import DataView, ListView, MapView
 from pyflink.table.environment_settings import EnvironmentSettings
 from pyflink.table.explain_detail import ExplainDetail
@@ -125,5 +126,6 @@ __all__ = [
     'TableAggregateFunction',
     'UserDefinedType',
     'WindowGroupedTable',
-    'WriteMode'
+    'WriteMode',
+    'ChangelogMode'
 ]
diff --git a/flink-python/pyflink/table/changelog_mode.py 
b/flink-python/pyflink/table/changelog_mode.py
new file mode 100644
index 0000000..bd63e1d
--- /dev/null
+++ b/flink-python/pyflink/table/changelog_mode.py
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['ChangelogMode']
+
+
+class ChangelogMode(object):
+    """
+    The set of changes contained in a changelog.
+    """
+
+    def __init__(self, j_changelog_mode):
+        self._j_changelog_mode = j_changelog_mode
+
+    @staticmethod
+    def insert_only():
+        gateway = get_gateway()
+        return ChangelogMode(
+            gateway.jvm.org.apache.flink.table.connector.ChangelogMode.insertOnly())
+
+    @staticmethod
+    def upsert():
+        gateway = get_gateway()
+        return ChangelogMode(
+            gateway.jvm.org.apache.flink.table.connector.ChangelogMode.upsert())
+
+    @staticmethod
+    def all():
+        gateway = get_gateway()
+        return ChangelogMode(
+            gateway.jvm.org.apache.flink.table.connector.ChangelogMode.all())
diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index c169f31..69d9d7d 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -30,7 +30,7 @@ __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 
'or_', 'UNBOUNDED_ROW
            'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 
'row', 'map_',
            'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 
'negative', 'concat',
            'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 
'without_columns', 'call',
-           'call_sql']
+           'call_sql', 'source_watermark']
 
 
 def _leaf_op(op_name: str) -> Expression:
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index fcb9116..88afa5c 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -33,7 +33,7 @@ from pyflink.common import JobExecutionResult
 from pyflink.java_gateway import get_gateway
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table import Table, EnvironmentSettings, Expression, 
ExplainDetail, \
-    Module, ModuleEntry, TableSink, Schema
+    Module, ModuleEntry, TableSink, Schema, ChangelogMode
 from pyflink.table.catalog import Catalog
 from pyflink.table.serializers import ArrowSerializer
 from pyflink.table.statement_set import StatementSet
@@ -1967,6 +1967,60 @@ class StreamTableEnvironment(TableEnvironment):
                 j_data_stream, fields_or_schema[0]._j_schema), t_env=self)
         raise ValueError("Invalid arguments for 'fields': %r" % 
fields_or_schema)
 
+    def from_changelog_stream(self,
+                              data_stream: DataStream,
+                              schema: Schema = None,
+                              changelog_mode: ChangelogMode = None) -> Table:
+        """
+        Converts the given DataStream of changelog entries into a Table.
+
+        Compared to :func:`from_data_stream`, this method consumes instances 
of Row and evaluates
+        the RowKind flag that is contained in every record during runtime. The 
runtime behavior is
+        similar to that of a DynamicTableSource.
+
+        If you don't specify the changelog_mode, the changelog containing all kinds
+        of changes (enumerated in RowKind) is used as the default ChangelogMode.
+
+        Column names and types of the Table are automatically derived from the 
TypeInformation of
+        the DataStream. If the outermost record's TypeInformation is a 
CompositeType, it will be
+        flattened in the first level. Composite nested fields will not be 
accessible.
+
+        By default, the stream record's timestamp and watermarks are not 
propagated unless
+        explicitly declared.
+
+        This method allows declaring a Schema for the resulting table. The declaration
+        is similar to a ``CREATE TABLE`` DDL in SQL and allows you to:
+
+            1. enrich or overwrite automatically derived columns with a custom 
DataType
+            2. reorder columns
+            3. add computed or metadata columns next to the physical columns
+            4. access a stream record's timestamp
+            5. declare a watermark strategy or propagate the DataStream 
watermarks
+            6. declare a primary key
+
+        See :func:`from_data_stream` for more information and examples of how 
to declare a Schema.
+
+        :param data_stream: The changelog stream of Row.
+        :param schema: The customized schema for the final table.
+        :param changelog_mode: The expected kinds of changes in the incoming 
changelog.
+        :return: The converted Table.
+        """
+        j_data_stream = data_stream._j_data_stream
+        JPythonConfigUtil = get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
+        JPythonConfigUtil.configPythonOperator(j_data_stream.getExecutionEnvironment())
+        if schema is None:
+            return Table(self._j_tenv.fromChangelogStream(j_data_stream), t_env=self)
+        elif changelog_mode is None:
+            return Table(
+                self._j_tenv.fromChangelogStream(j_data_stream, schema._j_schema), t_env=self)
+        else:
+            return Table(
+                self._j_tenv.fromChangelogStream(
+                    j_data_stream,
+                    schema._j_schema,
+                    changelog_mode._j_changelog_mode),
+                t_env=self)
+
     def to_data_stream(self, table: Table) -> DataStream:
         """
         Converts the given Table into a DataStream.
@@ -1989,6 +2043,100 @@ class StreamTableEnvironment(TableEnvironment):
         """
         return DataStream(self._j_tenv.toDataStream(table._j_table))
 
+    def to_changelog_stream(self,
+                            table: Table,
+                            target_schema: Schema = None,
+                            changelog_mode: ChangelogMode = None) -> DataStream:
+        """
+        Converts the given Table into a DataStream of changelog entries.
+
+        Compared to :func:`to_data_stream`, this method produces instances of 
Row and sets the
+        RowKind flag that is contained in every record during runtime. The 
runtime behavior is
+        similar to that of a DynamicTableSink.
+
+        If you don't specify the changelog_mode, the changelog containing all kinds
+        of changes (enumerated in RowKind) is used as the default ChangelogMode.
+
+        The given Schema is used to configure the table runtime to convert 
columns and internal data
+        structures to the desired representation. The following example shows 
how to
+        convert a table column into a Row type.
+
+        Example:
+        ::
+
+            >>> table_env.to_changelog_stream(
+            ...     table,
+            ...     Schema.new_builder() \
+            ...         .column("id", DataTypes.BIGINT())
+            ...         .column("payload", DataTypes.ROW(
+            ...                                     [DataTypes.FIELD("name", 
DataTypes.STRING()),
+            ...                                     DataTypes.FIELD("age", 
DataTypes.INT())]))
+            ...         .build())
+
+        Note that the type system of the table ecosystem is richer than the 
one of the DataStream
+        API. The table runtime will make sure to properly serialize the output 
records to the first
+        operator of the DataStream API. Afterwards, the Types semantics of the 
DataStream API need
+        to be considered.
+
+        If the input table contains a single rowtime column, it will be 
propagated into a stream
+        record's timestamp. Watermarks will be propagated as well.
+
+        If the rowtime should not be a concrete field in the final Row 
anymore, or the schema should
+        be symmetrical for both :func:`from_changelog_stream` and 
:func:`to_changelog_stream`, the
+        rowtime can also be declared as a metadata column that will be 
propagated into a stream
+        record's timestamp. It is possible to declare a schema without 
physical/regular columns.
+        In this case, those columns will be automatically derived and 
implicitly put at the
+        beginning of the schema declaration.
+
+        The following examples illustrate common schema declarations and their 
semantics:
+
+        Example:
+        ::
+
+            given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3))
+
+            === EXAMPLE 1 ===
+
+            no physical columns defined, they will be derived automatically,
+            the last derived physical column will be skipped in favor of the 
metadata column
+
+            >>> Schema.new_builder() \
+            ...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
+            ...     .build()
+
+            equal to: CREATE TABLE (id INT, name STRING, rowtime 
TIMESTAMP_LTZ(3) METADATA)
+
+            === EXAMPLE 2 ===
+
+            physical columns defined, all columns must be defined
+
+            >>> Schema.new_builder() \
+            ...     .column("id", "INT") \
+            ...     .column("name", "STRING") \
+            ...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
+            ...     .build()
+
+            equal to: CREATE TABLE (id INT, name STRING, rowtime 
TIMESTAMP_LTZ(3) METADATA)
+
+        :param table: The Table to convert. It can be updating or insert-only.
+        :param target_schema: The Schema that decides about the final external 
representation in
+            DataStream records.
+        :param changelog_mode: The required kinds of changes in the result 
changelog. An exception
+            will be thrown if the given updating table cannot be represented 
in this changelog mode.
+        :return: The converted changelog stream of Row.
+        """
+        if target_schema is None:
+            return DataStream(self._j_tenv.toChangelogStream(table._j_table))
+        elif changelog_mode is None:
+            return DataStream(
+                self._j_tenv.toChangelogStream(table._j_table, target_schema._j_schema))
+        else:
+            return DataStream(
+                self._j_tenv.toChangelogStream(
+                    table._j_table,
+                    target_schema._j_schema,
+                    changelog_mode._j_changelog_mode))
+
     def to_append_stream(self, table: Table, type_info: TypeInformation) -> 
DataStream:
         """
         Converts the given Table into a DataStream of a specified type. The 
Table must only have
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py 
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index eaec37d..075df72 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -34,7 +34,7 @@ from pyflink.table import DataTypes, CsvTableSink, 
StreamTableEnvironment, Envir
     Module, ResultKind, ModuleEntry
 from pyflink.table.catalog import ObjectPath, CatalogBaseTable
 from pyflink.table.explain_detail import ExplainDetail
-from pyflink.table.expressions import col
+from pyflink.table.expressions import col, source_watermark
 from pyflink.table.table_descriptor import TableDescriptor
 from pyflink.table.types import RowType, Row
 from pyflink.table.udf import udf
@@ -381,6 +381,56 @@ class DataStreamConversionTestCases(PyFlinkTestCase):
         actual_results.sort()
         self.assertEqual(expected_results, actual_results)
 
+    def test_from_and_to_changelog_stream_event_time(self):
+        from pyflink.table import Schema
+
+        self.env.set_parallelism(1)
+        ds = self.env.from_collection([(1, 42, "a"), (2, 5, "a"), (3, 1000, 
"c"), (100, 1000, "c")],
+                                      Types.ROW([Types.LONG(), Types.INT(), 
Types.STRING()]))
+        ds = ds.assign_timestamps_and_watermarks(
+            WatermarkStrategy.for_monotonous_timestamps()
+            .with_timestamp_assigner(MyTimestampAssigner()))
+
+        changelog_stream = ds.map(lambda t: Row(t.f1, t.f2),
+                                  Types.ROW([Types.INT(), Types.STRING()]))
+
+        # derive physical columns and add a rowtime
+        table = self.t_env.from_changelog_stream(
+            changelog_stream,
+            Schema.new_builder()
+                  .column_by_metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
+                  .column_by_expression("computed", str(col("f1").upper_case))
+                  .watermark("rowtime", str(source_watermark()))
+                  .build())
+
+        self.t_env.create_temporary_view("t", table)
+
+        # access and reorder columns
+        reordered = self.t_env.sql_query("SELECT computed, rowtime, f0 FROM t")
+
+        # write out the rowtime column with fully declared schema
+        result = self.t_env.to_changelog_stream(
+            reordered,
+            Schema.new_builder()
+            .column("f1", DataTypes.STRING())
+            .column_by_metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
+            .column_by_expression("ignored", str(col("f1").upper_case))
+            .column("f0", DataTypes.INT())
+            .build()
+        )
+
+        # test event time window and field access
+        result.key_by(lambda k: k.f1) \
+            .window(MyTumblingEventTimeWindow()) \
+            .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), 
Types.INT()])) \
+            .add_sink(self.test_sink)
+        self.env.execute()
+        expected_results = ['(A,47)', '(C,1000)', '(C,1000)']
+        actual_results = self.test_sink.get_results(False)
+        expected_results.sort()
+        actual_results.sort()
+        self.assertEqual(expected_results, actual_results)
+
     def test_to_append_stream(self):
         self.env.set_parallelism(1)
         t_env = StreamTableEnvironment.create(

Reply via email to