[ https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536539#comment-16536539 ]
Jungtaek Lim edited comment on SPARK-24763 at 7/9/18 5:18 AM:
--------------------------------------------------------------

> Spark version
* 2.4.0-SNAPSHOT
* commit: 79c66894296840cc4a5bf6c8718ecfd2b08bcca8 (latest master) + SPARK-24441 + SPARK-24717
* POC commit: [https://github.com/HeartSaVioR/spark/commit/378ce2ae30116fba80421873ef04ed2ca113630e]

> App (query)
[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/BenchmarkMovingAggregationsListenerToFs.scala]
{code:java}
import org.apache.spark.sql.functions.{avg, max, min, window}
import spark.implicits._ // enables the $"col" syntax; `spark` is the active SparkSession

// createCaseExprStr is a helper defined in the linked app (not shown here)
val outDf = df
  .withWatermark("timestamp", "10 seconds")
  .selectExpr(
    "timestamp",
    "mod(value, 100) as mod",
    "value",
    createCaseExprStr(50, "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)") + " as word")
  .groupBy(
    window($"timestamp", "1 minute", "10 seconds"),
    $"mod",
    $"word")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
{code}
* key fields: window (start/end), mod (int), word (10-character string)
* value fields: max_value (int), min_value (int), avg_value (double)

> How to test
Ran the above app three times for each setting of the option (enabled/disabled).
Used an AWS dedicated instance in the Seoul region: c5.xlarge (4 cores, 8 GB), 30 GB SSD (IOPS 100/3000).
No cluster was used: the app ran in a single JVM process to avoid any transfer latency.
{code:java}
--master local[*] --driver-memory 2g --executor-memory 8g --executor-cores 3
{code}

> Test Result (analysis query and result table, respectively)

>> query (source)
{code:java}
// jsonDf holds the query progress events recorded by the app's listener (loading not shown)
// Average the source metrics per run over batches 50 to 150
val source0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId",
    "data.sources[0].numInputRows AS numInputRows",
    "data.sources[0].inputRowsPerSecond AS inputRowsPerSecond",
    "data.sources[0].processedRowsPerSecond AS processedRowsPerSecond",
    "data.sources[0].startOffset AS startOffset",
    "data.sources[0].endOffset AS endOffset")
  .distinct()
  .where("batchId >= 50 and batchId <= 150")
  .groupBy("id", "runId")
  .agg("numInputRows" -> "avg", "inputRowsPerSecond" -> "avg", "processedRowsPerSecond" -> "avg")

source0InfoPopulatedDf.show()
{code}

>> result
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000110891095|22543.79742858393|
|disable|2|50000.0|10000.019900990894|22689.82215288191|
|disable|3|50000.0|9999.980265345746|22302.87036748844|
|enable|1|50000.0|10000.000110891096|22711.04445963631|
|enable|2|50000.0|10000.019893070104|22774.52502523933|
|enable|3|50000.0|10000.019908911689|22826.48595693046|

>> query (duration)
{code:java}
// Average the per-phase durations (durationMs) per run over batches 50 to 150
val durationInfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.durationMs.*")
  .distinct()
  .where("batchId >= 50 and batchId <= 150")
  .groupBy("id", "runId")
  .agg("addBatch" -> "avg", "getBatch" -> "avg", "getOffset" -> "avg",
    "queryPlanning" -> "avg", "triggerExecution" -> "avg", "walCommit" -> "avg")

durationInfoPopulatedDf.show()
{code}

>> result (durations in ms)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2145.58415841584|0.0693069306930693|null|55.23762376237624|2221.871287128713|12.762376237623762|
|disable|2|2131.00990099009|0.0495049504950495|null|55.49504950495049|2207.069306930693|12.405940594059405|
|disable|3|2172.99009900990|0.0495049504950495|null|55.97029702970297|2249.495049504950|12.445544554455445|
|enable|1|2130.77227722772|0.0594059405940594|null|55.31683168316832|2207.306930693069|12.871287128712872|
|enable|2|2126.14851485148|0.0396039603960396|null|54.98019801980198|2202.415841584158|12.891089108910892|
|enable|3|2120.19801980198|0.0495049504950495|null|54.82178217821782|2195.772277227723|12.643564356435643|

>> query (state information)
{code:java}
val state0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId",
    "data.stateOperators[0].numRowsTotal AS numRowsTotal",
    "data.stateOperators[0].numRowsUpdated AS numRowsUpdated",
    "data.stateOperators[0].memoryUsedBytes AS memoryUsedBytes")
  .distinct()
  // state metrics of one specific batch (150)
  .where("batchId = 150")

state0InfoPopulatedDf.show()
{code}

>> result
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34811|29838|10585079|
|disable|2|150|34811|29838|10477367|
|disable|3|150|39734|31597|12056439|
|enable|1|150|39725|31660|9856055|
|enable|2|150|34811|29838|8631607|
|enable|3|150|39713|31769|9674807|

>> efficiency of state row size when enabling the option
(memoryUsedBytes / numRowsUpdated with the option enabled, divided by the same ratio with it disabled)
* enable #2 vs. disable #2: (8631607 / 29838) / (10477367 / 29838) * 100 = 82.38%
* enable #1 vs. disable #3: (9856055 / 31660) / (12056439 / 31597) * 100 = 81.59%

Overall, enabling the option runs slightly faster (by around 1%, so the difference is not significant) and uses less memory for state (about 18% less per updated row in this query; the saving depends on the composition of the key and value fields).
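The summary figures above can be re-derived from the result tables with a few lines of plain Scala. This is only a sketch: the object and method names are made up for illustration, and the inputs are the numbers copied from the tables (memoryUsedBytes and numRowsUpdated at batch 150, and avg(processedRowsPerSecond) per trial).

```scala
// Hypothetical helper: recomputes the summary numbers from the result tables.
object BenchmarkSummary {
  // State memory per updated row: memoryUsedBytes / numRowsUpdated
  def bytesPerRow(memoryUsedBytes: Long, numRowsUpdated: Long): Double =
    memoryUsedBytes.toDouble / numRowsUpdated

  // Per-row state memory with the option enabled, as a percentage of disabled.
  // Each argument is (memoryUsedBytes, numRowsUpdated).
  def efficiencyPercent(enabled: (Long, Long), disabled: (Long, Long)): Double =
    bytesPerRow(enabled._1, enabled._2) / bytesPerRow(disabled._1, disabled._2) * 100

  // Trial pairs used in the comment (batch 150)
  val pairA: Double = efficiencyPercent((8631607L, 29838L), (10477367L, 29838L)) // enable #2 vs disable #2
  val pairB: Double = efficiencyPercent((9856055L, 31660L), (12056439L, 31597L)) // enable #1 vs disable #3

  // avg(processedRowsPerSecond) per trial, from the source table
  val disabledThroughput: Seq[Double] =
    Seq(22543.79742858393, 22689.82215288191, 22302.87036748844)
  val enabledThroughput: Seq[Double] =
    Seq(22711.04445963631, 22774.52502523933, 22826.48595693046)

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Relative throughput gain (percent) of the enabled runs over the disabled runs
  val throughputGainPercent: Double =
    (mean(enabledThroughput) / mean(disabledThroughput) - 1) * 100
}
```

With these inputs the two ratios come out at about 82.38% and 81.59%, and the enabled runs process about 1.1% more rows per second on average.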
> Remove redundant key data from value in streaming aggregation
> -------------------------------------------------------------
>
> Key: SPARK-24763
> URL: https://issues.apache.org/jira/browse/SPARK-24763
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Jungtaek Lim
> Priority: Major
>
> The key/value of state in streaming aggregation is formatted as below:
> * key: UnsafeRow containing the group-by fields
> * value: UnsafeRow containing the key fields and the other fields for aggregation results
> which means the key data is stored in both the key and the value.
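The two state layouts described in the issue can be modeled in plain Scala as below. This is a hypothetical sketch, not Spark's implementation: case classes stand in for UnsafeRow, the names (Key, AggValue, Row, storeCurrent, storeProjected, restoreJoined, storedFieldCount) are illustrative only, and the fields follow the benchmark query's key and value fields.

```scala
// Hypothetical model of the two state layouts for streaming aggregation.
// Plain case classes stand in for Spark's UnsafeRow; this is NOT Spark's code.
object StateLayoutSketch {
  // Group-by (key) fields of the benchmark query
  final case class Key(windowStart: Long, windowEnd: Long, mod: Int, word: String)

  // Aggregation result fields only
  final case class AggValue(maxValue: Int, minValue: Int, avgValue: Double)

  // Full output row of the aggregation: key fields followed by aggregation fields
  final case class Row(windowStart: Long, windowEnd: Long, mod: Int, word: String,
                       maxValue: Int, minValue: Int, avgValue: Double) {
    def key: Key = Key(windowStart, windowEnd, mod, word)
  }

  // Current format: the value is the whole row, so the key data is stored twice,
  // but restoring the row needs no extra work.
  def storeCurrent(row: Row): (Key, Row) = (row.key, row)
  def restoreCurrent(key: Key, value: Row): Row = value

  // Option enabled: project the row down to the aggregation fields when storing...
  def storeProjected(row: Row): (Key, AggValue) =
    (row.key, AggValue(row.maxValue, row.minValue, row.avgValue))

  // ...and join the key and the value back together when restoring the row.
  def restoreJoined(key: Key, value: AggValue): Row =
    Row(key.windowStart, key.windowEnd, key.mod, key.word,
        value.maxValue, value.minValue, value.avgValue)

  // Number of fields stored for one state entry (key fields + value fields)
  def storedFieldCount(key: Product, value: Product): Int =
    key.productArity + value.productArity
}
```

In this model an entry stores 11 fields in the current format and 7 with the projection, at the cost of one projection on store and one join on restore. Field counts are only a rough proxy for bytes; the measured effect on memoryUsedBytes is what the benchmark comment reports.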
> This layout avoids projecting the row to the value while storing, and joining the key and the value to restore the original row, in order to boost performance. However, a simple benchmark test showed that it offers little benefit compared to "project and join". (I will paste the test result in a comment.)
> So I would propose a new option: remove redundant key data from the value in stateful aggregation.
> I'm avoiding modifying the default behavior of stateful aggregation, because the state value will not be compatible between the current format and the option-enabled format.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)