[ https://issues.apache.org/jira/browse/FLINK-36377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013589#comment-18013589 ]
Feng Jin commented on FLINK-36377:
----------------------------------

[~dezhouliyang] Thank you for following up on this issue. Could you also add an explanation of why using TypeSerializer is necessary?

cc [~Sergey Nuyanzin]

> Support the use of the LAST_VALUE aggregate function on ROW type data
> ----------------------------------------------------------------------
>
> Key: FLINK-36377
> URL: https://issues.apache.org/jira/browse/FLINK-36377
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Yang Li
> Assignee: Yang Li
> Priority: Major
> Labels: pull-request-available
>
> h2. Introduction
> In Flink, after applying a GROUP BY, users may apply LAST_VALUE to the
> remaining fields so that every selected field has a corresponding aggregate
> function. Currently, LAST_VALUE does not support the ROW type, so users
> have to apply the LAST_VALUE function to each field separately, as shown
> below.
> _SELECT_
> _LAST_VALUE(bool_a) AS last_bool_a,_
> _LAST_VALUE(int2_b) AS last_int2_b,_
> _LAST_VALUE(int4_c) AS last_int4_c,_
> _LAST_VALUE(int8_d) AS last_int8_d,_
> _LAST_VALUE(float4_e) AS last_float4_e,_
> _LAST_VALUE(float4_f) AS last_float4_f,_
> _LAST_VALUE(numeric_g) AS last_numeric_g,_
> _LAST_VALUE(text_m) AS last_text_m,_
> _LAST_VALUE(varchar_p) AS last_varchar_p,_
> _date_h_
> _FROM source_table_
> _GROUP BY date_h_
>
> If the upstream operator produces a retract stream, this approach leads to
> redundant StateMap traversal. To support retraction, Flink's internal
> {{LastValueWithRetractAggFunction}} stores all historical data related to
> the primary key. When the last value is deleted, it traverses all keys in
> orderToValue (which maps timestamps to data), and this {{MapView}} is
> stored in the form of {{StateMap}}. Each additional {{LAST_VALUE}} function
> therefore adds more RocksDB read and write operations.
> Therefore, I advocate for handling {{ROW}} types with {{LAST_VALUE}}, so
> that a single {{LAST_VALUE}} function covers all fields, as below.
> _SELECT_
> _LAST_VALUE(_
> _ROW(_
> _bool_a,_
> _int2_b,_
> _int4_c,_
> _int8_d,_
> _float4_e,_
> _float4_f,_
> _numeric_g,_
> _text_m,_
> _varchar_p_
> _)_
> _) AS row_data,_
> _date_h_
> _FROM source_table_
> _GROUP BY date_h_
> The experiment indicates that applying the {{ROW}} type to the
> {{LAST_VALUE}} function can improve the processing speed for retract
> streams, but has little effect on append-only streams.
> h2. Evaluation:
> The throughput of two jobs, one using the {{ROW}} type in the
> {{LAST_VALUE}} function and one not, was compared in both retract and
> append-only scenarios.
> h3. Retraction
> A deduplication operator converts the append-only stream generated by
> datagen into a retract stream. The two jobs differ clearly in throughput
> (Row 4817: Mean 1808). Flame graph analysis shows that applying the ROW
> type to the LAST_VALUE function reduces the cost of the aggregate
> function's accumulate calls: CPU usage for accumulate is ROW 20.02% versus
> Separated 66.98%. LastValueWithRetractAccumulator stores its MapView in
> MapState, so updating the LastValueWithRetractAccumulator requires reading
> from or writing to RocksDB.
> h3. AppendOnly
> The two jobs show little difference in throughput (Row 13411: Mean 10673).
> The flame graphs of both jobs show that the bottleneck lies in getting
> {{RocksDBValueState}}, which is called by {{GroupFunction}}. Using {{ROW}}
> aggregation does not yield a significant optimization in this part. I
> suspect this is because Flink uses RowData to store the data of multiple
> Accumulators, so each call to {{value}} on {{accState}} reads all the
> Accumulators at the same time. Therefore, the ROW optimization might not be
> very effective here.
> h2. Conclusion
> # Using the ROW type for LAST_VALUE aggregation can improve the processing
> speed for retract streams, with effectiveness proportional to the number of
> fields contained in the {{ROW}}.
> # For append-only streams, using the ROW type for LAST_VALUE aggregation
> yields only limited improvement, as the optimization has no significant
> effect on state backend read speed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
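
The retraction cost described in the quoted issue can be illustrated with a small toy model. This is only a sketch under stated assumptions: `RetractLastValueSketch`, its `stateAccesses` counter, and the explicit `order` parameter are hypothetical names introduced here, the `HashMap` stands in for the RocksDB-backed orderToValue {{MapView}}, and none of it is Flink's actual {{LastValueWithRetractAggFunction}} code. It counts simulated state accesses when one accumulator is kept per field versus one accumulator for the whole row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a retractable LAST_VALUE accumulator; NOT Flink's actual class. */
final class RetractLastValueSketch<T> {
    // Hypothetical in-memory substitute for the orderToValue MapView.
    private final Map<Long, List<T>> orderToValue = new HashMap<>();
    private T lastValue;
    private Long lastOrder;
    long stateAccesses; // every simulated state read, write, or key visit

    T lastValue() {
        return lastValue;
    }

    void accumulate(T value, long order) {
        stateAccesses += 2; // one read and one write of the map entry
        orderToValue.computeIfAbsent(order, k -> new ArrayList<>()).add(value);
        lastValue = value;
        lastOrder = order;
    }

    void retract(T value, long order) {
        stateAccesses += 2; // read the entry, then write it back or delete it
        List<T> values = orderToValue.get(order);
        if (values == null || !values.remove(value)) {
            return; // nothing stored for this value at this order
        }
        if (values.isEmpty()) {
            orderToValue.remove(order);
        }
        if (!value.equals(lastValue)) {
            return; // the current last value is unaffected
        }
        // The last value was deleted: visit every remaining key to find the newest one.
        Long best = null;
        for (Long key : orderToValue.keySet()) {
            stateAccesses++;
            if (key <= lastOrder && (best == null || key > best)) {
                best = key;
            }
        }
        lastOrder = best;
        if (best == null) {
            lastValue = null;
        } else {
            stateAccesses++; // read the entry that holds the new last value
            List<T> remaining = orderToValue.get(best);
            lastValue = remaining.get(remaining.size() - 1);
        }
    }

    /**
     * Accumulates `rows` rows (rows >= 1), retracts the newest row, and returns
     * {accesses with one accumulator per field, accesses with one ROW accumulator}.
     */
    static long[] compare(int fields, int rows) {
        List<RetractLastValueSketch<String>> perField = new ArrayList<>();
        for (int f = 0; f < fields; f++) {
            perField.add(new RetractLastValueSketch<>());
        }
        RetractLastValueSketch<List<String>> perRow = new RetractLastValueSketch<>();
        List<String> row = null;
        for (long order = 1; order <= rows; order++) {
            row = new ArrayList<>();
            for (int f = 0; f < fields; f++) {
                String v = "f" + f + "@" + order;
                row.add(v);
                perField.get(f).accumulate(v, order);
            }
            perRow.accumulate(row, order);
        }
        long separate = 0;
        for (int f = 0; f < fields; f++) {
            perField.get(f).retract(row.get(f), rows);
            separate += perField.get(f).stateAccesses;
        }
        perRow.retract(row, rows);
        return new long[] {separate, perRow.stateAccesses};
    }
}
```

In this model, `RetractLastValueSketch.compare(9, 100)` reports exactly nine times as many simulated accesses for the per-field variant as for the single ROW accumulator, because every accumulator repeats the same key traversal. The count is an artifact of the toy model and says nothing about real throughput; the measurements in the issue remain the reference.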