[ 
https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541367#comment-16541367
 ] 

Jungtaek Lim edited comment on SPARK-24763 at 7/12/18 9:20 AM:
---------------------------------------------------------------

I had a chance to craft various key/value cases (bigger key, bigger value, many 
key columns, many value columns) and ran similar tests on these cases. To 
handle 4 tests concurrently, I just had 2 trials (instead of 3 trials) per each 
case & disable/enable.

Same spark version and same AWS dedicated instance, same command to run JVM 
(just increased driver memory to 6g).

I'll describe test results per case.
{quote}App 1 - bigger key
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsKeyMuchBigger.scala]
 * key fields : window (start/end), mod (int), word (1000 chars string)
 * value fields : max_value (int), min_value (int), avg_value (double)

> Test Result - use same queries from above test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.019877228518|21173.52173524455|
|disable|2|50000.0|9999.980273266538|21241.022682965504|
|enable|1|50000.0|10000.00006336634|21745.986204268098|
|enable|2|50000.0|10000.000007920793|21382.004127689153|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2283.7425742574255|0.06930693069306931|null|62.475247524752476|2365.168316831683|10.267326732673267|
|disable|2|2274.3564356435645|0.09900990099009901|null|64.42574257425743|2358.059405940594|11.049504950495049|
|enable|1|2216.8514851485147|0.0891089108910891|null|65.23762376237623|2301.920792079208|10.752475247524753|
|enable|2|2260.366336633663|0.0297029702970297|null|63.742574257425744|2344.6633663366338|11.673267326732674|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34153|29274|68989767|
|disable|2|150|34153|29274|68981335|
|enable|1|150|38952|31352|44137447|
|enable|2|150|38969|31138|43849183|
 * average state row size

||mode||size||note||
|disable|2019.897256464| |
|enable|1129.178232806|55.90 %|
{quote}App 2 - bigger value
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsValueMuchBigger.scala]
 * key fields : window (start/end), mod (int)
 * value fields : max_value (int), min_value (int), avg_value (double), 
word_last (1000 chars string)

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000039603963|20021.91393461469|
|disable|2|50000.0|10000.000071287132|19811.730674701703|
|enable|1|50000.0|10000.000435644328|20518.188917614298|
|enable|2|50000.0|10000.000071287132|20312.040082394597|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2417.990099009901|0.0297029702970297|null|64.9009900990099|2500.910891089109|10.158415841584159|
|disable|2|2447.7524752475247|0.039603960396039604|null|62.386138613861384|2529.3564356435645|10.712871287128714|
|enable|1|2357.891089108911|0.04950495049504951|null|62.81188118811881|2439.891089108911|10.881188118811881|
|enable|2|2381.5346534653463|0.04950495049504951|null|61.742574257425744|2463.108910891089|11.435643564356436|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|700|600|934447|
|disable|2|150|700|600|934223|
|enable|1|150|800|700|1029799|
|enable|2|150|800|700|1029183|
 * average state row size

||mode||size||note||
|disable|1334.764285714| |
|enable|1286.86375|96.41 %|
{quote}App 3 - many keys
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]
 * key fields : window (start/end), mod (int), word (10 chars string), and 20 
more columns (int)
 * value fields : max_value (int), min_value (int), avg_value (double)

> Test Result - use same queries from above test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000055445547|17645.896698295066|
|disable|2|50000.0|10000.00008712872|17627.17064059849|
|enable|1|50000.0|10000.000039603963|18408.23205926807|
|enable|2|50000.0|10000.000071287132|18568.40828962387|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2739.990099009901|0.0594059405940594|null|77.48514851485149|2837.4851485148515|11.584158415841584|
|disable|2|2742.960396039604|0.04950495049504951|null|76.03960396039604|2839.019801980198|11.009900990099009|
|enable|1|2625.108910891089|0.019801980198019802|null|78.06930693069307|2722.3366336633662|10.623762376237623|
|enable|2|2599.5544554455446|0.039603960396039604|null|76.24752475247524|2695.029702970297|10.643564356435643|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|38946|31487|23036727|
|disable|2|150|38992|30800|23084343|
|enable|1|150|38915|31883|15208727|
|enable|2|150|39007|30401|14829975|
 * average state row size

||mode||size||note||
|disable|591.765993004| |
|enable|385.503337366|65.14 %|
{quote}App 4 - many values
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]
 * key fields : window (start/end), mod (int), word (10 chars string)
 * value fields : max (int), min (int), avg (double) for 20 columns

> Test Result - use same queries from above test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|63465.346534653465|10005.318225922994|9999.487035328104|
|disable|2|65247.52475247525|10011.503266213555|10001.570001460635|
|enable|1|62673.267326732675|9998.149926805827|9997.805403434037|
|enable|2|63564.35643564357|10000.005956852667|9996.36862736686|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|6235.069306930693|0.04950495049504951|null|93.63366336633663|6348.584158415842|9.910891089108912|
|disable|2|6410.3564356435645|0.039603960396039604|null|93.73267326732673|6523.821782178218|9.910891089108912|
|enable|1|6162.6732673267325|0.04950495049504951|null|88.62376237623762|6271.178217821782|10.435643564356436|
|enable|2|6244.079207920792|0.0297029702970297|null|94.26732673267327|6357.970297029703|9.930693069306932|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34776|29808|30019279|
|disable|2|150|39627|33928|33920375|
|enable|1|150|39842|30502|31837671|
|enable|2|150|34776|29616|27082887|
 * average state row size

||mode||size||note||
|disable|859.604889211| |
|enable|788.939591288|91.77 %|

 
{quote}Summary
{quote} * Enabling option showed on far or slightly better throughputs from 
three cases. It showed slightly lower throughput from one case but it was 
around 0.1% which I think we can treat it as noise.
 ** In overall it didn't show any performance regression at any cases.
 * Enabling option reduced state memory usage according to the ratio of 
key-value size as expected. Enabling option could reduce state memory usage 
around 45% for bigger key case.


was (Author: kabhwan):
I had a chance to craft various key/value cases (bigger key, bigger value, many 
key columns, many value columns) and ran similar tests on these cases. To 
handle 4 tests concurrently, I just had 2 trials (instead of 3 trials) per each 
case & disable/enable.

Same spark version and same AWS dedicated instance, same command to run JVM 
(just increased driver memory to 6g).

I'll describe test results per case.
{quote}App 1 - bigger key
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsKeyMuchBigger.scala]
 * key fields : window (start/end), mod (int), word (1000 chars string)
 * value fields : max_value (int), min_value (int), avg_value (double)

> Test Result - use same queries from above test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.019877228518|21173.52173524455|
|disable|2|50000.0|9999.980273266538|21241.022682965504|
|enable|1|50000.0|10000.00006336634|21745.986204268098|
|enable|2|50000.0|10000.000007920793|21382.004127689153|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2283.7425742574255|0.06930693069306931|null|62.475247524752476|2365.168316831683|10.267326732673267|
|disable|2|2274.3564356435645|0.09900990099009901|null|64.42574257425743|2358.059405940594|11.049504950495049|
|enable|1|2216.8514851485147|0.0891089108910891|null|65.23762376237623|2301.920792079208|10.752475247524753|
|enable|2|2260.366336633663|0.0297029702970297|null|63.742574257425744|2344.6633663366338|11.673267326732674|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34153|29274|68989767|
|disable|2|150|34153|29274|68981335|
|enable|1|150|38952|31352|44137447|
|enable|2|150|38969|31138|43849183|
 * average state row size

||mode||size||note||
|disable|2019.897256464| |
|enable|1129.178232806|55.90 %|
{quote}App 2 - bigger value
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsValueMuchBigger.scala]
 * key fields : window (start/end), mod (int)
 * value fields : max_value (int), min_value (int), avg_value (double), 
word_last (1000 chars string)

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000039603963|20021.91393461469|
|disable|2|50000.0|10000.000071287132|19811.730674701703|
|enable|1|50000.0|10000.000435644328|20518.188917614298|
|enable|2|50000.0|10000.000071287132|20312.040082394597|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2417.990099009901|0.0297029702970297|null|64.9009900990099|2500.910891089109|10.158415841584159|
|disable|2|2447.7524752475247|0.039603960396039604|null|62.386138613861384|2529.3564356435645|10.712871287128714|
|enable|1|2357.891089108911|0.04950495049504951|null|62.81188118811881|2439.891089108911|10.881188118811881|
|enable|2|2381.5346534653463|0.04950495049504951|null|61.742574257425744|2463.108910891089|11.435643564356436|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|700|600|934447|
|disable|2|150|700|600|934223|
|enable|1|150|800|700|1029799|
|enable|2|150|800|700|1029183|
 * average state row size

||mode||size||note||
|disable|1334.764285714| |
|enable|1286.86375|96.41 %|
{quote}App 3 - many keys
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]
 * key fields : window (start/end), mod (int), word (10 chars string), and 20 
more columns (int)
 * value fields : max_value (int), min_value (int), avg_value (double)

> Test Result - use same queries from above test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000055445547|17645.896698295066|
|disable|2|50000.0|10000.00008712872|17627.17064059849|
|enable|1|50000.0|10000.000039603963|18408.23205926807|
|enable|2|50000.0|10000.000071287132|18568.40828962387|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2739.990099009901|0.0594059405940594|null|77.48514851485149|2837.4851485148515|11.584158415841584|
|disable|2|2742.960396039604|0.04950495049504951|null|76.03960396039604|2839.019801980198|11.009900990099009|
|enable|1|2625.108910891089|0.019801980198019802|null|78.06930693069307|2722.3366336633662|10.623762376237623|
|enable|2|2599.5544554455446|0.039603960396039604|null|76.24752475247524|2695.029702970297|10.643564356435643|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|38946|31487|23036727|
|disable|2|150|38992|30800|23084343|
|enable|1|150|38915|31883|15208727|
|enable|2|150|39007|30401|14829975|
 * average state row size

||mode||size||note||
|disable|591.765993004| |
|enable|385.503337366|65.14 %|
{quote}App 4 - many values
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]
 * key fields : window (start/end), mod (int), word (10 chars string)
 * value fields : max (int), min (int), avg (double) for 20 columns

> Test Result - use same queries from above test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|63465.346534653465|10005.318225922994|9999.487035328104|
|disable|2|65247.52475247525|10011.503266213555|10001.570001460635|
|enable|1|62673.267326732675|9998.149926805827|9997.805403434037|
|enable|2|63564.35643564357|10000.005956852667|9996.36862736686|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|6235.069306930693|0.04950495049504951|null|93.63366336633663|6348.584158415842|9.910891089108912|
|disable|2|6410.3564356435645|0.039603960396039604|null|93.73267326732673|6523.821782178218|9.910891089108912|
|enable|1|6162.6732673267325|0.04950495049504951|null|88.62376237623762|6271.178217821782|10.435643564356436|
|enable|2|6244.079207920792|0.0297029702970297|null|94.26732673267327|6357.970297029703|9.930693069306932|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34776|29808|30019279|
|disable|2|150|39627|33928|33920375|
|enable|1|150|39842|30502|31837671|
|enable|2|150|34776|29616|27082887|
 * average state row size

||mode||size||note||
|disable|859.604889211| |
|enable|788.939591288|91.77 %|
{quote}Summary
{quote} * Enabling option showed on far or slightly better throughputs from 
three cases. It showed slightly lower throughput from one case but it was 
around 0.1% which I think we can treat it as noise.
 ** In overall it didn't show any performance regression at any cases.
 * Enabling option reduced state memory usage according to the ratio of 
key-value size as expected. Enabling option could reduce state memory usage 
around 45% for bigger key case.

> Remove redundant key data from value in streaming aggregation
> -------------------------------------------------------------
>
>                 Key: SPARK-24763
>                 URL: https://issues.apache.org/jira/browse/SPARK-24763
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Jungtaek Lim
>            Priority: Major
>
> Key/Value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing key fields and another fields for aggregation 
> results
> which data for key is stored to both key and value.
> This is to avoid doing projection row to value while storing, and joining key 
> and value to restore origin row to boost performance, but while doing a 
> simple benchmark test, I found it not much helpful compared to "project and 
> join". (will paste test result in comment)
> So I would propose a new option: remove redundant in stateful aggregation. 
> I'm avoiding to modify default behavior of stateful aggregation, because 
> state value will not be compatible between current and option enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to