[ https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839097#comment-16839097 ]
tommy duan commented on SPARK-27648:
------------------------------------

Hi [~kabhwan], sorry for the confusion. The code below is the main program; the broadcast updates mentioned above are all business logic.

{code:java}
public class Main {
    public static void main(String[] args) {
        SparkSession sparkSession = ..; // session construction omitted
        SparkConf conf = SparkHelper.getInstance().getSparkConf();
        String resHDFSPath = SparkConfig.getInstance().getResPath(conf);
        HashMap<Long, Map<String, String>> resourceData =
                new HashMap<>(SparkConfig.getInstance().getRes(conf, sparkSession, resHDFSPath));
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Broadcast<HashMap<Long, Map<String, String>>> resBroadcast = jsc.broadcast(resourceData);
        sparkSession.streams().addListener(new ResourceLoadListener(sparkSession, resBroadcast, resHDFSPath));
        // The business code
        start(sparkSession);
        sparkSession.streams().awaitAnyTermination();
    }
}

public class ResourceLoadListener extends StreamingQueryListener {
    private SparkSession sparkSession = null;
    private Broadcast<HashMap<Long, Map<String, String>>> resBroadcast = null;
    private String resHDFSPath = "";

    public ResourceLoadListener(SparkSession sparkSession,
            Broadcast<HashMap<Long, Map<String, String>>> resBroadcast, String resHDFSPath) {
        this.sparkSession = sparkSession;
        this.resBroadcast = resBroadcast;
        this.resHDFSPath = resHDFSPath;
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) {
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        try {
            SparkConf conf = SparkHelper.getInstance().getSparkConf();
            HashMap<Long, Map<String, String>> resourceData =
                    new HashMap<>(SparkConfig.getInstance().getRes(conf, sparkSession, resHDFSPath));
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
            // Release the old broadcast before re-broadcasting the reloaded resources
            resBroadcast.unpersist(true);
            resBroadcast = jsc.broadcast(resourceData);
            System.out.println("reload resource size:" + resBroadcast.getValue().size());
        } catch (Exception e) {
            System.out.println("reload resource error:" + this.resHDFSPath + e);
        }
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
    }
}
{code}

> In Spark 2.4 Structured Streaming: the executor storage memory increases over
> time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-27648
>                 URL: https://issues.apache.org/jira/browse/SPARK-27648
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: tommy duan
>            Priority: Major
>         Attachments: houragg(1).out, houragg_filter.csv,
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
> *Spark Program Code Business:*
> Read a topic from Kafka, aggregate the streaming data, and write the result
> to another Kafka topic.
> *Problem Description:*
> *1) Using Spark Structured Streaming in a CDH environment (Spark 2.2)*, memory
> overflow problems often occurred, because too many versions of state were kept
> in memory (this bug was fixed in Spark 2.4).
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G{code}
> Executor memory exceptions occurred when running with these submit resources
> under Spark 2.2, and the program normally did not run for more than one day.
> The solution was to set the executor memory larger than before.
> My spark-submit script is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 46G \
> --executor-cores 3 \
> --driver-memory 6G \
> ...{code}
> With this configuration the Spark program runs stably for a long time, and the
> executor storage memory stays below 10 MB (it has been running stably for more
> than 20 days).
> *2) The Spark 2.4 release notes indicate that the problem of state storage
> consuming large amounts of memory has been solved in Spark 2.4.*
> So we upgraded Spark to 2.4 under CDH, ran the Spark program, and found that
> memory usage was indeed reduced.
> But a new problem arose: as the running time increases, the executor storage
> memory keeps growing (see Executors -> Storage Memory in the Spark on YARN
> Resource Manager UI).
> The program has now been running for 14 days (under Spark 2.2, with the same
> submit resources, it normally ran for no more than one day before executor
> memory exceptions occurred).
> The script submitted under Spark 2.4 is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> Under Spark 2.4, I recorded the executor storage memory over time while the
> program ran:
> ||Run time (hours)||Storage Memory (used/total)||Memory growth rate (MB/hour)||
> |23.5|41.6 MB/1.5 GB|1.770212766|
> |108.4|460.2 MB/1.5 GB|4.245387454|
> |131.7|559.1 MB/1.5 GB|4.245254366|
> |135.4|575 MB/1.5 GB|4.246676514|
> |153.6|641.2 MB/1.5 GB|4.174479167|
> |219|888.1 MB/1.5 GB|4.055251142|
> |263|1126.4 MB/1.5 GB|4.282889734|
> |309|1228.8 MB/1.5 GB|3.976699029|

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
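For readers cross-checking the table above: the "Memory growth rate" column is the cumulative average, i.e. storage memory used divided by total run time. A minimal, Spark-independent sketch that reproduces those rates (the class and method names here are purely illustrative, not part of the reported program):

```java
// Reproduce the "Memory growth rate" column of the table above:
// rate (MB/hour) = storage memory used (MB) / run time (hours).
public class GrowthRate {

    // Cumulative average growth rate in MB/hour.
    static double rate(double usedMb, double hours) {
        return usedMb / hours;
    }

    public static void main(String[] args) {
        // {run time in hours, storage memory used in MB}, taken from the table
        double[][] samples = {
            {23.5, 41.6}, {108.4, 460.2}, {131.7, 559.1}, {135.4, 575.0},
            {153.6, 641.2}, {219.0, 888.1}, {263.0, 1126.4}, {309.0, 1228.8}
        };
        for (double[] s : samples) {
            System.out.printf("%.1f h: %.1f MB used -> %.3f MB/hour%n",
                    s[0], s[1], rate(s[1], s[0]));
        }
    }
}
```

After the first day the rate settles at roughly 4 MB/hour, which matches the steady, unbounded growth the report describes.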