dianfu commented on a change in pull request #16611:
URL: https://github.com/apache/flink/pull/16611#discussion_r679673463
##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1207,23 +1207,130 @@ def register_function(self, name: str, function: UserDefinedFunctionWrapper):
else:
self._j_tenv.registerFunction(name, java_function)
- def create_temporary_view(self, view_path: str, table: Table):
+ def create_temporary_view(self,
+ view_path: str,
+ table_or_data_stream: Union[Table, DataStream],
+ *fields_or_schema: Union[str, Expression, Schema]):
"""
- Registers a :class:`~pyflink.table.Table` API object as a temporary view similar to SQL
- temporary views.
+ 1. When table_or_data_stream is a :class:`~pyflink.table.Table`:
- Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
- it will be inaccessible in the current session. To make the permanent object available
- again you can drop the corresponding temporary object.
+ Registers a :class:`~pyflink.table.Table` API object as a temporary view similar
+ to SQL temporary views.
+
+ Temporary objects can shadow permanent ones. If a permanent object in a given path
+ exists, it will be inaccessible in the current session. To make the permanent object
+ available again you can drop the corresponding temporary object.
+
+ 2. When table_or_data_stream is a :class:`~pyflink.datastream.DataStream`:
+
+ 2.1 When fields_or_schema is a str or a sequence of :class:`~pyflink.table.Expression`:
+
+ Creates a view from the given :class:`~pyflink.datastream.DataStream` in a given
+ path with specified field names. Registered views can be referenced in SQL queries.
+
+ 1. Reference input fields by name: All fields in the schema definition are
+ referenced by name (and possibly renamed using an alias (as)). Moreover, we can
+ define proctime and rowtime attributes at arbitrary positions using arbitrary
+ names (except those that exist in the result schema). In this mode, fields can
+ be reordered and projected out. This mode can be used for any input type,
+ including POJOs.
+
+ Example:
+ ::
+
+ >>> stream = ...
+ # reorder the fields, rename the original 'f0' field to 'name' and add
+ # event-time attribute named 'rowtime'
+
+ # use str
+ >>> table_env.create_temporary_view(
+ ... "cat.db.myTable",
+ ... stream,
+ ... "f1, rowtime.rowtime, f0 as 'name'")
+
+ # or use a sequence of expressions
+ >>> table_env.create_temporary_view(
+ ... "cat.db.myTable",
+ ... stream,
+ ... col("f1"),
+ ... col("rowtime").rowtime,
+ ... col("f0").alias('name'))
+
+ 2. Reference input fields by position: In this mode, fields are simply renamed.
+ Event-time attributes can replace the field on their position in the input data
+ (if it is of correct type) or be appended at the end. Proctime attributes must
+ be appended at the end. This mode can only be used if the input type has a
+ defined field order (tuple, case class, Row) and none of the ``fields``
+ references a field of the input type.
+
+ Example:
+ ::
+
+ >>> stream = ...
+ # rename the original fields to 'a' and 'b' and extract the internally attached
+ # timestamp into an event-time attribute named 'rowtime'
+
+ # use str
+ >>> table_env.create_temporary_view(
+ ... "cat.db.myTable", stream, "a, b, rowtime.rowtime")
+
+ # or use a sequence of expressions
+ >>> table_env.create_temporary_view(
+ ... "cat.db.myTable",
+ ... stream,
+ ... col("a"),
+ ... col("b"),
+ ... col("rowtime").rowtime)
+
+ Temporary objects can shadow permanent ones. If a permanent object in a given path
+ exists, it will be inaccessible in the current session. To make the permanent object
+ available again you can drop the corresponding temporary object.
+
+ 2.2 When fields_or_schema is a :class:`~pyflink.table.Schema`:
+
+ Creates a view from the given :class:`~pyflink.datastream.DataStream` in a given
+ path. Registered views can be referenced in SQL queries.
+
+ See :func:`from_data_stream` for more information on how a
+ :class:`~pyflink.datastream.DataStream` is translated into a table.
+
+ Temporary objects can shadow permanent ones. If a permanent object in a given path
+ exists, it will be inaccessible in the current session. To make the permanent object
+ available again you can drop the corresponding temporary object.
+
+ .. note:: create_temporary_view by providing a Schema (case 2.2) was added in
+ Flink 1.14.0.
:param view_path: The path under which the view will be registered. See also the
:class:`~pyflink.table.TableEnvironment` class description for the format of
the path.
- :param table: The view to register.
+ :param table_or_data_stream: The Table or DataStream out of which to create the view.
+ :param fields_or_schema: The field expressions (str) to map the original fields of
+ the DataStream to the fields of the view, or the customized schema for the
+ final table.
.. versionadded:: 1.10.0
"""
- self._j_tenv.createTemporaryView(view_path, table._j_table)
+ if isinstance(table_or_data_stream, Table):
+ self._j_tenv.createTemporaryView(view_path, table_or_data_stream._j_table)
+ elif len(fields_or_schema) == 0:
+ self._j_tenv.createTemporaryView(view_path, table_or_data_stream._j_data_stream)
+ elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], str):
+ self._j_tenv.createTemporaryView(
+ view_path,
+ table_or_data_stream._j_data_stream,
+ fields_or_schema[0])
+ elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], Schema):
+ self._j_tenv.createTemporaryView(
+ view_path,
+ table_or_data_stream._j_data_stream,
+ fields_or_schema[0]._j_schema)
+ elif (len(fields_or_schema) > 0 and
+ all(isinstance(elem, Expression) for elem in fields_or_schema)):
+ self._j_tenv.createTemporaryView(
+ view_path,
+ table_or_data_stream._j_data_stream,
+ to_expression_jarray(fields_or_schema))
Review comment:
Raise an exception if none of the above conditions holds?
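For example, a final `else` branch along these lines (a minimal sketch; the exact exception type and message wording are illustrative):
```python
else:
    raise TypeError(
        "Invalid arguments for 'fields_or_schema': expected a single str, "
        "a single Schema, or a sequence of Expression, got %r" % (fields_or_schema,))
```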
##########
File path: docs/content.zh/docs/dev/table/data_stream_api.md
##########
@@ -818,6 +914,132 @@ table.printSchema()
// note: the watermark strategy is not shown due to the inserted column reordering projection
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common.types import Row, Instant
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, Schema
+
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+ds = env.from_collection([
+ Row("Alice", 12, Instant.of_epoch_milli(1000)),
+ Row("Bob", 5, Instant.of_epoch_milli(1001)),
+ Row("Alice", 10, Instant.of_epoch_milli(1002))],
+ type_info=Types.ROW_NAMED(['name', 'score', 'event_time'], [Types.STRING(), Types.INT(), Types.INSTANT()]))
+
+# === EXAMPLE 1 ===
+
+# derive all physical columns automatically
+
+table = t_env.from_data_stream(ds)
+table.print_schema()
+
+# prints:
+# (
+# `name` STRING,
+# `score` INT,
+# `event_time` TIMESTAMP_LTZ(9)
+# )
+
+
+// === EXAMPLE 2 ===
Review comment:
```suggestion
# === EXAMPLE 2 ===
```
##########
File path: docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md
##########
@@ -0,0 +1,27 @@
+---
+title: "Table 和 DataStream 的互转"
+weight: 42
+type: docs
+aliases:
Review comment:
The `aliases` entry should be removed as it doesn't apply here.
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -182,6 +185,28 @@ def BIG_INT_TYPE_INFO():
def BIG_DEC_TYPE_INFO():
return BasicTypeInfo(BasicType.BIG_DEC)
+ @staticmethod
+ def INSTANT_TYPE_INFO():
+ return InstantTypeInfo(BasicType.INSTANT)
+
+
+class InstantTypeInfo(BasicTypeInfo):
Review comment:
Add documentation about the Instant type in
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/data_types/#supported-data-types
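For instance, the new docs entry could reuse the pattern from the examples in this PR, along these lines (a sketch; the import path follows this PR's current layout):
```python
from pyflink.common.types import Instant
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Instant corresponds to java.time.Instant and is declared via Types.INSTANT()
ds = env.from_collection(
    [Instant.of_epoch_milli(1000), Instant.of_epoch_milli(2000)],
    type_info=Types.INSTANT())
```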
##########
File path: flink-python/pyflink/common/types.py
##########
@@ -20,7 +20,7 @@
from typing import List
-__all__ = ['Row', 'RowKind']
+__all__ = ['Row', 'RowKind', 'Instant']
Review comment:
What about moving Instant to pyflink/common/time.py?
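The imports in the examples would then read (sketch):
```python
from pyflink.common.time import Instant
```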
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
##########
@@ -216,18 +216,19 @@ object PythonTableUtils {
(obj: Any) => nullSafeConvert(obj) {
case c if c.getClass.isArray =>
val r = c.asInstanceOf[Array[_]]
- if (r.length != rowType.getFieldTypes.length) {
+ if (r.length - 1 != rowType.getFieldTypes.length) {
Review comment:
What's the purpose of this change? Is this fixing a bug?
##########
File path: flink-python/pyflink/table/connector.py
##########
@@ -0,0 +1,47 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['ChangelogMode']
+
+
Review comment:
What about changing the module name to "changelog_mode" or "changelog"?
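Either name would keep user-facing imports stable, assuming the class stays re-exported from the package root (a sketch, not verified against this PR):
```python
from pyflink.table import ChangelogMode
```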
##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -262,9 +263,10 @@ cdef class IterableCoderImpl(LengthPrefixBaseCoderImpl):
self._separated_with_end_message = separated_with_end_message
cpdef encode_to_stream(self, value, LengthPrefixOutputStream output_stream):
- for item in value:
- self._field_coder.encode_to_stream(item, self._data_out_stream)
- self._write_data_to_output_stream(output_stream)
+ if value:
Review comment:
What's the purpose of this change?
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
##########
@@ -147,8 +147,8 @@ object PythonTableUtils {
case _ if dataType == org.apache.flink.api.common.typeinfo.Types.INSTANT =>
(obj: Any) => nullSafeConvert(obj) {
- case c: Long => Instant.ofEpochMilli(c / 1000)
- case c: Int => Instant.ofEpochMilli(c.toLong / 1000)
Review comment:
What's the purpose of this change?
##########
File path: docs/content.zh/docs/dev/table/data_stream_api.md
##########
@@ -818,6 +914,132 @@ table.printSchema()
// note: the watermark strategy is not shown due to the inserted column reordering projection
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common.types import Row, Instant
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, Schema
+
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+ds = env.from_collection([
+ Row("Alice", 12, Instant.of_epoch_milli(1000)),
+ Row("Bob", 5, Instant.of_epoch_milli(1001)),
+ Row("Alice", 10, Instant.of_epoch_milli(1002))],
+ type_info=Types.ROW_NAMED(['name', 'score', 'event_time'], [Types.STRING(), Types.INT(), Types.INSTANT()]))
+
+# === EXAMPLE 1 ===
+
+# derive all physical columns automatically
+
+table = t_env.from_data_stream(ds)
+table.print_schema()
+
+# prints:
+# (
+# `name` STRING,
+# `score` INT,
+# `event_time` TIMESTAMP_LTZ(9)
+# )
+
+
+// === EXAMPLE 2 ===
+
+// derive all physical columns automatically
Review comment:
ditto
##########
File path: docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md
##########
@@ -0,0 +1,27 @@
+---
+title: "Table 和 DataStream 的互转"
Review comment:
```suggestion
title: "Table 和 DataStream 互转"
```