Xingcan Cui created FLINK-34926:
-----------------------------------
Summary: Adaptive auto parallelism doesn't work for a query
Key: FLINK-34926
URL: https://issues.apache.org/jira/browse/FLINK-34926
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.18.1
Reporter: Xingcan Cui
Attachments: image_720.png
We have the following query running in batch mode.
{code:sql}
WITH FEATURE_INCLUSION AS (
SELECT
insertion_id, -- Not unique
features -- Array<Row<key, value>>
FROM
features_table
),
TOTAL AS (
SELECT
COUNT(DISTINCT insertion_id) total_id
FROM
FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
SELECT
`key`,
COUNT(DISTINCT insertion_id) AS id_count
FROM
FEATURE_INCLUSION,
UNNEST(features) as t (`key`, `value`)
WHERE
TRUE
GROUP BY
`key`
),
RESULTS AS (
SELECT
`key`
FROM
FEATURE_INCLUSION_COUNTS,
TOTAL
WHERE
(1.0 * id_count)/total_id > 0.1
)
SELECT
JSON_ARRAYAGG(`key`) AS feature_ids
FROM
RESULTS{code}
The parallelism that Flink adaptively chose for the job vertex containing the following chained operators was always 1.
{code}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT(insertion_id) AS count$0]){code}
If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and manually set `parallelism.default` to a value greater than 1, these operators ran with the configured parallelism as expected.
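For illustration, the workaround described above could be applied in the Flink SQL client roughly as follows. This is a sketch, not the exact setup from the report: the value `8` for `parallelism.default` is an arbitrary assumption, and any value greater than 1 corresponds to the workaround.
{code:sql}
-- Run the job in batch mode, as in the original report.
SET 'execution.runtime-mode' = 'batch';
-- Turn off adaptive auto-parallelism so vertex parallelism is not decided automatically.
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';
-- Illustrative value (assumption); the workaround only requires a value greater than 1.
SET 'parallelism.default' = '8';
{code}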
The screenshot of the full job graph is attached. !image_720.png!