This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1ebbadf016fe8fc016347a3f7ab6d2d173390dd Author: huangxingbo <[email protected]> AuthorDate: Wed Jul 28 00:21:12 2021 +0800 [FLINK-22911][python] Optimize from_collection/from_elements to support RowKind and Instant This closes #16611. --- flink-python/pyflink/common/typeinfo.py | 41 +++++++++++++++------- flink-python/pyflink/table/types.py | 35 ++++++++++++------ .../planner/utils/python/PythonTableUtils.scala | 13 +++---- 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py index 66f7d1d..95f4ea7 100644 --- a/flink-python/pyflink/common/typeinfo.py +++ b/flink-python/pyflink/common/typeinfo.py @@ -429,31 +429,48 @@ class RowTypeInfo(TypeInformation): def to_internal_type(self, obj): if obj is None: return - + from pyflink.common import Row, RowKind if self._need_serialize_any_field: # Only calling to_internal_type function for fields that need conversion if isinstance(obj, dict): - return tuple(f.to_internal_type(obj.get(n)) if c else obj.get(n) - for n, f, c in zip(self.get_field_names(), self._field_types, - self._need_conversion)) + return (RowKind.INSERT.value,) + tuple( + f.to_internal_type(obj.get(n)) if c else obj.get(n) + for n, f, c in + zip(self.get_field_names(), self._field_types, self._need_conversion)) + elif isinstance(obj, Row) and hasattr(obj, "_fields"): + return (obj.get_row_kind().value,) + tuple( + f.to_internal_type(obj.get(n)) if c else obj.get(n) + for n, f, c in + zip(self.get_field_names(), self._field_types, self._need_conversion)) + elif isinstance(obj, Row): + return (obj.get_row_kind().value,) + tuple( + f.to_internal_type(v) if c else v + for f, v, c in zip(self._field_types, obj, self._need_conversion)) elif isinstance(obj, (tuple, list)): - return tuple(f.to_internal_type(v) if c else v - for f, v, c in zip(self._field_types, obj, self._need_conversion)) + return (RowKind.INSERT.value,) + tuple( + f.to_internal_type(v) if c else v 
+ for f, v, c in zip(self._field_types, obj, self._need_conversion)) elif hasattr(obj, "__dict__"): d = obj.__dict__ - return tuple(f.to_internal_type(d.get(n)) if c else d.get(n) - for n, f, c in zip(self.get_field_names(), self._field_types, - self._need_conversion)) + return (RowKind.INSERT.value,) + tuple( + f.to_internal_type(d.get(n)) if c else d.get(n) + for n, f, c in + zip(self.get_field_names(), self._field_types, self._need_conversion)) else: raise ValueError("Unexpected tuple %r with RowTypeInfo" % obj) else: if isinstance(obj, dict): - return tuple(obj.get(n) for n in self.get_field_names()) + return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in self.get_field_names()) + elif isinstance(obj, Row) and hasattr(obj, "_fields"): + return (obj.get_row_kind().value,) + tuple( + obj.get(n) for n in self.get_field_names()) + elif isinstance(obj, Row): + return (obj.get_row_kind().value,) + tuple(obj) elif isinstance(obj, (list, tuple)): - return tuple(obj) + return (RowKind.INSERT.value,) + tuple(obj) elif hasattr(obj, "__dict__"): d = obj.__dict__ - return tuple(d.get(n) for n in self.get_field_names()) + return (RowKind.INSERT.value,) + tuple(d.get(n) for n in self.get_field_names()) else: raise ValueError("Unexpected tuple %r with RowTypeInfo" % obj) diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py index a38c536..1856d21 100644 --- a/flink-python/pyflink/table/types.py +++ b/flink-python/pyflink/table/types.py @@ -1245,27 +1245,40 @@ class RowType(DataType): if self._need_serialize_any_field: # Only calling to_sql_type function for fields that need conversion if isinstance(obj, dict): - return tuple(f.to_sql_type(obj.get(n)) if c else obj.get(n) - for n, f, c in zip(self.names, self.fields, self._need_conversion)) + return (RowKind.INSERT.value,) + tuple( + f.to_sql_type(obj.get(n)) if c else obj.get(n) + for n, f, c in zip(self.names, self.fields, self._need_conversion)) + elif isinstance(obj, Row) and 
hasattr(obj, "_fields"): + return (obj.get_row_kind().value,) + tuple( + f.to_sql_type(obj.get(n)) if c else obj.get(n) + for n, f, c in zip(self.names, self.fields, self._need_conversion)) + elif isinstance(obj, Row): + return (obj.get_row_kind().value, ) + tuple( + f.to_sql_type(v) if c else v + for f, v, c in zip(self.fields, obj, self._need_conversion)) elif isinstance(obj, (tuple, list, Row)): - return tuple(f.to_sql_type(v) if c else v - for f, v, c in zip(self.fields, obj, self._need_conversion)) + return (RowKind.INSERT.value,) + tuple( + f.to_sql_type(v) if c else v + for f, v, c in zip(self.fields, obj, self._need_conversion)) elif hasattr(obj, "__dict__"): d = obj.__dict__ - return tuple(f.to_sql_type(d.get(n)) if c else d.get(n) - for n, f, c in zip(self.names, self.fields, self._need_conversion)) + return (RowKind.INSERT.value,) + tuple( + f.to_sql_type(d.get(n)) if c else d.get(n) + for n, f, c in zip(self.names, self.fields, self._need_conversion)) else: raise ValueError("Unexpected tuple %r with RowType" % obj) else: if isinstance(obj, dict): - return tuple(obj.get(n) for n in self.names) + return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in self.names) elif isinstance(obj, Row) and hasattr(obj, "_fields"): - return tuple(obj[n] for n in self.names) - elif isinstance(obj, (list, tuple, Row)): - return tuple(obj) + return (obj.get_row_kind().value,) + tuple(obj[n] for n in self.names) + elif isinstance(obj, Row): + return (obj.get_row_kind().value,) + tuple(obj) + elif isinstance(obj, (list, tuple)): + return (RowKind.INSERT.value,) + tuple(obj) elif hasattr(obj, "__dict__"): d = obj.__dict__ - return tuple(d.get(n) for n in self.names) + return (RowKind.INSERT.value,) + tuple(d.get(n) for n in self.names) else: raise ValueError("Unexpected tuple %r with RowType" % obj) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala index 21684de..da39d48 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala @@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, Ro import org.apache.flink.core.io.InputSplit import org.apache.flink.table.api.{TableSchema, Types} import org.apache.flink.table.sources.InputFormatTableSource -import org.apache.flink.types.Row +import org.apache.flink.types.{Row, RowKind} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -216,18 +216,19 @@ object PythonTableUtils { (obj: Any) => nullSafeConvert(obj) { case c if c.getClass.isArray => val r = c.asInstanceOf[Array[_]] - if (r.length != rowType.getFieldTypes.length) { + if (r.length - 1 != rowType.getFieldTypes.length) { throw new IllegalStateException( s"Input row doesn't have expected number of values required by the schema. " + - s"${rowType.getFieldTypes.length} fields are required while ${r.length} " + + s"${rowType.getFieldTypes.length} fields are required while ${r.length - 1} " + s"values are provided." ) } - val row = new Row(r.length) - var i = 0 + val row = new Row(r.length - 1) + row.setKind(RowKind.fromByteValue(r(0).asInstanceOf[Integer].byteValue())) + var i = 1 while (i < r.length) { - row.setField(i, fieldsFromJava(i)(r(i))) + row.setField(i - 1, fieldsFromJava(i - 1)(r(i))) i += 1 } row
