[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16838337#comment-16838337
 ] 

Jungtaek Lim commented on SPARK-27648:
--------------------------------------

[~yy3b2007com]

I suspect you're showing only part of your query. Judging from the query you posted, there should be only one state operator, but the progress information shows two. Structured Streaming doesn't support multiple levels of aggregation, so apart from stream-stream joins, most queries have a single state operator.

Also, your state graph doesn't add the two state operators together; it seems to show only the smaller one. Here is the (estimated) state size from batch 580:

11367828611 bytes = 10.58711541 GiB (first state operator)
448870007 bytes = 0.418042771 GiB (second state operator)

Combined, that is about 11 GiB, not the less than 500 MiB shown in your graph.
{noformat}
Query made progress: {
"id" : "954e29ff-361c-43c9-8e8e-8221baaa544d",
"runId" : "907471bd-393f-46eb-a70c-2679463668a1",
"name" : "queryMyHourAgg",
"timestamp" : "2019-04-29T05:14:09.353Z",
"batchId" : 580,
"numInputRows" : 242468,
"inputRowsPerSecond" : 268.5174110840897,
"processedRowsPerSecond" : 267.4765196326098,
"durationMs" : {
"addBatch" : 877717,
"getBatch" : 1,
"getEndOffset" : 0,
"queryPlanning" : 25483,
"setOffsetRange" : 3082,
"triggerExecution" : 906502,
"walCommit" : 113
},
"eventTime" : {
"avg" : "2019-04-29T05:15:31.598Z",
"max" : "2019-04-29T05:30:00.000Z",
"min" : "2019-04-29T05:00:00.000Z",
"watermark" : "2019-04-29T03:35:00.000Z"
},
"stateOperators" : [ {
"numRowsTotal" : 725830,
"numRowsUpdated" : 240062,
"memoryUsedBytes" : 11367828611,
"customMetrics" : {
"loadedMapCacheHitCount" : 914082,
"loadedMapCacheMissCount" : 196,
"stateOnCurrentVersionSizeBytes" : 8740427635
}
}, {
"numRowsTotal" : 1701992,
"numRowsUpdated" : 242182,
"memoryUsedBytes" : 448870007,
"customMetrics" : {
"loadedMapCacheHitCount" : 502667,
"loadedMapCacheMissCount" : 69,
"stateOnCurrentVersionSizeBytes" : 340077583
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[MY-Q1]]",
"startOffset" : {
"MY-Q1" : {
"17" : 163557365,
"8" : 162352319,
"11" : 161504220,
"2" : 162584734,
"5" : 163017552,
"14" : 165254414,
"13" : 163239536,
"4" : 163848418,
"16" : 163333381,
"7" : 163687777,
"1" : 165680313,
"10" : 163337267,
"19" : 160421019,
"18" : 161986305,
"9" : 163245093,
"3" : 161627253,
"12" : 162961867,
"15" : 161854220,
"6" : 162547056,
"0" : 161467759
}
},
"endOffset" : {
"MY-Q1" : {
"17" : 163569546,
"8" : 162364385,
"11" : 161516235,
"2" : 162596785,
"5" : 163029700,
"14" : 165266681,
"13" : 163251648,
"4" : 163860634,
"16" : 163345529,
"7" : 163699952,
"1" : 165692663,
"10" : 163349403,
"19" : 160432998,
"18" : 161998399,
"9" : 163257234,
"3" : 161639259,
"12" : 162974030,
"15" : 161866224,
"6" : 162559232,
"0" : 161479799
}
},
"numInputRows" : 242468,
"inputRowsPerSecond" : 268.5174110840897,
"processedRowsPerSecond" : 267.4765196326098
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@4284be04"
}
}
{noformat}
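To check the combined figure, sum the per-operator estimates in the progress report instead of reading a single operator. Below is a minimal Python sketch. It assumes the progress JSON has been captured as a string. The trimmed `PROGRESS_JSON` literal and the `total_state_memory_bytes` helper are hypothetical names used only for illustration. The sketch sums `memoryUsedBytes` over every entry of `stateOperators` and converts the result to GiB using 1 GiB = 1024^3 bytes.

```python
import json

# Trimmed copy of the batch-580 progress above; only the fields read here are kept.
PROGRESS_JSON = """
{
  "batchId": 580,
  "stateOperators": [
    {"numRowsTotal": 725830, "memoryUsedBytes": 11367828611},
    {"numRowsTotal": 1701992, "memoryUsedBytes": 448870007}
  ]
}
"""

GIB = 1024 ** 3  # bytes per GiB


def total_state_memory_bytes(progress: dict) -> int:
    """Hypothetical helper: sum memoryUsedBytes over every state operator."""
    return sum(op["memoryUsedBytes"] for op in progress.get("stateOperators", []))


progress = json.loads(PROGRESS_JSON)
per_operator_gib = [op["memoryUsedBytes"] / GIB for op in progress["stateOperators"]]
total_gib = total_state_memory_bytes(progress) / GIB

print(f"state operators: {len(progress['stateOperators'])}")
for i, gib in enumerate(per_operator_gib):
    print(f"  operator {i}: {gib:.3f} GiB")
print(f"total: {total_gib:.3f} GiB")
```

In PySpark 2.4, `StreamingQuery.lastProgress` returns the latest progress as a Python dict, so the same helper could presumably be applied to it directly. Keep in mind that `memoryUsedBytes` is itself an estimate.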

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-27648
>                 URL: https://issues.apache.org/jira/browse/SPARK-27648
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: tommy duan
>            Priority: Major
>         Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *What the Spark program does:*
>  Reads a Kafka topic, aggregates the streaming data, and writes the results to another Kafka topic.
> *Problem description:*
>  *1) When running Spark Structured Streaming in a CDH environment (Spark 2.2)*, out-of-memory errors occur frequently (because too many versions of state are kept in memory; this bug was fixed in Spark 2.4).
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> With these resources under Spark 2.2, executor memory errors occur, and the job usually runs for less than one day before failing.
> The workaround was to give the executors far more memory. My spark-submit script is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 46G \
> --executor-cores 3 \
> --driver-memory 6G \
> ...{code}
> With this configuration, the Spark program runs stably for long periods, and the executor storage memory stays below 10 MB (it has been running stably for more than 20 days).
> *2) According to the Spark 2.4 release information, the high memory consumption of state storage was fixed in Spark 2.4.*
>  So we upgraded Spark to 2.4 under CDH, ran the Spark program, and found that memory usage was reduced.
>  However, a new problem appeared: as the running time increases, the executors' storage memory keeps growing (see Executors -> Storage Memory in the Spark UI, opened from the YARN ResourceManager UI).
>  This program has now been running for 14 days. Under Spark 2.2, with the same resources, it usually ran for less than one day before executor memory errors occurred.
>  The submit script used under Spark 2.4 is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> Under Spark 2.4, I recorded the executor storage memory at intervals while the Spark program was running:
> ||Run time (hours)||Storage memory used / total||Average growth rate since start (MB/hour)||
> |23.5|41.6 MB / 1.5 GB|1.770212766|
> |108.4|460.2 MB / 1.5 GB|4.245387454|
> |131.7|559.1 MB / 1.5 GB|4.245254366|
> |135.4|575 MB / 1.5 GB|4.246676514|
> |153.6|641.2 MB / 1.5 GB|4.174479167|
> |219|888.1 MB / 1.5 GB|4.055251142|
> |263|1126.4 MB / 1.5 GB|4.282889734|
> |309|1228.8 MB / 1.5 GB|3.976699029|
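The last column of the table above is the storage memory divided by the total run time. That cumulative average can hide changes in the growth rate between samples. Below is a minimal Python sketch that uses only the numbers from the table. It recomputes the cumulative average and also the rate between consecutive samples. `SAMPLES`, `cumulative_rates` and `interval_rates` are hypothetical names used only for illustration.

```python
# (run time in hours, storage memory used in MB), copied from the table above.
SAMPLES = [
    (23.5, 41.6),
    (108.4, 460.2),
    (131.7, 559.1),
    (135.4, 575.0),
    (153.6, 641.2),
    (219.0, 888.1),
    (263.0, 1126.4),
    (309.0, 1228.8),
]


def cumulative_rates(samples):
    """Storage MB divided by total run time, i.e. the table's last column."""
    return [mb / hours for hours, mb in samples]


def interval_rates(samples):
    """Growth in MB/hour between each pair of consecutive samples."""
    return [
        (mb1 - mb0) / (h1 - h0)
        for (h0, mb0), (h1, mb1) in zip(samples, samples[1:])
    ]


if __name__ == "__main__":
    for (hours, _), rate in zip(SAMPLES, cumulative_rates(SAMPLES)):
        print(f"{hours:6.1f} h  average {rate:.3f} MB/h")
    for (h0, _), (h1, _), rate in zip(SAMPLES, SAMPLES[1:], interval_rates(SAMPLES)):
        print(f"{h0:6.1f}-{h1:6.1f} h  interval {rate:.3f} MB/h")
```

On these samples, the interval rates range from roughly 2.2 to 5.4 MB/hour. Every one of them is positive, so storage memory grew over every measured interval.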



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
