[ https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541367#comment-16541367 ]
Jungtaek Lim edited comment on SPARK-24763 at 7/12/18 9:20 AM:
---------------------------------------------------------------

I crafted several key/value shapes (bigger key, bigger value, many key columns, many value columns) and ran the same kind of test on each. To run the 4 tests concurrently, I ran only 2 trials (instead of 3) per case for each of disable/enable. I used the same Spark version, the same dedicated AWS instance, and the same command to launch the JVM (except that driver memory was increased to 6g). Test results are described per case below.

{quote}App 1 - bigger key{quote}

> Code
[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsKeyMuchBigger.scala]

* key fields: window (start/end), mod (int), word (1000-char string)
* value fields: max_value (int), min_value (int), avg_value (double)

> Test Result - uses the same queries as the previous test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.019877228518|21173.52173524455|
|disable|2|50000.0|9999.980273266538|21241.022682965504|
|enable|1|50000.0|10000.00006336634|21745.986204268098|
|enable|2|50000.0|10000.000007920793|21382.004127689153|

>> query result (duration, ms)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2283.7425742574255|0.06930693069306931|null|62.475247524752476|2365.168316831683|10.267326732673267|
|disable|2|2274.3564356435645|0.09900990099009901|null|64.42574257425743|2358.059405940594|11.049504950495049|
|enable|1|2216.8514851485147|0.0891089108910891|null|65.23762376237623|2301.920792079208|10.752475247524753|
|enable|2|2260.366336633663|0.0297029702970297|null|63.742574257425744|2344.6633663366338|11.673267326732674|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34153|29274|68989767|
|disable|2|150|34153|29274|68981335|
|enable|1|150|38952|31352|44137447|
|enable|2|150|38969|31138|43849183|

* average state row size (memoryUsedBytes / numRowsTotal, averaged over the trials)
||mode||size (bytes)||ratio to disable||
|disable|2019.897256464| |
|enable|1129.178232806|55.90 %|

{quote}App 2 - bigger value{quote}

> Code
[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsValueMuchBigger.scala]

* key fields: window (start/end), mod (int)
* value fields: max_value (int), min_value (int), avg_value (double), word_last (1000-char string)

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000039603963|20021.91393461469|
|disable|2|50000.0|10000.000071287132|19811.730674701703|
|enable|1|50000.0|10000.000435644328|20518.188917614298|
|enable|2|50000.0|10000.000071287132|20312.040082394597|

>> query result (duration, ms)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2417.990099009901|0.0297029702970297|null|64.9009900990099|2500.910891089109|10.158415841584159|
|disable|2|2447.7524752475247|0.039603960396039604|null|62.386138613861384|2529.3564356435645|10.712871287128714|
|enable|1|2357.891089108911|0.04950495049504951|null|62.81188118811881|2439.891089108911|10.881188118811881|
|enable|2|2381.5346534653463|0.04950495049504951|null|61.742574257425744|2463.108910891089|11.435643564356436|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|700|600|934447|
|disable|2|150|700|600|934223|
|enable|1|150|800|700|1029799|
|enable|2|150|800|700|1029183|

* average state row size
||mode||size (bytes)||ratio to disable||
|disable|1334.764285714| |
|enable|1286.86375|96.41 %|

{quote}App 3 - many keys{quote}

> Code
[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]

* key fields: window (start/end), mod (int), word (10-char string), and 20 more columns (int)
* value fields: max_value (int), min_value (int), avg_value (double)

> Test Result - uses the same queries as the previous test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000055445547|17645.896698295066|
|disable|2|50000.0|10000.00008712872|17627.17064059849|
|enable|1|50000.0|10000.000039603963|18408.23205926807|
|enable|2|50000.0|10000.000071287132|18568.40828962387|

>> query result (duration, ms)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2739.990099009901|0.0594059405940594|null|77.48514851485149|2837.4851485148515|11.584158415841584|
|disable|2|2742.960396039604|0.04950495049504951|null|76.03960396039604|2839.019801980198|11.009900990099009|
|enable|1|2625.108910891089|0.019801980198019802|null|78.06930693069307|2722.3366336633662|10.623762376237623|
|enable|2|2599.5544554455446|0.039603960396039604|null|76.24752475247524|2695.029702970297|10.643564356435643|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|38946|31487|23036727|
|disable|2|150|38992|30800|23084343|
|enable|1|150|38915|31883|15208727|
|enable|2|150|39007|30401|14829975|

* average state row size
||mode||size (bytes)||ratio to disable||
|disable|591.765993004| |
|enable|385.503337366|65.14 %|

{quote}App 4 - many values{quote}

> Code
[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]

* key fields: window (start/end), mod (int), word (10-char string)
* value fields: max (int), min (int), and avg (double) for each of 20 columns

> Test Result - uses the same queries as the previous test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|63465.346534653465|10005.318225922994|9999.487035328104|
|disable|2|65247.52475247525|10011.503266213555|10001.570001460635|
|enable|1|62673.267326732675|9998.149926805827|9997.805403434037|
|enable|2|63564.35643564357|10000.005956852667|9996.36862736686|

>> query result (duration, ms)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|6235.069306930693|0.04950495049504951|null|93.63366336633663|6348.584158415842|9.910891089108912|
|disable|2|6410.3564356435645|0.039603960396039604|null|93.73267326732673|6523.821782178218|9.910891089108912|
|enable|1|6162.6732673267325|0.04950495049504951|null|88.62376237623762|6271.178217821782|10.435643564356436|
|enable|2|6244.079207920792|0.0297029702970297|null|94.26732673267327|6357.970297029703|9.930693069306932|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34776|29808|30019279|
|disable|2|150|39627|33928|33920375|
|enable|1|150|39842|30502|31837671|
|enable|2|150|34776|29616|27082887|

* average state row size
||mode||size (bytes)||ratio to disable||
|disable|859.604889211| |
|enable|788.939591288|91.77 %|

{quote}Summary{quote}

* Enabling the option showed on-par or slightly better throughput in three cases. It showed slightly lower throughput in one case (many values), but the difference was less than 0.1%, which I think can be treated as noise.
** Overall, it didn't show a performance regression in any case.
* Enabling the option reduced state memory usage in line with the key/value size ratio, as expected. For the bigger key case, it reduced state memory usage by around 44% (the average state row size dropped to 55.90 % of the disabled size).
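The "average state row size" figures can be reproduced from the state tables: they match the per-trial ratio memoryUsedBytes / numRowsTotal averaged over the two trials, and the percentage is the enabled size divided by the disabled size. The minimal Python sketch below redoes that arithmetic for App 1 and also recomputes the App 4 throughput change behind the summary. The helper names (`avg_row_size`, `relative_change`) are hypothetical and not part of the benchmark code; the numbers are copied from the tables.

```python
from statistics import mean

# (numRowsTotal, memoryUsedBytes) per trial, copied from the App 1 state tables.
APP1_STATE = {
    "disable": [(34153, 68989767), (34153, 68981335)],
    "enable": [(38952, 44137447), (38969, 43849183)],
}

# avg(processedRowsPerSecond) per trial, copied from the App 4 source table.
APP4_PROCESSED = {
    "disable": [9999.487035328104, 10001.570001460635],
    "enable": [9997.805403434037, 9996.36862736686],
}


def avg_row_size(trials):
    """Mean over trials of bytes per state row (memoryUsedBytes / numRowsTotal)."""
    return mean(mem / rows for rows, mem in trials)


def relative_change(before, after):
    """Relative change of the mean value; -0.01 means 1% lower."""
    return mean(after) / mean(before) - 1.0


disabled = avg_row_size(APP1_STATE["disable"])
enabled = avg_row_size(APP1_STATE["enable"])
print(f"App 1 row size: {disabled:.3f} -> {enabled:.3f} bytes ({enabled / disabled:.2%})")

change = relative_change(APP4_PROCESSED["disable"], APP4_PROCESSED["enable"])
print(f"App 4 throughput change: {change:.3%}")
```

Averaging the per-trial ratios (rather than dividing summed bytes by summed rows) is what reproduces the published sizes exactly, since the two trials ended with different row counts.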
> Remove redundant key data from value in streaming aggregation
> -------------------------------------------------------------
>
> Key: SPARK-24763
> URL: https://issues.apache.org/jira/browse/SPARK-24763
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Jungtaek Lim
> Priority: Major
>
> The key/value of state in streaming aggregation is formatted as below:
> * key: UnsafeRow containing the group-by fields
> * value: UnsafeRow containing the key fields and the other fields for aggregation results
> which means the key data is stored in both the key and the value.
> This layout avoids projecting the row to the value when storing, and joining the key and value to restore the original row, in order to boost performance. However, in a simple benchmark test, I found it didn't help much compared to "project and join". (I will paste the test result in a comment.)
> So I would propose a new option: remove the redundant key data from the value in stateful aggregation.
> I'm avoiding modifying the default behavior of stateful aggregation, because the state value would not be compatible between the current format and the option-enabled format.
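To make the two state layouts concrete, here is a minimal Python sketch that models rows as plain tuples and the state store as a dict. `split_key`, `RedundantKeyStore` and `KeyRemovedStore` are hypothetical names for illustration only; they are not Spark's `UnsafeRow` or state store API, and the assumption that key fields come first in the row is part of the sketch. The first store keeps the whole row as the value, as described above; the second does "project and join": it drops the key fields from the value on write and concatenates the key back on read.

```python
from typing import Dict, Tuple

# Stand-in for an UnsafeRow: a flat tuple of field values, key fields first (assumption).
Row = Tuple[object, ...]


def split_key(row: Row, num_key_fields: int) -> Tuple[Row, Row]:
    """Project a full aggregation row into (key fields, aggregation fields)."""
    return row[:num_key_fields], row[num_key_fields:]


class RedundantKeyStore:
    """Current layout: the value repeats the key fields, so a read needs no join."""

    def __init__(self, num_key_fields: int):
        self.num_key_fields = num_key_fields
        self.state: Dict[Row, Row] = {}

    def put(self, row: Row) -> None:
        key, _ = split_key(row, self.num_key_fields)
        self.state[key] = row  # whole row (key + aggregation fields) stored as the value

    def get(self, key: Row) -> Row:
        return self.state[key]


class KeyRemovedStore:
    """Proposed layout: project the key out on write, join it back on read."""

    def __init__(self, num_key_fields: int):
        self.num_key_fields = num_key_fields
        self.state: Dict[Row, Row] = {}

    def put(self, row: Row) -> None:
        key, value = split_key(row, self.num_key_fields)
        self.state[key] = value  # only the aggregation fields are stored

    def get(self, key: Row) -> Row:
        return key + self.state[key]  # join key and value to restore the original row


# Key: (window_start, window_end, mod); value: (max, min, avg).
row = (0, 10, 3, 42, 7, 21.5)
old, new = RedundantKeyStore(3), KeyRemovedStore(3)
old.put(row)
new.put(row)
print(old.get((0, 10, 3)) == new.get((0, 10, 3)))
```

Both stores restore the same row for a given key; the second one holds only the aggregation fields in each value, which is the kind of saving the benchmark comment measures as a smaller average state row size.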