[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
nyingping updated FLINK-28504:
------------------------------
Description:
I have a window Top-N test job. The code is as follows:
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Class name is illustrative.
public class WindowTopNTest {
    public static void main(String[] args) {
        // Expose the local web UI on port 8082 to inspect watermarks.
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8082);
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

        // Mark source subtasks that receive no data for 10s as idle.
        st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

        st.executeSql(
                "CREATE TABLE test (\n"
                        + " `key` STRING,\n"
                        + " `time` TIMESTAMP(3),\n"
                        + " `price` float,\n"
                        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
                        + ") WITH (\n"
                        + " 'connector' = 'kafka',\n"
                        + " 'topic' = 'test',\n"
                        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
                        + " 'properties.group.id' = 'windowGroup',\n"
                        + " 'scan.startup.mode' = 'latest-offset',\n"
                        + " 'format' = 'json'\n"
                        + ")");

        // Top 3 keys by total price for each 1-minute tumbling window.
        String sqlWindowTopN =
                "select * from ("
                        + " select *, "
                        + " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum "
                        + " from ("
                        + " select key,window_start,window_end,count(key) as `count`,sum(price) total from table ("
                        + " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)"
                        + " ) group by window_start, window_end, key"
                        + " )"
                        + ") where rownum <= 3";
        st.executeSql(sqlWindowTopN).print();
    }
}
```
After running it for a long time, I still get no results.
The watermarks appear as follows in the web UI:
!image-2022-07-12-15-11-51-653.png!
I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic
has only one partition, so 11 of the 12 source subtasks are idle. To align the
watermarks across the whole job, I use the `table.exec.source.idle-timeout`
configuration.
As shown above, the planner automatically splits the window Top-N SQL into local
and global aggregation phases. In the local phase, the watermark does not behave
as I expected.
Manually setting the parallelism to 1 gives the expected result:
`streamExecutionEnvironment.setParallelism(1);`
!image-2022-07-12-15-19-29-950.png!
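Why both the idle timeout and parallelism 1 should help can be illustrated with a simplified model. It assumes the general Flink rule that an operator's watermark is the minimum over its input channels, with channels marked idle left out of that minimum. The class `WatermarkAlignmentSketch`, the method `combinedWatermark`, and the 60000 ms watermark value are hypothetical; this is a sketch, not Flink's actual implementation, and it does not model the local-global aggregation bug itself.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical, simplified model of watermark combination across input channels.
public class WatermarkAlignmentSketch {

    // Minimum watermark over the non-idle channels; empty if every channel is idle.
    public static OptionalLong combinedWatermark(long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        int parallelism = 12;
        long[] watermarks = new long[parallelism];
        // Subtasks 1..11 read no partition and never advance their watermark.
        Arrays.fill(watermarks, Long.MIN_VALUE);
        // Subtask 0 reads the single Kafka partition (hypothetical value).
        watermarks[0] = 60_000L;

        boolean[] noneIdle = new boolean[parallelism];
        boolean[] othersIdle = new boolean[parallelism];
        Arrays.fill(othersIdle, true);
        othersIdle[0] = false;

        // Without idleness the empty channels pin the watermark at Long.MIN_VALUE.
        System.out.println(combinedWatermark(watermarks, noneIdle).getAsLong());
        // With the empty channels marked idle, the watermark follows subtask 0.
        System.out.println(combinedWatermark(watermarks, othersIdle).getAsLong());
        // Parallelism 1: a single active channel, nothing to wait for.
        System.out.println(combinedWatermark(new long[] {60_000L}, new boolean[] {false}).getAsLong());
    }
}
```

In this model, marking the 11 empty channels idle (what the idle timeout is meant to achieve) and running with a single channel both let the combined watermark follow the one channel that actually receives data.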
I can also configure the planner not to split the aggregation into local and global
phases. In this case, the `table.exec.source.idle-timeout` configuration takes effect
and the watermarks are aligned:
```java
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
```
Result:
!image-2022-07-12-15-20-06-919.png!
As described above, I would like to keep the two-phase aggregation and have the
watermarks aligned at the same time.
> Local-Global aggregation causes watermark alignment
> (table.exec.source.idle-timeout) of idle partition invalid
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28504
> URL: https://issues.apache.org/jira/browse/FLINK-28504
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.2
> Environment: flink 1.14.1
> kafka 2.4
> Reporter: nyingping
> Priority: Major
> Attachments: image-2022-07-12-15-11-51-653.png,
> image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)