[
https://issues.apache.org/jira/browse/FLINK-36377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yang Li updated FLINK-36377:
----------------------------
Description:
h2. Introduction
In Flink SQL, every non-grouped column in the SELECT list of a {{GROUP BY}} query must be wrapped in an aggregate function, so users often apply {{LAST_VALUE}} to such columns.
Because {{LAST_VALUE}} currently does not accept {{ROW}}-typed arguments, users have to apply it to each column separately, as shown below.
{code:sql}
SELECT
  LAST_VALUE(bool_a) AS last_bool_a,
  LAST_VALUE(int2_b) AS last_int2_b,
  LAST_VALUE(int4_c) AS last_int4_c,
  LAST_VALUE(int8_d) AS last_int8_d,
  LAST_VALUE(float4_e) AS last_float4_e,
  LAST_VALUE(float4_f) AS last_float4_f,
  LAST_VALUE(numeric_g) AS last_numeric_g,
  LAST_VALUE(text_m) AS last_text_m,
  LAST_VALUE(varchar_p) AS last_varchar_p,
  date_h
FROM source_table
GROUP BY date_h
{code}
If the upstream operator produces a retract stream, this approach leads to redundant {{StateMap}} traversals.
To support retraction, Flink's internal {{LastValueWithRetractAggFunction}} stores all historical values for each grouping key.
When the current last value is retracted, it traverses all keys of {{orderToValue}}, a {{MapView}} that maps timestamps to values and is stored as a {{StateMap}}, to find the new last value.
Since every {{LAST_VALUE}} call maintains its own state, more {{LAST_VALUE}} calls lead to more RocksDB read and write operations.
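To make the retraction cost concrete, here is a minimal, simplified sketch of a LAST_VALUE accumulator with retraction. It is not Flink's {{LastValueWithRetractAccumulator}}: the class and method names are hypothetical, a plain {{HashMap}} stands in for the RocksDB-backed {{MapState}}, and the only idea borrowed from the text is that an {{orderToValue}} map keeps every historical value, so that retracting the current last value forces a scan of all remaining keys. The {{keysScanned}} counter is an assumed stand-in for state reads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of LAST_VALUE with retraction (not Flink's code).
 * A HashMap stands in for the RocksDB-backed MapState holding orderToValue.
 */
final class LastValueRetractSketch {
    // order (e.g. a timestamp) -> values accumulated at that order
    private final Map<Long, List<String>> orderToValue = new HashMap<>();
    private String lastValue;
    private Long lastOrder;
    // number of map keys visited while searching for a new last value
    long keysScanned = 0;

    void accumulate(String value, long order) {
        orderToValue.computeIfAbsent(order, k -> new ArrayList<>()).add(value);
        if (lastOrder == null || order >= lastOrder) {
            lastValue = value;
            lastOrder = order;
        }
    }

    void retract(String value, long order) {
        List<String> values = orderToValue.get(order);
        if (values == null || !values.remove(value)) {
            return; // nothing to retract
        }
        if (values.isEmpty()) {
            orderToValue.remove(order);
        }
        if (value.equals(lastValue) && order == lastOrder) {
            // The current last value was removed: scan every remaining key for the maximum order.
            Long best = null;
            for (Long key : orderToValue.keySet()) {
                keysScanned++;
                if (best == null || key > best) {
                    best = key;
                }
            }
            if (best == null) {
                lastValue = null;
                lastOrder = null;
            } else {
                List<String> candidates = orderToValue.get(best);
                lastValue = candidates.get(candidates.size() - 1);
                lastOrder = best;
            }
        }
    }

    String getValue() {
        return lastValue;
    }

    public static void main(String[] args) {
        LastValueRetractSketch acc = new LastValueRetractSketch();
        acc.accumulate("a", 1L);
        acc.accumulate("b", 2L);
        acc.accumulate("c", 3L);
        acc.retract("c", 3L); // retracting the last value triggers a full key scan
        System.out.println(acc.getValue() + " after scanning " + acc.keysScanned + " keys");
    }
}
```

With one such accumulator per field, the same scan is repeated once per {{LAST_VALUE}} call.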
For this reason, I propose supporting {{ROW}} arguments in {{LAST_VALUE}}, so that all fields can be handled by a single {{LAST_VALUE}} call, as shown below.
{code:sql}
SELECT
  LAST_VALUE(
    ROW(
      bool_a,
      int2_b,
      int4_c,
      int8_d,
      float4_e,
      float4_f,
      numeric_g,
      text_m,
      varchar_p
    )
  ) AS row_data,
  date_h
FROM source_table
GROUP BY date_h
{code}
The experiment indicates that applying {{LAST_VALUE}} to a {{ROW}} improves processing speed for retract streams but has little effect on append-only streams.
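A back-of-the-envelope count helps explain why a single {{LAST_VALUE(ROW(...))}} call should help on retract streams. The sketch below assumes the worst case, in which every retraction removes the current last value of every per-field accumulator, so each accumulator rescans all of its historical order keys; it ignores value size and serialization cost. The class and method names are hypothetical and the key count is an illustrative number, not a measurement.

```java
/**
 * Hypothetical worst-case cost model (an assumption, not a measurement):
 * retracting the current last value makes each retract-capable LAST_VALUE
 * accumulator scan all of its historical order keys once.
 */
final class RetractScanCostModel {
    /** Keys visited for one retraction when each field has its own LAST_VALUE call. */
    static long separatedFieldScans(int fields, long historicalKeys) {
        return fields * historicalKeys;
    }

    /** Keys visited when all fields are wrapped in one ROW and aggregated once. */
    static long rowScans(long historicalKeys) {
        return historicalKeys;
    }

    public static void main(String[] args) {
        int fields = 9;             // the nine columns in the queries above
        long historicalKeys = 100;  // hypothetical number of retained orders per group
        System.out.println("separated: " + separatedFieldScans(fields, historicalKeys)
                + ", row: " + rowScans(historicalKeys));
    }
}
```

Under this model, the number of keys visited by the separated version grows linearly with the number of fields, while the ROW version scans one map whose values are simply wider.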
h2. Evaluation
Job throughput was compared with and without the {{ROW}} type in the {{LAST_VALUE}} function, for both retract and append-only streams.
h3. Retraction
A deduplication operator converts the append-only stream generated by the {{datagen}} connector into a retract stream.
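The deduplication step can be pictured with a small model. In Flink SQL, keep-last-row deduplication is typically written with {{ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ... DESC)}} filtered to the first row, but the exact query used in this benchmark is not given. The Java sketch below only shows why such an operator turns an append-only input into a retract stream: once a key has been emitted, each newer row for that key produces an update-before (retraction) of the old row followed by an update-after. The {{Kind}} enum mirrors names from Flink's {{RowKind}} but is defined locally, and all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of keep-last-row deduplication (not Flink's operator).
 * It shows how an append-only input becomes a changelog containing retractions.
 */
final class KeepLastRowDedupSketch {
    // Local stand-in for the change kinds of a changelog row.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    record Change(Kind kind, String key, String value) {}

    // Last row emitted for each deduplication key.
    private final Map<String, String> lastByKey = new HashMap<>();

    List<Change> process(String key, String value) {
        List<Change> out = new ArrayList<>();
        String previous = lastByKey.put(key, value);
        if (previous == null) {
            // First row for this key: a plain insert.
            out.add(new Change(Kind.INSERT, key, value));
        } else {
            // Retract the previously emitted row, then emit its replacement.
            out.add(new Change(Kind.UPDATE_BEFORE, key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, key, value));
        }
        return out;
    }

    public static void main(String[] args) {
        KeepLastRowDedupSketch dedup = new KeepLastRowDedupSketch();
        System.out.println(dedup.process("k1", "a"));
        System.out.println(dedup.process("k1", "b"));
    }
}
```

Every update-before row is what forces the downstream {{LAST_VALUE}} accumulators into their retraction path.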
h4. LAST_VALUE with Separated Fields
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=N2UxY2VkZjAwOWM0M2M4ODYxNzMwZjA3ZDYxNTYzOTVfbElkb2JxUkwzY3hHNEZ4TXZYNUxHaEVQdVk3M25mcWZfVG9rZW46Ym94azRqS3VCeXpQSFl1YjVhaVF0NVBMcHhjXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=903,height=236!
h4. LAST_VALUE with ROW
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=MTNkOTI3OTRhYmU3ZmYxMzE4NDhkOWU1NDkxYzZmNTBfNUtKVGp0b0lOd1MyUVpDbnR0SWJ4aGxQd3QwNTZmZXdfVG9rZW46Ym94azR1UTVZUGZFdnFXMktBdWdHMjd6cHVmXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=894,height=233!
h4. Summary
The ROW job achieves clearly higher throughput than the separated-field job (ROW 4817 vs. separated fields 1808).
Flame graph analysis shows that applying {{LAST_VALUE}} to a {{ROW}} reduces the CPU time spent in the aggregate functions' {{accumulate}} calls (ROW 20.02% vs. separated fields 66.98%).
{{LastValueWithRetractAccumulator}} stores its {{MapView}} in {{MapState}}, so every update of the accumulator requires reading from or writing to RocksDB.
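The measured gaps are easier to compare as ratios. This snippet only does arithmetic on the numbers reported above; it assumes that 4817 is the ROW job's throughput and 1808 the separated-field job's, and it makes no claim about the throughput unit. The class name is hypothetical.

```java
/** Arithmetic on the retract-scenario measurements reported above. */
final class RetractBenchmarkRatios {
    static double ratio(double numerator, double denominator) {
        return numerator / denominator;
    }

    public static void main(String[] args) {
        double rowThroughput = 4817;           // assumed: ROW job
        double separatedThroughput = 1808;     // assumed: separated-field job
        double rowAccumulateCpu = 20.02;       // % of CPU in accumulate, ROW job
        double separatedAccumulateCpu = 66.98; // % of CPU in accumulate, separated job
        System.out.printf("throughput ratio: %.2fx%n", ratio(rowThroughput, separatedThroughput));
        System.out.printf("accumulate CPU share reduced by %.2f percentage points%n",
                separatedAccumulateCpu - rowAccumulateCpu);
    }
}
```

Under that assumption, the ROW job sustains about 2.66 times the throughput, and the share of CPU spent in {{accumulate}} drops by roughly 47 percentage points.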
h3. Append-only
h4. LAST_VALUE with Separated Fields
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=OGExYjk0NTQ3NDNmZDQ4OTI5ODhiZTliM2QzODM5YmVfcE1LQTZSRElaRkZpNmhsRkxndzhSRGxQdFJrTFdZQ0lfVG9rZW46Ym94azRVYWlsWU5UTmJ6VHhRTmRaTnJHRHBmXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=870,height=228!
h4. LAST_VALUE with ROW
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=N2JkZjFiMDZjODU4MmEyNGNmNjgzNzQyYzM4YmYzZTlfNUt3VVc1ZmZHeFpoSWRvdm1HYU0yRldxcGtLM1MyY09fVG9rZW46Ym94azRqcTZvNW04NU1KTXpoQUZEa0g4R29mXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=864,height=230!
h4. Summary
The two jobs show a much smaller difference in throughput (ROW 13411 vs. separated fields 10673).
The flame graphs of both jobs show that the bottleneck is reading {{RocksDBValueState}}, which is called by {{GroupAggFunction}}.
Using {{ROW}} aggregation does not significantly reduce this cost. I suspect this is because Flink stores the data of all accumulators of a group in one {{RowData}}, so every call to {{accState.value()}} reads all accumulators at once.
Therefore, the {{ROW}} optimization is unlikely to be very effective here.
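The append-only result can be illustrated with a toy model. Assuming, as described above, that all accumulators of a group live in one state value (in Flink, {{GroupAggFunction}}'s {{accState}} holds a {{RowData}}), each input record costs one {{value()}} read no matter how many {{LAST_VALUE}} calls the query has. In this sketch a {{HashMap}} stands in for RocksDB, a {{List<Object>}} stands in for {{RowData}}, and the class is hypothetical; it counts reads only and does not model serialization cost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model: one keyed state value holds ALL accumulators of a group,
 * so processing a record needs exactly one read regardless of the accumulator count.
 */
final class SingleValueStateSketch {
    private final Map<String, List<Object>> backend = new HashMap<>(); // stands in for RocksDB
    long reads = 0;

    List<Object> value(String key) {
        reads++;
        return backend.get(key);
    }

    void update(String key, List<Object> accumulators) {
        backend.put(key, accumulators);
    }

    /** Reads all accumulators once, overwrites each with the newest value, writes them back. */
    void processRecord(String groupKey, List<Object> fieldValues) {
        List<Object> accs = value(groupKey);
        if (accs == null) {
            accs = new ArrayList<>(fieldValues); // mutable copy of the first values
        } else {
            for (int i = 0; i < fieldValues.size(); i++) {
                accs.set(i, fieldValues.get(i)); // append-only LAST_VALUE: newest wins
            }
        }
        update(groupKey, accs);
    }

    public static void main(String[] args) {
        SingleValueStateSketch separated = new SingleValueStateSketch();
        SingleValueStateSketch row = new SingleValueStateSketch();
        List<Object> fields = List.of(true, 1, 2, 3L, 1.5f, 2.5f, "9.99", "m", "p");
        for (int i = 0; i < 3; i++) {
            separated.processRecord("h1", fields);      // nine accumulators in one state value
            row.processRecord("h1", List.of(fields));   // one ROW accumulator in one state value
        }
        System.out.println("reads: separated=" + separated.reads + ", row=" + row.reads);
    }
}
```

Both variants report the same number of reads, which is consistent with the {{RocksDBValueState}} access remaining the bottleneck in either job.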
h2. Conclusion
# Using the {{ROW}} type in {{LAST_VALUE}} aggregation can improve processing speed for retract streams; the benefit is expected to grow with the number of fields contained in the {{ROW}}.
# For append-only streams, using the {{ROW}} type in {{LAST_VALUE}} aggregation yields only limited improvement, because it does not significantly speed up state backend reads.
> Support the use of the LAST_VALUE aggregate function on ROW type data
> ----------------------------------------------------------------------
>
> Key: FLINK-36377
> URL: https://issues.apache.org/jira/browse/FLINK-36377
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Yang Li
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)