[
https://issues.apache.org/jira/browse/FLINK-28375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
hehuiyuan updated FLINK-28375:
------------------------------
Description:
{code:java}
CREATE TABLE kafkaTableSource (
keyField INTEGER,
timestampField INTEGER,
arrayField ARRAY<String>,
ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
WATERMARK FOR ws AS ws
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'json'
);
CREATE TABLE kafkaTableSink(
keyField INTEGER,
timestampField INTEGER,
arrayLastValue ARRAY<String>
)
WITH (
'connector' = 'print'
);
insert into kafkaTableSink
select keyField,timestampField, last_value(arrayField) over (partition by
keyField order by ws) from kafkaTableSource;
{code}
{color:#ff0000}Exception in thread "main"
org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does
not support type: ''ARRAY''.{color}
{color:#ff0000}Please re-check the data type.{color}
I have a modification that supports this, but why does the community not support
it?
Is there any special reason that I have not considered?
The test with the array data type runs successfully:
mock data:
!image-2022-07-04-16-21-28-198.png!
result is right:
!image-2022-07-04-16-20-08-661.png|width=626,height=146!
Test of the last_value function without an OVER window:
{code:java}
CREATE TABLE kafkaTableSource (
keyField INTEGER,
timestampField INTEGER,
arrayField ARRAY<String>,
ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
WATERMARK FOR ws AS ws - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'json'
);
CREATE TABLE kafkaTableSink(
keyField INTEGER,
arrayLastValue ARRAY<String>
)
WITH (
'connector' = 'print'
);
insert into kafkaTableSink
select keyField, last_value(arrayField) from kafkaTableSource GROUP BY
TUMBLE(ws,INTERVAL '5' SECOND),keyField;
{code}
The mock data:
!image-2022-07-04-16-42-31-399.png|width=705,height=183!
The result is right
!image-2022-07-04-16-44-02-720.png|width=687,height=155!
was:
{code:java}
CREATE TABLE kafkaTableSource (
keyField INTEGER,
timestampField INTEGER,
arrayField ARRAY<String>,
ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
WATERMARK FOR ws AS ws
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'json'
);
CREATE TABLE kafkaTableSink(
keyField INTEGER,
timestampField INTEGER,
arrayLastValue ARRAY<String>
)
WITH (
'connector' = 'print'
);
insert into kafkaTableSink
select keyField,timestampField, last_value(arrayField) over (partition by
keyField order by ws) from kafkaTableSource;
{code}
{color:#FF0000}Exception in thread "main"
org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does
not support type: ''ARRAY''.{color}
{color:#FF0000}Please re-check the data type.{color}
I have a modification that supports this, but why does the community not support
it?
Is there any special reason that I have not considered?
The test with the array data type runs successfully:
mock data:
!image-2022-07-04-16-21-28-198.png!
result:
!image-2022-07-04-16-20-08-661.png|width=626,height=146!
> Whether to consider adding other data type to support for last_value function
> -----------------------------------------------------------------------------
>
> Key: FLINK-28375
> URL: https://issues.apache.org/jira/browse/FLINK-28375
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API
> Reporter: hehuiyuan
> Priority: Minor
> Attachments: image-2022-07-04-16-20-08-661.png,
> image-2022-07-04-16-21-28-198.png, image-2022-07-04-16-42-31-399.png,
> image-2022-07-04-16-44-02-720.png
>
>
>
> {code:java}
> CREATE TABLE kafkaTableSource (
> keyField INTEGER,
> timestampField INTEGER,
> arrayField ARRAY<String>,
> ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
> WATERMARK FOR ws AS ws
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'hehuiyuan1',
> 'scan.startup.mode' = 'latest-offset',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.client.id' = 'test-consumer-group',
> 'properties.group.id' = 'test-consumer-group',
> 'format' = 'json'
> );
> CREATE TABLE kafkaTableSink(
> keyField INTEGER,
> timestampField INTEGER,
> arrayLastValue ARRAY<String>
> )
> WITH (
> 'connector' = 'print'
> );
> insert into kafkaTableSink
> select keyField,timestampField, last_value(arrayField) over (partition by
> keyField order by ws) from kafkaTableSource;
> {code}
> {color:#ff0000}Exception in thread "main"
> org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does
> not support type: ''ARRAY''.{color}
> {color:#ff0000}Please re-check the data type.{color}
>
> I have a modification that supports this, but why does the community not
> support it?
> Is there any special reason that I have not considered?
>
> The test with the array data type runs successfully:
> mock data:
> !image-2022-07-04-16-21-28-198.png!
> result is right:
> !image-2022-07-04-16-20-08-661.png|width=626,height=146!
>
>
> Test of the last_value function without an OVER window:
> {code:java}
> CREATE TABLE kafkaTableSource (
> keyField INTEGER,
> timestampField INTEGER,
> arrayField ARRAY<String>,
> ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
> WATERMARK FOR ws AS ws - INTERVAL '2' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'hehuiyuan1',
> 'scan.startup.mode' = 'latest-offset',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.client.id' = 'test-consumer-group',
> 'properties.group.id' = 'test-consumer-group',
> 'format' = 'json'
> );
> CREATE TABLE kafkaTableSink(
> keyField INTEGER,
> arrayLastValue ARRAY<String>
> )
> WITH (
> 'connector' = 'print'
> );
> insert into kafkaTableSink
> select keyField, last_value(arrayField) from kafkaTableSource GROUP BY
> TUMBLE(ws,INTERVAL '5' SECOND),keyField;
> {code}
> The mock data:
> !image-2022-07-04-16-42-31-399.png|width=705,height=183!
> The result is right
> !image-2022-07-04-16-44-02-720.png|width=687,height=155!
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)