dianfu commented on a change in pull request #16611:
URL: https://github.com/apache/flink/pull/16611#discussion_r679673463



##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1207,23 +1207,130 @@ def register_function(self, name: str, function: UserDefinedFunctionWrapper):
         else:
             self._j_tenv.registerFunction(name, java_function)
 
-    def create_temporary_view(self, view_path: str, table: Table):
+    def create_temporary_view(self,
+                              view_path: str,
+                              table_or_data_stream: Union[Table, DataStream],
+                              *fields_or_schema: Union[str, Expression, Schema]):
         """
-        Registers a :class:`~pyflink.table.Table` API object as a temporary view similar to SQL
-        temporary views.
+        1. When table_or_data_stream is a :class:`~pyflink.table.Table`:
 
-        Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-        it will be inaccessible in the current session. To make the permanent object available
-        again you can drop the corresponding temporary object.
+            Registers a :class:`~pyflink.table.Table` API object as a temporary view similar to SQL
+            temporary views.
+
+            Temporary objects can shadow permanent ones. If a permanent object in a given path
+            exists, it will be inaccessible in the current session. To make the permanent object
+            available again you can drop the corresponding temporary object.
+
+        2. When table_or_data_stream is a :class:`~pyflink.datastream.DataStream`:
+
+            2.1 When fields_or_schema is a str or a sequence of :class:`~pyflink.table.Expression`:
+
+                Creates a view from the given {@link DataStream} in a given path with specified
+                field names. Registered views can be referenced in SQL queries.
+
+                1. Reference input fields by name: All fields in the schema definition are
+                referenced by name (and possibly renamed using an alias (as). Moreover, we can
+                define proctime and rowtime attributes at arbitrary positions using arbitrary names
+                (except those that exist in the result schema). In this mode, fields can be
+                reordered and projected out. This mode can be used for any input type, including
+                POJOs.
+
+                Example:
+                ::
+
+                    >>> stream = ...
+                    # reorder the fields, rename the original 'f0' field to 'name' and add
+                    # event-time attribute named 'rowtime'
+
+                    # use str
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable",
+                    ...     stream,
+                    ...     "f1, rowtime.rowtime, f0 as 'name'")
+
+                    # or use a sequence of expression
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable",
+                    ...     stream,
+                    ...     col("f1"),
+                    ...     col("rowtime").rowtime,
+                    ...     col("f0").alias('name'))
+
+                2. Reference input fields by position: In this mode, fields are simply renamed.
+                Event-time attributes can replace the field on their position in the input data
+                (if it is of correct type) or be appended at the end. Proctime attributes must be
+                appended at the end. This mode can only be used if the input type has a defined
+                field order (tuple, case class, Row) and none of the {@code fields} references a
+                field of the input type.
+
+                Example:
+                ::
+
+                    >>> stream = ...
+                    # rename the original fields to 'a' and 'b' and extract the internally attached
+                    # timestamp into an event-time attribute named 'rowtime'
+
+                    # use str
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable", stream, "a, b, rowtime.rowtime")
+
+                    # or use a sequence of expressions
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable",
+                    ...     stream,
+                    ...     col("a"),
+                    ...     col("b"),
+                    ...     col("rowtime").rowtime)
+
+                Temporary objects can shadow permanent ones. If a permanent object in a given path
+                exists, it will be inaccessible in the current session. To make the permanent object
+                available again you can drop the corresponding temporary object.
+
+            2.2 When fields_or_schema is a :class:`~pyflink.table.Schema`:
+
+                Creates a view from the given {@link DataStream} in a given path. Registered views
+                can be referenced in SQL queries.
+
+                See :func:`from_data_stream` for more information on how a
+                :class:`~pyflink.datastream.DataStream` is translated into a table.
+
+                Temporary objects can shadow permanent ones. If a permanent object in a given path
+                exists, it will be inaccessible in the current session. To make the permanent object
+                available again you can drop the corresponding temporary object.
+
+                .. note:: create_temporary_view by providing a Schema (case 2.) was added from flink
+                    1.14.0.
 
         :param view_path: The path under which the view will be registered. See also the
                           :class:`~pyflink.table.TableEnvironment` class description for the format
                           of the path.
-        :param table: The view to register.
+        :param table_or_data_stream: The Table or DataStream out of which to create the view.
+        :param fields_or_schema: The fields expressions(str) to map original fields of the
+                        DataStream to the fields of the View or the customized schema for the final
+                        table.
 
         .. versionadded:: 1.10.0
         """
-        self._j_tenv.createTemporaryView(view_path, table._j_table)
+        if isinstance(table_or_data_stream, Table):
+            self._j_tenv.createTemporaryView(view_path, table_or_data_stream._j_table)
+        elif len(fields_or_schema) == 0:
+            self._j_tenv.createTemporaryView(view_path, table_or_data_stream._j_data_stream)
+        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], str):
+            self._j_tenv.createTemporaryView(
+                view_path,
+                table_or_data_stream._j_data_stream,
+                fields_or_schema[0])
+        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], Schema):
+            self._j_tenv.createTemporaryView(
+                view_path,
+                table_or_data_stream._j_data_stream,
+                fields_or_schema[0]._j_schema)
+        elif (len(fields_or_schema) > 0 and
+              all(isinstance(elem, Expression) for elem in fields_or_schema)):
+            self._j_tenv.createTemporaryView(
+                view_path,
+                table_or_data_stream._j_data_stream,
+                to_expression_jarray(fields_or_schema))

Review comment:
       Should we raise an exception if none of the above conditions holds?
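
A minimal sketch of what such a fallback could look like, assuming the branch order from the hunk above. `Table`, `DataStream`, `Schema` and `Expression` are empty stand-ins here (not the real pyflink classes), `describe_overload` is a hypothetical helper that only reports which branch would be taken, and `ValueError` is just one possible choice of exception type:

```python
# Stand-in classes so the sketch runs without pyflink (hypothetical, not the real API).
class Table: ...
class DataStream: ...
class Schema: ...
class Expression: ...


def describe_overload(table_or_data_stream, *fields_or_schema):
    """Mirror the if/elif chain in the hunk, plus a final fallback that raises."""
    if isinstance(table_or_data_stream, Table):
        return "table"
    elif len(fields_or_schema) == 0:
        return "data_stream"
    elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], str):
        return "data_stream+str"
    elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], Schema):
        return "data_stream+schema"
    elif (len(fields_or_schema) > 0 and
          all(isinstance(elem, Expression) for elem in fields_or_schema)):
        return "data_stream+expressions"
    else:
        # Without this branch, an unsupported argument combination would
        # silently do nothing instead of reporting the mistake.
        raise ValueError(
            "Unsupported arguments for create_temporary_view: %r"
            % (fields_or_schema,))
```

With this in place, a call such as `describe_overload(DataStream(), "a", "b")` (two strings instead of one) fails loudly rather than returning without registering anything.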

##########
File path: docs/content.zh/docs/dev/table/data_stream_api.md
##########
@@ -818,6 +914,132 @@ table.printSchema()
 // note: the watermark strategy is not shown due to the inserted column reordering projection
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common.types import Row, Instant
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, Schema
+
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+ds = env.from_collection([
+    Row("Alice", 12, Instant.of_epoch_milli(1000)),
+    Row("Bob", 5, Instant.of_epoch_milli(1001)),
+    Row("Alice", 10, Instant.of_epoch_milli(1002))],
+    type_info=Types.ROW_NAMED(['name', 'score', 'event_time'], [Types.STRING(), Types.INT(), Types.INSTANT()]))
+
+# === EXAMPLE 1 ===
+
+# derive all physical columns automatically
+
+table = t_env.from_data_stream(ds)
+table.print_schema()
+
+# prints:
+# (
+#  `name` STRING,
+#  `score` INT,
+#  `event_time` TIMESTAMP_LTZ(9)
+# )
+
+
+// === EXAMPLE 2 ===

Review comment:
       ```suggestion
   # === EXAMPLE 2 ===
   ```

##########
File path: docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md
##########
@@ -0,0 +1,27 @@
+---
+title: "Table 和 DataStream 的互转"
+weight: 42
+type: docs
+aliases:

Review comment:
       `aliases` should be removed, as it doesn't apply here.

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -182,6 +185,28 @@ def BIG_INT_TYPE_INFO():
     def BIG_DEC_TYPE_INFO():
         return BasicTypeInfo(BasicType.BIG_DEC)
 
+    @staticmethod
+    def INSTANT_TYPE_INFO():
+        return InstantTypeInfo(BasicType.INSTANT)
+
+
+class InstantTypeInfo(BasicTypeInfo):

Review comment:
       Please add documentation about the Instant type to
       https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/data_types/#supported-data-types

##########
File path: flink-python/pyflink/common/types.py
##########
@@ -20,7 +20,7 @@
 
 from typing import List
 
-__all__ = ['Row', 'RowKind']
+__all__ = ['Row', 'RowKind', 'Instant']

Review comment:
       What about moving Instant to pyflink/common/time.py?

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
##########
@@ -216,18 +216,19 @@ object PythonTableUtils {
       (obj: Any) => nullSafeConvert(obj) {
         case c if c.getClass.isArray =>
           val r = c.asInstanceOf[Array[_]]
-          if (r.length != rowType.getFieldTypes.length) {
+          if (r.length - 1 != rowType.getFieldTypes.length) {

Review comment:
       What's the purpose of this change? Is this fixing a bug?

##########
File path: flink-python/pyflink/table/connector.py
##########
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['ChangelogMode']
+
+

Review comment:
       What about changing the module name to "changelog_mode" or "changelog"?

##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -262,9 +263,10 @@ cdef class IterableCoderImpl(LengthPrefixBaseCoderImpl):
         self._separated_with_end_message = separated_with_end_message
 
     cpdef encode_to_stream(self, value, LengthPrefixOutputStream output_stream):
-        for item in value:
-            self._field_coder.encode_to_stream(item, self._data_out_stream)
-            self._write_data_to_output_stream(output_stream)
+        if value:

Review comment:
       What's the purpose of this change?
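
For reference, a plain-Python sketch of the behavioural difference such a guard makes. It assumes the original loop simply moves under the new `if value:` (the rest of the hunk is not shown here); `encode_unguarded`, `encode_guarded` and the list `sink` are hypothetical stand-ins for the coder and its output stream:

```python
def encode_unguarded(value, sink):
    # Old shape: iterate directly; a None value raises TypeError.
    for item in value:
        sink.append(item)
    return sink


def encode_guarded(value, sink):
    # New shape: skip the loop entirely when value is None or empty.
    if value:
        for item in value:
            sink.append(item)
    return sink
```

So the guard only changes the outcome for `None` (and other falsy inputs); non-empty iterables are encoded exactly as before.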

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
##########
@@ -147,8 +147,8 @@ object PythonTableUtils {
 
     case _ if dataType == org.apache.flink.api.common.typeinfo.Types.INSTANT =>
       (obj: Any) => nullSafeConvert(obj) {
-        case c: Long => Instant.ofEpochMilli(c / 1000)
-        case c: Int => Instant.ofEpochMilli(c.toLong / 1000)

Review comment:
       What's the purpose of this change?

##########
File path: docs/content.zh/docs/dev/table/data_stream_api.md
##########
@@ -818,6 +914,132 @@ table.printSchema()
 // note: the watermark strategy is not shown due to the inserted column reordering projection
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common.types import Row, Instant
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, Schema
+
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+ds = env.from_collection([
+    Row("Alice", 12, Instant.of_epoch_milli(1000)),
+    Row("Bob", 5, Instant.of_epoch_milli(1001)),
+    Row("Alice", 10, Instant.of_epoch_milli(1002))],
+    type_info=Types.ROW_NAMED(['name', 'score', 'event_time'], [Types.STRING(), Types.INT(), Types.INSTANT()]))
+
+# === EXAMPLE 1 ===
+
+# derive all physical columns automatically
+
+table = t_env.from_data_stream(ds)
+table.print_schema()
+
+# prints:
+# (
+#  `name` STRING,
+#  `score` INT,
+#  `event_time` TIMESTAMP_LTZ(9)
+# )
+
+
+// === EXAMPLE 2 ===
+
+// derive all physical columns automatically

Review comment:
       ditto

##########
File path: docs/content.zh/docs/dev/python/table/conversion_of_data_stream.md
##########
@@ -0,0 +1,27 @@
+---
+title: "Table 和 DataStream 的互转"

Review comment:
       ```suggestion
   title: "Table 和 DataStream 互转"
   ```
