This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1ebbadf016fe8fc016347a3f7ab6d2d173390dd
Author: huangxingbo <[email protected]>
AuthorDate: Wed Jul 28 00:21:12 2021 +0800

    [FLINK-22911][python] Optimize from_collection/from_elements to support 
RowKind and Instant
    
    This closes #16611.
---
 flink-python/pyflink/common/typeinfo.py            | 41 +++++++++++++++-------
 flink-python/pyflink/table/types.py                | 35 ++++++++++++------
 .../planner/utils/python/PythonTableUtils.scala    | 13 +++----
 3 files changed, 60 insertions(+), 29 deletions(-)

diff --git a/flink-python/pyflink/common/typeinfo.py 
b/flink-python/pyflink/common/typeinfo.py
index 66f7d1d..95f4ea7 100644
--- a/flink-python/pyflink/common/typeinfo.py
+++ b/flink-python/pyflink/common/typeinfo.py
@@ -429,31 +429,48 @@ class RowTypeInfo(TypeInformation):
     def to_internal_type(self, obj):
         if obj is None:
             return
-
+        from pyflink.common import Row, RowKind
         if self._need_serialize_any_field:
             # Only calling to_internal_type function for fields that need 
conversion
             if isinstance(obj, dict):
-                return tuple(f.to_internal_type(obj.get(n)) if c else 
obj.get(n)
-                             for n, f, c in zip(self.get_field_names(), 
self._field_types,
-                                                self._need_conversion))
+                return (RowKind.INSERT.value,) + tuple(
+                    f.to_internal_type(obj.get(n)) if c else obj.get(n)
+                    for n, f, c in
+                    zip(self.get_field_names(), self._field_types, 
self._need_conversion))
+            elif isinstance(obj, Row) and hasattr(obj, "_fields"):
+                return (obj.get_row_kind().value,) + tuple(
+                    f.to_internal_type(obj.get(n)) if c else obj.get(n)
+                    for n, f, c in
+                    zip(self.get_field_names(), self._field_types, 
self._need_conversion))
+            elif isinstance(obj, Row):
+                return (obj.get_row_kind().value,) + tuple(
+                    f.to_internal_type(v) if c else v
+                    for f, v, c in zip(self._field_types, obj, 
self._need_conversion))
             elif isinstance(obj, (tuple, list)):
-                return tuple(f.to_internal_type(v) if c else v
-                             for f, v, c in zip(self._field_types, obj, 
self._need_conversion))
+                return (RowKind.INSERT.value,) + tuple(
+                    f.to_internal_type(v) if c else v
+                    for f, v, c in zip(self._field_types, obj, 
self._need_conversion))
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(f.to_internal_type(d.get(n)) if c else d.get(n)
-                             for n, f, c in zip(self.get_field_names(), 
self._field_types,
-                                                self._need_conversion))
+                return (RowKind.INSERT.value,) + tuple(
+                    f.to_internal_type(d.get(n)) if c else d.get(n)
+                    for n, f, c in
+                    zip(self.get_field_names(), self._field_types, 
self._need_conversion))
             else:
                 raise ValueError("Unexpected tuple %r with RowTypeInfo" % obj)
         else:
             if isinstance(obj, dict):
-                return tuple(obj.get(n) for n in self.get_field_names())
+                return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in 
self.get_field_names())
+            elif isinstance(obj, Row) and hasattr(obj, "_fields"):
+                return (obj.get_row_kind().value,) + tuple(
+                    obj.get(n) for n in self.get_field_names())
+            elif isinstance(obj, Row):
+                return (obj.get_row_kind().value,) + tuple(obj)
             elif isinstance(obj, (list, tuple)):
-                return tuple(obj)
+                return (RowKind.INSERT.value,) + tuple(obj)
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(d.get(n) for n in self.get_field_names())
+                return (RowKind.INSERT.value,) + tuple(d.get(n) for n in 
self.get_field_names())
             else:
                 raise ValueError("Unexpected tuple %r with RowTypeInfo" % obj)
 
diff --git a/flink-python/pyflink/table/types.py 
b/flink-python/pyflink/table/types.py
index a38c536..1856d21 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -1245,27 +1245,40 @@ class RowType(DataType):
         if self._need_serialize_any_field:
             # Only calling to_sql_type function for fields that need conversion
             if isinstance(obj, dict):
-                return tuple(f.to_sql_type(obj.get(n)) if c else obj.get(n)
-                             for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
+                return (RowKind.INSERT.value,) + tuple(
+                    f.to_sql_type(obj.get(n)) if c else obj.get(n)
+                    for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
+            elif isinstance(obj, Row) and hasattr(obj, "_fields"):
+                return (obj.get_row_kind().value,) + tuple(
+                    f.to_sql_type(obj.get(n)) if c else obj.get(n)
+                    for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
+            elif isinstance(obj, Row):
+                return (obj.get_row_kind().value, ) + tuple(
+                    f.to_sql_type(v) if c else v
+                    for f, v, c in zip(self.fields, obj, 
self._need_conversion))
             elif isinstance(obj, (tuple, list, Row)):
-                return tuple(f.to_sql_type(v) if c else v
-                             for f, v, c in zip(self.fields, obj, 
self._need_conversion))
+                return (RowKind.INSERT.value,) + tuple(
+                    f.to_sql_type(v) if c else v
+                    for f, v, c in zip(self.fields, obj, 
self._need_conversion))
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(f.to_sql_type(d.get(n)) if c else d.get(n)
-                             for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
+                return (RowKind.INSERT.value,) + tuple(
+                    f.to_sql_type(d.get(n)) if c else d.get(n)
+                    for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
             else:
                 raise ValueError("Unexpected tuple %r with RowType" % obj)
         else:
             if isinstance(obj, dict):
-                return tuple(obj.get(n) for n in self.names)
+                return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in 
self.names)
             elif isinstance(obj, Row) and hasattr(obj, "_fields"):
-                return tuple(obj[n] for n in self.names)
-            elif isinstance(obj, (list, tuple, Row)):
-                return tuple(obj)
+                return (obj.get_row_kind().value,) + tuple(obj[n] for n in 
self.names)
+            elif isinstance(obj, Row):
+                return (obj.get_row_kind().value,) + tuple(obj)
+            elif isinstance(obj, (list, tuple)):
+                return (RowKind.INSERT.value,) + tuple(obj)
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(d.get(n) for n in self.names)
+                return (RowKind.INSERT.value,) + tuple(d.get(n) for n in 
self.names)
             else:
                 raise ValueError("Unexpected tuple %r with RowType" % obj)
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
index 21684de..da39d48 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, 
ObjectArrayTypeInfo, Ro
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.sources.InputFormatTableSource
-import org.apache.flink.types.Row
+import org.apache.flink.types.{Row, RowKind}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -216,18 +216,19 @@ object PythonTableUtils {
       (obj: Any) => nullSafeConvert(obj) {
         case c if c.getClass.isArray =>
           val r = c.asInstanceOf[Array[_]]
-          if (r.length != rowType.getFieldTypes.length) {
+          if (r.length - 1 != rowType.getFieldTypes.length) {
             throw new IllegalStateException(
               s"Input row doesn't have expected number of values required by 
the schema. " +
-                s"${rowType.getFieldTypes.length} fields are required while 
${r.length} " +
+                s"${rowType.getFieldTypes.length} fields are required while 
${r.length - 1} " +
                 s"values are provided."
               )
           }
 
-          val row = new Row(r.length)
-          var i = 0
+          val row = new Row(r.length - 1)
+          
row.setKind(RowKind.fromByteValue(r(0).asInstanceOf[Integer].byteValue()))
+          var i = 1
           while (i < r.length) {
-            row.setField(i, fieldsFromJava(i)(r(i)))
+            row.setField(i - 1, fieldsFromJava(i - 1)(r(i)))
             i += 1
           }
           row

Reply via email to