hequn8128 commented on a change in pull request #11656: [FLINK-16983][python]
Support RowType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11656#discussion_r406056397
##########
File path:
flink-python/src/test/java/org/apache/flink/table/runtime/arrow/BaseRowArrowReaderWriterTest.java
##########
@@ -152,21 +164,25 @@ public static void init() {
BaseRow row1 = StreamRecordUtils.baserow((byte) 1, (short) 2,
3, 4L, true, 1.0f, 1.0, "hello", "hello".getBytes(), Decimal.fromLong(1, 10,
3), 100, 3600000, 3600000, 3600000, 3600000,
SqlTimestamp.fromEpochMillis(3600000),
SqlTimestamp.fromEpochMillis(3600000), SqlTimestamp.fromEpochMillis(3600000,
100000), SqlTimestamp.fromEpochMillis(3600000, 100000),
SqlTimestamp.fromEpochMillis(3600000),
SqlTimestamp.fromEpochMillis(3600000), SqlTimestamp.fromEpochMillis(3600000,
100000), SqlTimestamp.fromEpochMillis(3600000, 100000),
- new GenericArray(new BinaryString[]
{BinaryString.fromString("hello"), BinaryString.fromString("中文"), null}, 3));
+ new GenericArray(new BinaryString[]
{BinaryString.fromString("hello"), BinaryString.fromString("中文"), null}, 3),
+ GenericRow.of(1, BinaryString.fromString("hello"), new
GenericArray(new BinaryString[] {BinaryString.fromString("hello")}, 1),
SqlTimestamp.fromEpochMillis(3600000), GenericRow.of(1,
BinaryString.fromString("hello"))));
BinaryRow row2 = StreamRecordUtils.binaryrow((byte) 1, (short)
2, 3, 4L, false, 1.0f, 1.0, "中文", "中文".getBytes(), Decimal.fromLong(1, 10, 3),
100, 3600000, 3600000, 3600000, 3600000,
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 0),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 2),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 4),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 8),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 0),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 2),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 4),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 8),
- Tuple2.of(new GenericArray(new String[] {null, null,
null}, 3), new BaseArraySerializer(new VarCharType(), null)));
+ Tuple2.of(new GenericArray(new String[] {null, null,
null}, 3), new BaseArraySerializer(new VarCharType(), null)),
+ Tuple2.of(GenericRow.of(1, null, new GenericArray(new
BinaryString[] {BinaryString.fromString("hello")}, 1), null, GenericRow.of(1,
BinaryString.fromString("hello"))), new BaseRowSerializer(new
ExecutionConfig(), rowFieldType)));
BaseRow row3 = StreamRecordUtils.baserow(null, (short) 2, 3,
4L, false, 1.0f, 1.0, "中文", "中文".getBytes(), Decimal.fromLong(1, 10, 3), 100,
3600000, 3600000, 3600000, 3600000,
SqlTimestamp.fromEpochMillis(3600000),
SqlTimestamp.fromEpochMillis(3600000), SqlTimestamp.fromEpochMillis(3600000,
100000), SqlTimestamp.fromEpochMillis(3600000, 100000),
SqlTimestamp.fromEpochMillis(3600000),
SqlTimestamp.fromEpochMillis(3600000), SqlTimestamp.fromEpochMillis(3600000,
100000), SqlTimestamp.fromEpochMillis(3600000, 100000),
- new GenericArray(new String[] {null, null, null}, 3));
+ new GenericArray(new String[] {null, null, null}, 3),
+ GenericRow.of(1, null, new GenericArray(new
BinaryString[] {BinaryString.fromString("hello")}, 1), null, null));
BinaryRow row4 = StreamRecordUtils.binaryrow((byte) 1, null, 3,
4L, true, 1.0f, 1.0, "hello", "hello".getBytes(), Decimal.fromLong(1, 10, 3),
100, 3600000, 3600000, 3600000, 3600000,
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 0),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 2),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 4),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 8),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 0),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000), 2),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 4),
Tuple2.of(SqlTimestamp.fromEpochMillis(3600000, 100000), 8),
- Tuple2.of(new GenericArray(new BinaryString[]
{BinaryString.fromString("hello"), BinaryString.fromString("中文"), null}, 3),
new BaseArraySerializer(new VarCharType(), null)));
- BaseRow row5 = StreamRecordUtils.baserow(null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
- BinaryRow row6 = StreamRecordUtils.binaryrow(null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
+ Tuple2.of(new GenericArray(new BinaryString[]
{BinaryString.fromString("hello"), BinaryString.fromString("中文"), null}, 3),
new BaseArraySerializer(new VarCharType(), null)),
+ Tuple2.of(GenericRow.of(1, null, new GenericArray(new
BinaryString[] {BinaryString.fromString("hello")}, 1), null, null), new
BaseRowSerializer(new ExecutionConfig(), rowFieldType)));
+ BaseRow row5 = StreamRecordUtils.baserow(null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null);
Review comment:
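Could this be simplified to `StreamRecordUtils.baserow(new Object[n])`, where n is the number of fields, instead of listing every `null` explicitly?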
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services