[ 
https://issues.apache.org/jira/browse/FLINK-36377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013589#comment-18013589
 ] 

Feng Jin commented on FLINK-36377:
----------------------------------

[~dezhouliyang]   Thank you for following up on this issue. Could you also add 
an explanation of why using TypeSerializer is necessary?   cc [~Sergey 
Nuyanzin] 

>  Support the use of the LAST_VALUE aggregate function on ROW type data
> ----------------------------------------------------------------------
>
>                 Key: FLINK-36377
>                 URL: https://issues.apache.org/jira/browse/FLINK-36377
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Yang Li
>            Assignee: Yang Li
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Introduction
> In Flink, after a GROUP BY, users often apply LAST_VALUE to the non-key 
> fields so that every selected field has a corresponding aggregate 
> function. Currently, LAST_VALUE does not support ROW-typed arguments, so 
> users have to apply the LAST_VALUE function to each field separately, as 
> shown below.
> _SELECT_
>     _LAST_VALUE(bool_a) AS last_bool_a,_ 
>     _LAST_VALUE(int2_b) AS last_int2_b,_ 
>     _LAST_VALUE(int4_c) AS last_int4_c,_ 
>     _LAST_VALUE(int8_d) AS last_int8_d,_ 
>     _LAST_VALUE(float4_e) AS last_float4_e,_ 
>     _LAST_VALUE(float4_f) AS last_float4_f,_ 
>     _LAST_VALUE(numeric_g) AS last_numeric_g,_ 
>     _LAST_VALUE(text_m) AS last_text_m,_ 
>     _LAST_VALUE(varchar_p) AS last_varchar_p,_
>     _date_h_
> _FROM source_table_
> _GROUP BY date_h_
>  
> If the upstream operator produces a retract stream, this approach leads to 
> redundant StateMap traversal. To support retraction, Flink's internal 
> {{LastValueWithRetractAggFunction}} stores all historical data related to 
> the primary key. When the last value is deleted, it traverses all keys in 
> orderToValue (which maps timestamps to data); this {{MapView}} is 
> stored in the form of a {{StateMap}}. Each additional {{LAST_VALUE}} call 
> therefore adds more RocksDB reads and writes. For this reason, I propose 
> supporting {{ROW}} types in {{LAST_VALUE}}, so that a single 
> {{LAST_VALUE}} call covers all fields, as shown below.
> _SELECT_
>  _LAST_VALUE(_
>     _ROW(_
>         _bool_a,_
>         _int2_b,_
>         _int4_c,_
>         _int8_d,_
>         _float4_e,_
>         _float4_f,_
>         _numeric_g,_
>         _text_m,_
>         _varchar_p_
>     _)_
> _) AS row_data,_
> _date_h_
> _FROM source_table_
> _GROUP BY date_h_
> The experiment indicates that applying the {{ROW}} type to the {{LAST_VALUE}} 
> function can improve the processing speed for retract streams, but has 
> little effect on append-only streams.
> h2. Evaluation
> The throughput of jobs was compared based on whether the {{ROW}} type was 
> used in the {{LAST_VALUE}} function, considering both retract and append-only 
> scenarios.
> h3. Retraction
> A deduplication operator converts the append-only stream generated by 
> datagen into a retract stream. The two jobs differ in throughput (Row 
> 4817: Mean 1808). Flame graph analysis shows that applying the ROW type to 
> the LAST_VALUE function reduces the time the aggregate function spends in 
> accumulate, with CPU usage for accumulate being (ROW 20.02%: Separated 
> 66.98%). LastValueWithRetractAccumulator keeps its MapView in MapState, 
> so updating the LastValueWithRetractAccumulator requires reading from 
> or writing to RocksDB.
> h3. AppendOnly
> The two jobs show little difference in throughput (Row 13411: Mean 10673). 
> The flame graphs for both jobs show that the bottleneck lies in reading 
> {{RocksDBValueState}}, which is called by {{GroupFunction}}. Using {{ROW}} 
> aggregation does not yield a significant optimization in this part. I 
> suspect this is because Flink uses RowData to store data from multiple 
> accumulators, so every time {{accState}} invokes the {{value}} method, it 
> reads all the accumulators at once. Therefore, the ROW optimization might 
> not be very effective here.
> h2. Conclusion
>  # Using the ROW type for LAST_VALUE aggregation can improve the processing 
> speed for retract streams, with effectiveness proportional to the number of 
> fields contained in the {{ROW}}.
>  # Using the ROW type for LAST_VALUE aggregation yields only limited 
> improvements for append-only streams, as the optimization effect on state 
> backend read speed is not significant.
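
The retraction argument in the quoted description can be illustrated with a toy cost model. The sketch below is not Flink code: the class LastValueRetractSketch, its Accumulator type, and the stateAccesses counter are hypothetical names introduced only for illustration, and a plain HashMap stands in for the orderToValue MapView. It assumes that every put, remove, or scanned entry costs one state access, and it ignores the serialization cost of the larger ROW value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy cost model (not Flink code): counts simulated state reads and writes. */
class LastValueRetractSketch {

    /** Hypothetical stand-in for one retract-aware LAST_VALUE accumulator. */
    static final class Accumulator<V> {
        // Stand-in for the orderToValue MapView (order/timestamp -> value).
        private final Map<Long, V> orderToValue = new HashMap<>();
        private long lastOrder = Long.MIN_VALUE;
        private V lastValue;
        long stateAccesses = 0;

        void accumulate(long order, V value) {
            orderToValue.put(order, value);
            stateAccesses++; // one simulated state write
            if (order >= lastOrder) {
                lastOrder = order;
                lastValue = value;
            }
        }

        void retract(long order) {
            orderToValue.remove(order);
            stateAccesses++; // one simulated state write
            if (order != lastOrder) {
                return; // the current last value is unaffected
            }
            // The last value was removed: scan every remaining key for the new maximum.
            lastOrder = Long.MIN_VALUE;
            lastValue = null;
            for (Map.Entry<Long, V> entry : orderToValue.entrySet()) {
                stateAccesses++; // one simulated state read per stored entry
                if (entry.getKey() > lastOrder) {
                    lastOrder = entry.getKey();
                    lastValue = entry.getValue();
                }
            }
        }

        V lastValue() {
            return lastValue;
        }
    }

    /** One accumulator per field: LAST_VALUE(a), LAST_VALUE(b), ... */
    static long separateFields(int fieldCount, int history) {
        List<Accumulator<String>> accumulators = new ArrayList<>();
        for (int f = 0; f < fieldCount; f++) {
            accumulators.add(new Accumulator<>());
        }
        for (long order = 1; order <= history; order++) {
            for (int f = 0; f < fieldCount; f++) {
                accumulators.get(f).accumulate(order, "field" + f + "@" + order);
            }
        }
        long total = 0;
        for (Accumulator<String> acc : accumulators) {
            acc.retract(history); // retract the newest row in every accumulator
            total += acc.stateAccesses;
        }
        return total;
    }

    /** A single accumulator holding the whole row: LAST_VALUE(ROW(a, b, ...)). */
    static long singleRow(int fieldCount, int history) {
        Accumulator<String[]> acc = new Accumulator<>();
        for (long order = 1; order <= history; order++) {
            String[] row = new String[fieldCount];
            for (int f = 0; f < fieldCount; f++) {
                row[f] = "field" + f + "@" + order;
            }
            acc.accumulate(order, row);
        }
        acc.retract(history); // one retraction, one key scan
        return acc.stateAccesses;
    }

    public static void main(String[] args) {
        System.out.println("separate fields: " + separateFields(9, 100));
        System.out.println("single ROW:      " + singleRow(9, 100));
    }
}
```

With nine fields and 100 historical rows per key, the model counts 1800 accesses for the per-field variant and 200 for the single-ROW variant, so the gap grows linearly with the number of fields. This mirrors only the direction of the measured difference; the real ratio depends on the state backend and on value sizes.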



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
