This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6b3a5dba6c1 [minor][python][tests] Add some test cases for binary and char types
6b3a5dba6c1 is described below
commit 6b3a5dba6c1f49ec6df6b0b6497ada48f01caa0e
Author: Dian Fu <[email protected]>
AuthorDate: Wed Jan 11 14:38:44 2023 +0800
[minor][python][tests] Add some test cases for binary and char types
---
flink-python/pyflink/fn_execution/coders.py | 4 +-
flink-python/pyflink/table/tests/test_udf.py | 55 ++++++++++++++++++++++++----
flink-python/pyflink/table/types.py | 5 ---
3 files changed, 50 insertions(+), 14 deletions(-)
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 6cdd6bfaa45..fa8ccdd5aac 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -125,11 +125,11 @@ class LengthPrefixBaseCoder(ABC):
elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
return DoubleType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
- return VarCharType(0x7fffffff, field_type.nullable)
+ return VarCharType(field_type.var_char_info.length, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.BINARY:
return BinaryType(field_type.binary_info.length, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
- return VarBinaryType(0x7fffffff, field_type.nullable)
+ return VarBinaryType(field_type.var_binary_info.length, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
return DecimalType(field_type.decimal_info.precision,
field_type.decimal_info.scale,
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 6f249e5dde5..851c43fc0b2 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -391,12 +391,45 @@ class UserDefinedFunctionTests(object):
'decimal_param is wrong value %s !' % decimal_param
return decimal_param
+ @udf(result_type=DataTypes.BINARY(5))
+ def binary_func(binary_param):
+ assert len(binary_param) == 5
+ return binary_param
+
+ @udf(result_type=DataTypes.CHAR(7))
+ def char_func(char_param):
+ assert len(char_param) == 7
+ return char_param
+
+ @udf(result_type=DataTypes.VARCHAR(10))
+ def varchar_func(varchar_param):
+ assert len(varchar_param) <= 10
+ return varchar_param
+
sink_table = generate_random_table_name()
sink_table_ddl = f"""
CREATE TABLE {sink_table}(
- a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE, i BYTES,
- j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>, o MAP<BIGINT, STRING>,
- p DECIMAL(38, 18), q DECIMAL(38, 18)) WITH ('connector'='test-sink')
+ a BIGINT,
+ b BIGINT,
+ c TINYINT,
+ d BOOLEAN,
+ e SMALLINT,
+ f INT,
+ g FLOAT,
+ h DOUBLE,
+ i BYTES,
+ j STRING,
+ k DATE,
+ l TIME,
+ m TIMESTAMP(3),
+ n ARRAY<BIGINT>,
+ o MAP<BIGINT, STRING>,
+ p DECIMAL(38, 18),
+ q DECIMAL(38, 18),
+ r BINARY(5),
+ s CHAR(7),
+ t VARCHAR(10)
+ ) WITH ('connector'='test-sink')
"""
self.t_env.execute_sql(sink_table_ddl)
@@ -408,7 +441,8 @@ class UserDefinedFunctionTests(object):
datetime.time(hour=12, minute=0, second=0, microsecond=123000),
datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
{1: 'flink', 2: 'pyflink'},
decimal.Decimal('1000000000000000000.05'),
- decimal.Decimal('1000000000000000000.05999999999999999899999999999'))],
+ decimal.Decimal('1000000000000000000.05999999999999999899999999999'),
+ bytearray(b'flink'), 'pyflink', 'pyflink')],
DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT()),
@@ -426,7 +460,10 @@ class UserDefinedFunctionTests(object):
DataTypes.FIELD("n",
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT()))),
DataTypes.FIELD("o", DataTypes.MAP(DataTypes.BIGINT(),
DataTypes.STRING())),
DataTypes.FIELD("p", DataTypes.DECIMAL(38, 18)),
- DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18))]))
+ DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18)),
+ DataTypes.FIELD("r", DataTypes.BINARY(5)),
+ DataTypes.FIELD("s", DataTypes.CHAR(7)),
+ DataTypes.FIELD("t", DataTypes.VARCHAR(10))]))
t.select(
bigint_func(t.a),
@@ -445,7 +482,10 @@ class UserDefinedFunctionTests(object):
array_func(t.n),
map_func(t.o),
decimal_func(t.p),
- decimal_cut_func(t.q)) \
+ decimal_cut_func(t.q),
+ binary_func(t.r),
+ char_func(t.s),
+ varchar_func(t.t)) \
.execute_insert(sink_table).wait()
actual = source_sink_utils.results()
# Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
@@ -454,7 +494,8 @@ class UserDefinedFunctionTests(object):
"[102, 108, 105, 110, 107], pyflink, 2014-09-13, 12:00:00.123, "
"2018-03-11T03:00:00.123, [1, 2, 3], "
"{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
- "1000000000000000000.059999999999999999]"])
+ "1000000000000000000.059999999999999999, [102, 108, 105, 110, 107], "
+ "pyflink, pyflink]"])
def test_all_data_types(self):
def boolean_func(bool_param):
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 856942e5936..99e6676d687 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -2360,8 +2360,6 @@ class DataTypes(object):
:param length: int, the string representation length. It must have a value
between 1 and 2147483647(0x7fffffff) (both inclusive).
:param nullable: boolean, whether the type can be null (None) or not.
-
- .. note:: `CharType` is still not supported yet.
"""
return CharType(length, nullable)
@@ -2409,8 +2407,6 @@ class DataTypes(object):
:param length: int, the number of bytes. It must have a value between
1 and 2147483647(0x7fffffff) (both inclusive).
:param nullable: boolean, whether the type can be null (None) or not.
-
- .. note:: `BinaryType` is still not supported yet.
"""
return BinaryType(length, nullable)
@@ -2423,7 +2419,6 @@ class DataTypes(object):
between 1 and 2147483647(0x7fffffff) (both inclusive).
:param nullable: boolean, whether the type can be null (None) or not.
- .. note:: The length limit must be 0x7fffffff(2147483647) currently.
.. seealso:: :func:`~DataTypes.BYTES`
"""
return VarBinaryType(length, nullable)