HuangXingBo commented on code in PR #19140:
URL: https://github.com/apache/flink/pull/19140#discussion_r880270042
##########
flink-python/pyflink/table/tests/test_calc.py:
##########
@@ -66,12 +66,11 @@ def test_filter(self):
     def test_from_element(self):
         t_env = self.t_env
         field_names = ["a", "b", "c", "d", "e", "f", "g", "h",
-                       "i", "j", "k", "l", "m", "n", "o", "p", "q"]
+                       "i", "j", "k", "l", "m", "n", "o", "p"]
         field_types = [DataTypes.BIGINT(), DataTypes.DOUBLE(), DataTypes.STRING(),
                        DataTypes.STRING(), DataTypes.DATE(),
                        DataTypes.TIME(),
                        DataTypes.TIMESTAMP(3),
-                       DataTypes.INTERVAL(DataTypes.SECOND(3)),
Review Comment:
Why did you remove this?
##########
flink-python/pyflink/table/types.py:
##########
@@ -1956,7 +1821,7 @@ def _from_java_type(j_data_type):
         raise TypeError("Unsupported data type: %s" % j_data_type)


-def _to_java_data_type(data_type: DataType):
+def _to_java_data_type(data_type: Union[DataType, List[str]]):
Review Comment:
When would `data_type` be a `List[str]`?
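My guess is that this is meant for the `from_elements` path, where the schema can be given as a list of field names only. A sketch of that calling pattern (illustrative, not code from this PR):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# The schema is given as field names only, so whatever `_from_elements`
# passes through here would be a List[str] rather than a DataType.
table = t_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
```

If that is the only case, it might be cleaner to handle the `List[str]` conversion in the caller and keep this function's signature as `DataType` only.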
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java:
##########
@@ -62,6 +69,8 @@
 @Internal
 public final class PythonTableUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(PythonTableUtils.class);
Review Comment:
unnecessary changes?
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java:
##########
@@ -74,15 +83,17 @@ private PythonTableUtils() {}
      * @return An InputFormat containing the python data.
      */
     public static InputFormat<Row, ?> getInputFormat(
-            final List<Object[]> data,
-            final TypeInformation<Row> dataType,
-            final ExecutionConfig config) {
-        Function<Object, Object> converter = converter(dataType, config);
+            final List<Object[]> data, final DataType dataType, final ExecutionConfig config) {
+        TypeInformation<Row> rowTypeInfo =
+                (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
Review Comment:
Could we create the corresponding serializer through `InternalTypeInfo` or `ExternalTypeInfo` instead? After all, `TypeConversions.fromDataTypeToLegacyInfo` has been deprecated.
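For example, something along these lines might work (untested sketch):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

final class SerializerSketch {

    // External representation: keeps the DataType's conversion class (Row here).
    static TypeInformation<Row> external(DataType dataType) {
        return ExternalTypeInfo.of(dataType);
    }

    // Internal representation, if RowData is acceptable downstream.
    static InternalTypeInfo<RowData> internal(DataType dataType) {
        return InternalTypeInfo.of(dataType.getLogicalType());
    }
}
```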
##########
flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Testing utils adopted from legacy planner until the Python code is updated. */
Review Comment:
Please update this comment; the wording about the legacy planner no longer applies.
##########
flink-python/pyflink/table/tests/test_row_based_operation.py:
##########
@@ -189,6 +191,7 @@ def test_aggregate_with_pandas_udaf_without_keys(self):
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[3.8, 8]"])

+    @unittest.skip("Not supported yet")
Review Comment:
This is a bug in the batch time window of the Table API: in the new type system, the type of the window reference becomes `LocalDateTime` in this case.
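For reference, the failing shape is roughly the following (hypothetical sketch, not the literal test code):

```python
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

# `t` stands for a batch-mode Table with a `rowtime` column. Under the new
# type system the window properties w.start / w.end resolve to TIMESTAMP(3),
# i.e. LocalDateTime, which this code path cannot handle yet.
result = t.window(Tumble.over(lit(1).hours).on(col("rowtime")).alias("w")) \
    .group_by(col("w")) \
    .select(col("a").sum, col("w").start, col("w").end)
```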
##########
flink-python/pyflink/table/table_environment.py:
##########
@@ -1431,7 +1431,7 @@ def _from_elements(self, elements: List, schema: Union[DataType, List[str]]) ->
         try:
             with temp_file:
                 serializer.serialize(elements, temp_file)
-                row_type_info = _to_java_type(schema)
+                row_type_info = _to_java_data_type(schema)
Review Comment:
```suggestion
                row_data_type = _to_java_data_type(schema)
```
##########
flink-python/pyflink/table/types.py:
##########
@@ -1681,157 +1681,13 @@ def convert_row(obj):
                        FloatType, DoubleType}


-def _to_java_type(data_type):
+def _from_java_data_type(j_data_type):
     """
-    Converts Python type to Java TypeInformation.
+    Converts Java DataType to Python DataType.
     """
-
-    global _python_java_types_mapping
-    global _python_java_types_mapping_lock
-
     gateway = get_gateway()
-    Types = gateway.jvm.org.apache.flink.table.api.Types
-
-    if _python_java_types_mapping is None:
-        with _python_java_types_mapping_lock:
-            _python_java_types_mapping = {
-                BooleanType: Types.BOOLEAN(),
-                TinyIntType: Types.BYTE(),
-                SmallIntType: Types.SHORT(),
-                IntType: Types.INT(),
-                BigIntType: Types.LONG(),
-                FloatType: Types.FLOAT(),
-                DoubleType: Types.DOUBLE(),
-                DateType: Types.SQL_DATE(),
-            }
-
-    # basic types
-    if type(data_type) in _python_java_types_mapping:
-        return _python_java_types_mapping[type(data_type)]
-
-    # DecimalType
-    elif isinstance(data_type, DecimalType):
-        if data_type.precision == 38 and data_type.scale == 18:
-            return Types.DECIMAL()
-        else:
-            raise TypeError("The precision must be 38 and the scale must be 18 for DecimalType, "
-                            "got %s" % repr(data_type))
-
-    # TimeType
-    elif isinstance(data_type, TimeType):
-        if data_type.precision == 0:
-            return Types.SQL_TIME()
-        else:
-            raise TypeError("The precision must be 0 for TimeType, got %s" % repr(data_type))
-
-    # TimestampType
-    elif isinstance(data_type, TimestampType):
-        if data_type.precision == 3:
-            return Types.SQL_TIMESTAMP()
-        else:
-            raise TypeError("The precision must be 3 for TimestampType, got %s" % repr(data_type))
-
-    # LocalZonedTimestampType
-    elif isinstance(data_type, LocalZonedTimestampType):
-        if data_type.precision == 3:
-            return gateway.jvm.org.apache.flink.api.common.typeinfo.Types.INSTANT
-        else:
-            raise TypeError("The precision must be 3 for LocalZonedTimestampType, got %s"
-                            % repr(data_type))
-
-    # VarCharType
-    elif isinstance(data_type, VarCharType):
-        if data_type.length == 0x7fffffff:
-            return Types.STRING()
-        else:
-            raise TypeError("The length limit must be 0x7fffffff(2147483647) for VarCharType, "
-                            "got %s" % repr(data_type))
-
-    # VarBinaryType
-    elif isinstance(data_type, VarBinaryType):
-        if data_type.length == 0x7fffffff:
-            return Types.PRIMITIVE_ARRAY(Types.BYTE())
-        else:
-            raise TypeError("The length limit must be 0x7fffffff(2147483647) for VarBinaryType, "
-                            "got %s" % repr(data_type))
-
-    # YearMonthIntervalType
-    elif isinstance(data_type, YearMonthIntervalType):
-        if data_type.resolution == YearMonthIntervalType.YearMonthResolution.MONTH and \
-                data_type.precision == 2:
-            return Types.INTERVAL_MONTHS()
-        else:
-            raise TypeError("The resolution must be YearMonthResolution.MONTH and the precision "
-                            "must be 2 for YearMonthIntervalType, got %s" % repr(data_type))
-
-    # DayTimeIntervalType
-    elif isinstance(data_type, DayTimeIntervalType):
-        if data_type.resolution == DayTimeIntervalType.DayTimeResolution.SECOND and \
-                data_type.day_precision == 2 and data_type.fractional_precision == 3:
-            return Types.INTERVAL_MILLIS()
-        else:
-            raise TypeError("The resolution must be DayTimeResolution.SECOND, the day_precision "
-                            "must be 2 and the fractional_precision must be 3 for "
-                            "DayTimeIntervalType, got %s" % repr(data_type))
-
-    # ArrayType
-    elif isinstance(data_type, ArrayType):
-        return Types.OBJECT_ARRAY(_to_java_type(data_type.element_type))
-
-    # ListViewType
-    elif isinstance(data_type, ListViewType):
-        return gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo(_to_java_type(
-            data_type._element_type))
-
-    # MapType
-    elif isinstance(data_type, MapType):
-        return Types.MAP(_to_java_type(data_type.key_type), _to_java_type(data_type.value_type))
-
-    # MapViewType
-    elif isinstance(data_type, MapViewType):
-        return gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo(
-            _to_java_type(data_type._key_type), _to_java_type(data_type._value_type))
-
-    # MultisetType
-    elif isinstance(data_type, MultisetType):
-        return Types.MULTISET(_to_java_type(data_type.element_type))
-
-    # RowType
-    elif isinstance(data_type, RowType):
-        return Types.ROW(
-            to_jarray(gateway.jvm.String, data_type.field_names()),
-            to_jarray(gateway.jvm.TypeInformation,
-                      [_to_java_type(f.data_type) for f in data_type.fields]))
-
-    # UserDefinedType
-    elif isinstance(data_type, UserDefinedType):
-        if data_type.java_udt():
-            return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
-                gateway.jvm.Class.forName(
-                    data_type.java_udt(),
-                    True,
-                    gateway.jvm.Thread.currentThread().getContextClassLoader()))
-        else:
-            return _to_java_type(data_type.sql_type())
-
-    else:
-        raise TypeError("Not supported type: %s" % repr(data_type))
-
-
-def _from_java_type(j_data_type):
-    """
-    Converts Java TypeInformation to Python DataType.
-    """
-    gateway = get_gateway()
-
-    if is_instance_of(j_data_type, gateway.jvm.TypeInformation):
-        # input is TypeInformation
-        LegacyTypeInfoDataTypeConverter = \
-            gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
-        java_data_type = LegacyTypeInfoDataTypeConverter.toDataType(j_data_type)
-    else:
-        # input is DataType
-        java_data_type = j_data_type
+    java_data_type = j_data_type
Review Comment:
Why not use either `java_data_type` or `j_data_type` uniformly?
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java:
##########
@@ -98,6 +109,7 @@ private PythonTableUtils() {}
     public static <T> InputFormat<T, ?> getCollectionInputFormat(
             final List<T> data, final TypeInformation<T> dataType, final ExecutionConfig config) {
         Function<Object, Object> converter = converter(dataType, config);
+
Review Comment:
unnecessary changes?
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java:
##########
@@ -43,12 +44,12 @@ public class PythonInputFormatTableSource extends InputFormatTableSource<Row> {

     /**
      * The row type info of the python data. It is generated by the python 'from_element' method.
Review Comment:
Change `row type info` to `row data type`, and update the comment on `inputFormat` as well.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java:
##########
@@ -98,31 +98,26 @@ public boolean isDeterministic() {
     @Override
     public TypeInformation[] getParameterTypes(Class[] signature) {
         if (inputTypes != null) {
-            return inputTypes;
+            return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
         } else {
             return super.getParameterTypes(signature);
         }
     }

     @Override
     public TypeInformation getResultType(Class[] signature) {
Review Comment:
ditto
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java:
##########
@@ -74,15 +83,17 @@ private PythonTableUtils() {}
      * @return An InputFormat containing the python data.
      */
     public static InputFormat<Row, ?> getInputFormat(
-            final List<Object[]> data,
-            final TypeInformation<Row> dataType,
-            final ExecutionConfig config) {
-        Function<Object, Object> converter = converter(dataType, config);
+            final List<Object[]> data, final DataType dataType, final ExecutionConfig config) {
+        TypeInformation<Row> rowTypeInfo =
+                (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
+        LOG.info(rowTypeInfo.toString());
Review Comment:
unnecessary changes?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java:
##########
@@ -98,31 +98,26 @@ public boolean isDeterministic() {
     @Override
     public TypeInformation[] getParameterTypes(Class[] signature) {
Review Comment:
I was wondering whether it is still necessary to override this method, now that we already support the new type system. Of course, to ensure compatibility we would need to map `register_function` to `create_temporary_system_function`.
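The compatibility mapping could be as simple as this sketch on the Python side (assuming `create_temporary_system_function` accepts the same function object):

```python
def register_function(self, name: str, function) -> None:
    """Deprecated compatibility wrapper that delegates the legacy
    registration API to the new type-system entry point."""
    self.create_temporary_system_function(name, function)
```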
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java:
##########
@@ -100,31 +99,26 @@ public boolean isDeterministic() {
     @Override
     public TypeInformation[] getParameterTypes(Class[] signature) {
         if (inputTypes != null) {
-            return inputTypes;
+            return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
         } else {
             return super.getParameterTypes(signature);
         }
     }

     @Override
     public TypeInformation<Row> getResultType() {
-        return resultType;
+        return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(resultType);
     }

     @Override
     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
         TypeInference.Builder builder = TypeInference.newBuilder();
         if (inputTypes != null) {
             final List<DataType> argumentDataTypes =
-                    Stream.of(inputTypes)
-                            .map(TypeConversions::fromLegacyInfoToDataType)
-                            .collect(Collectors.toList());
+                    Stream.of(inputTypes).collect(Collectors.toList());
Review Comment:
ditto
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java:
##########
@@ -98,31 +98,26 @@ public boolean isDeterministic() {
     @Override
     public TypeInformation[] getParameterTypes(Class[] signature) {
         if (inputTypes != null) {
-            return inputTypes;
+            return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
         } else {
             return super.getParameterTypes(signature);
         }
     }

     @Override
     public TypeInformation getResultType(Class[] signature) {
-        return resultType;
+        return TypeConversions.fromDataTypeToLegacyInfo(resultType);
     }

     @Override
     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
         TypeInference.Builder builder = TypeInference.newBuilder();
         if (inputTypes != null) {
             final List<DataType> argumentDataTypes =
-                    Stream.of(inputTypes)
-                            .map(TypeConversions::fromLegacyInfoToDataType)
-                            .collect(Collectors.toList());
+                    Stream.of(inputTypes).collect(Collectors.toList());
Review Comment:
Why not use `builder.typedArguments(inputTypes)` directly?
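i.e. roughly (untested sketch):

```java
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    TypeInference.Builder builder = TypeInference.newBuilder();
    if (inputTypes != null) {
        // inputTypes is already a DataType[], so it can be passed through
        // without the Stream round-trip.
        builder.typedArguments(inputTypes);
    }
    return builder.outputTypeStrategy(TypeStrategies.explicit(resultType)).build();
}
```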
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java:
##########
@@ -58,11 +59,11 @@ public PythonInputFormatTableSource(
     @Override
     public TableSchema getTableSchema() {
Review Comment:
Could we re-implement `PythonInputFormatTableSource` based on `DynamicTableSource`?
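A rough sketch of what that could look like (untested; the Python `InputFormat` would have to produce `RowData` or be wrapped accordingly, and `producedDataType` would come from the factory):

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/** Possible replacement based on the new source interfaces (sketch only). */
public class PythonInputFormatScanTableSource implements ScanTableSource {

    private final InputFormat<RowData, ?> inputFormat;
    private final DataType producedDataType;

    public PythonInputFormatScanTableSource(
            InputFormat<RowData, ?> inputFormat, DataType producedDataType) {
        this.inputFormat = inputFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // The Python elements are bounded, insert-only data.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new PythonInputFormatScanTableSource(inputFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "Python InputFormat table source";
    }
}
```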
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/utils/PythonFunctionUtils.java:
##########
@@ -49,4 +54,10 @@ public static PythonFunction getPythonFunction(
                     t);
         }
     }
+
+    public static FunctionDefinition getPythonTableFunctionDefinition(
Review Comment:
could we remove this?
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java:
##########
@@ -98,6 +109,7 @@ private PythonTableUtils() {}
public static <T> InputFormat<T, ?> getCollectionInputFormat(
Review Comment:
This method is only used in datastream jobs, so it feels strange to put it
under `PythonTableUtils`, but this is for historical reasons, maybe rename
`PythonTableUtils` is better
##########
flink-python/pyflink/table/expressions.py:
##########
@@ -773,12 +773,12 @@ def get_function_definition(f):
            TypeInference was not supported for TableFunction in the old planner. Use
            TableFunctionDefinition to work around this issue.
            """
-            j_result_types = to_jarray(gateway.jvm.TypeInformation,
-                                       [_to_java_type(i) for i in f._result_types])
-            j_result_type = gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
-                j_result_types)
-            return gateway.jvm.org.apache.flink.table.functions.TableFunctionDefinition(
-                'f', f._java_user_defined_function(), j_result_type)
+            j_result_types = to_jarray(gateway.jvm.DataType,
Review Comment:
Now that there is no old planner any more, can we remove this branch entirely? Then we would not need to add `getPythonTableFunctionDefinition` either.
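i.e. the whole special case could collapse to the generic path, something like this sketch:

```python
def get_function_definition(f):
    # With only the new planner left, the TableFunction special case (and
    # the Java-side `getPythonTableFunctionDefinition` helper) should not
    # be needed any more.
    return f._java_user_defined_function()
```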