[ https://issues.apache.org/jira/browse/FLINK-36377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Li updated FLINK-36377:
----------------------------
    Description: 
h2. Introduction
In Flink, when a query uses {{GROUP BY}}, users often wrap the non-grouping fields in {{LAST_VALUE}} so that every selected field has a corresponding aggregate function. Currently, {{LAST_VALUE}} does not accept a {{ROW}} argument, so users have to apply {{LAST_VALUE}} to each field separately, as shown below:
{code:sql}
SELECT
  LAST_VALUE(bool_a) AS last_bool_a,
  LAST_VALUE(int2_b) AS last_int2_b,
  LAST_VALUE(int4_c) AS last_int4_c,
  LAST_VALUE(int8_d) AS last_int8_d,
  LAST_VALUE(float4_e) AS last_float4_e,
  LAST_VALUE(float4_f) AS last_float4_f,
  LAST_VALUE(numeric_g) AS last_numeric_g,
  LAST_VALUE(text_m) AS last_text_m,
  LAST_VALUE(varchar_p) AS last_varchar_p,
  date_h
FROM source_table
GROUP BY date_h
{code}
If the upstream operator produces a retract stream, this approach leads to redundant {{StateMap}} traversals. To support retraction, Flink's internal {{LastValueWithRetractAggFunction}} stores all historical values associated with the primary key. When the current last value is retracted, it traverses all keys of {{orderToValue}}, a {{MapView}} that maps timestamps to values and is stored as a {{StateMap}}. Because each {{LAST_VALUE}} call keeps its own accumulator, more {{LAST_VALUE}} calls mean more RocksDB reads and writes. Therefore, I propose supporting {{ROW}} arguments in {{LAST_VALUE}}, so that all fields can be covered by a single {{LAST_VALUE}} call:
{code:sql}
SELECT
  LAST_VALUE(
    ROW(
      bool_a,
      int2_b,
      int4_c,
      int8_d,
      float4_e,
      float4_f,
      numeric_g,
      text_m,
      varchar_p
    )
  ) AS row_data,
  date_h
FROM source_table
GROUP BY date_h
{code}
Experiments indicate that passing a {{ROW}} to {{LAST_VALUE}} improves throughput for retract streams but has little effect on append-only streams.
h2. Evaluation
Job throughput was compared with and without a {{ROW}} argument to {{LAST_VALUE}}, in both retract and append-only scenarios.
h3. Retraction
A deduplication operator converts the append-only stream generated by datagen into a retract stream. The two jobs differ markedly in throughput (ROW: 4817; Mean: 1808). Flame graphs show that using a {{ROW}} argument reduces the CPU time spent in the aggregate functions' {{accumulate}} calls (ROW: 20.02%; Separated: 66.98%). {{LastValueWithRetractAccumulator}} stores its {{MapView}} in {{MapState}}, so every update of the accumulator reads from or writes to RocksDB.
h3. AppendOnly
The two jobs show a much smaller difference in throughput (ROW: 13411; Mean: 10673). The flame graphs of both jobs show that the bottleneck is reading {{RocksDBValueState}}, which is called by {{GroupFunction}}. Using a {{ROW}} argument does not significantly reduce this cost. I suspect this is because Flink stores the data of all accumulators in a single {{RowData}}, so each {{accState.value()}} call reads all accumulators at once. Packing the fields into one {{ROW}} therefore helps little here.
h2. Conclusion
# Using a {{ROW}} argument in {{LAST_VALUE}} can improve throughput for retract streams, with an effect proportional to the number of fields in the {{ROW}}.
# For append-only streams, using a {{ROW}} argument in {{LAST_VALUE}} yields only limited improvement, because it does not significantly speed up state-backend reads.
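To illustrate why the number of {{LAST_VALUE}} calls matters on retract streams, here is a minimal Java sketch. It is a simplified model, not Flink's {{LastValueWithRetractAggFunction}}: a {{HashMap}} stands in for the {{orderToValue}} {{MapView}}, and {{stateAccesses}} is a hypothetical cost counter that charges one access per map entry written or updated and one per key visited while rescanning, rather than counting real RocksDB calls. All class and method names are illustrative. It compares nine separate accumulators with a single accumulator holding the whole row, inserts 100 rows, then retracts the newest one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LastValueRetractSketch {

    // Hypothetical cost model: +1 per entry written/updated, +1 per key visited in a rescan.
    static long stateAccesses = 0;

    /** Simplified retractable LAST_VALUE accumulator (illustrative, not Flink's class). */
    static final class Acc<T> {
        T lastValue;
        Long lastOrder;
        // Stand-in for the orderToValue MapView: order (e.g. a timestamp) -> values seen at it.
        final Map<Long, List<T>> orderToValue = new HashMap<>();

        void accumulate(T value, long order) {
            stateAccesses++; // write one entry
            orderToValue.computeIfAbsent(order, k -> new ArrayList<>()).add(value);
            if (lastOrder == null || order >= lastOrder) {
                lastOrder = order;
                lastValue = value;
            }
        }

        void retract(T value, long order) {
            stateAccesses++; // read/update the retracted entry
            List<T> values = orderToValue.get(order);
            if (values == null || !values.remove(value)) {
                return; // value was never accumulated at this order
            }
            if (values.isEmpty()) {
                orderToValue.remove(order);
            }
            if (lastOrder != null && order == lastOrder) {
                recomputeLast(); // the last value may be gone: scan every remaining key
            }
        }

        private void recomputeLast() {
            lastOrder = null;
            lastValue = null;
            for (Map.Entry<Long, List<T>> e : orderToValue.entrySet()) {
                stateAccesses++; // each visited key costs one access
                if (lastOrder == null || e.getKey() > lastOrder) {
                    lastOrder = e.getKey();
                    List<T> vs = e.getValue();
                    lastValue = vs.get(vs.size() - 1);
                }
            }
        }
    }

    /** One accumulator per field: insert all rows, then retract the newest row. */
    static long separate(int fields, int rows) {
        stateAccesses = 0;
        List<Acc<Integer>> accs = new ArrayList<>();
        for (int f = 0; f < fields; f++) {
            accs.add(new Acc<>());
        }
        for (long t = 0; t < rows; t++) {
            for (int f = 0; f < fields; f++) {
                accs.get(f).accumulate((int) t * 10 + f, t);
            }
        }
        long newest = rows - 1;
        for (int f = 0; f < fields; f++) {
            accs.get(f).retract((int) newest * 10 + f, newest);
        }
        return stateAccesses;
    }

    /** One accumulator holding the whole row, modeling LAST_VALUE(ROW(...)). */
    static long packed(int fields, int rows) {
        stateAccesses = 0;
        Acc<List<Integer>> acc = new Acc<>();
        for (long t = 0; t < rows; t++) {
            acc.accumulate(row(t, fields), t);
        }
        acc.retract(row(rows - 1, fields), rows - 1);
        return stateAccesses;
    }

    private static List<Integer> row(long t, int fields) {
        List<Integer> r = new ArrayList<>();
        for (int f = 0; f < fields; f++) {
            r.add((int) t * 10 + f);
        }
        return r;
    }

    public static void main(String[] args) {
        System.out.println("separate: " + separate(9, 100)); // 1800
        System.out.println("row:      " + packed(9, 100));   // 200
    }
}
```

Under this model, retracting the current last value forces every separate accumulator to rescan its own map, so the cost scales with the number of fields, while the packed row rescans only once. Real RocksDB costs (iterators, caching, serialization) are not modeled, so actual numbers will differ.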
> Support the use of the LAST_VALUE aggregate function on ROW type data
> ----------------------------------------------------------------------
>
>                 Key: FLINK-36377
>                 URL: https://issues.apache.org/jira/browse/FLINK-36377
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Yang Li
>            Assignee: Yang Li
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)