[ https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839097#comment-16839097 ]

tommy duan commented on SPARK-27648:
------------------------------------

Hi [~kabhwan]

I'm sorry for the confusion.

Below is the main code. The broadcast updates mentioned above are all for 
business logic.

 
{code:java}
public class Main {
  public static void main(String[] args) throws Exception {
    SparkSession sparkSession = ...; // session construction elided

    SparkConf conf = SparkHelper.getInstance().getSparkConf();
    String resHDFSPath = SparkConfig.getInstance().getResPath(conf);

    // Load the resource data from HDFS and broadcast it to the executors.
    HashMap<Long, Map<String, String>> resourceData =
        new HashMap<>(SparkConfig.getInstance().getRes(conf, sparkSession, resHDFSPath));
    JavaSparkContext jsc =
        JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
    Broadcast<HashMap<Long, Map<String, String>>> resBroadcast =
        jsc.broadcast(resourceData);

    // Reload and rebroadcast the resource data after every micro-batch.
    sparkSession.streams().addListener(
        new ResourceLoadListener(sparkSession, resBroadcast, resHDFSPath));

    // The business code
    start(sparkSession);

    sparkSession.streams().awaitAnyTermination();
  }
}

public class ResourceLoadListener extends StreamingQueryListener {
  private SparkSession sparkSession;
  private Broadcast<HashMap<Long, Map<String, String>>> resBroadcast;
  private String resHDFSPath;

  public ResourceLoadListener(SparkSession sparkSession,
      Broadcast<HashMap<Long, Map<String, String>>> resBroadcast,
      String resHDFSPath) {
    this.sparkSession = sparkSession;
    this.resBroadcast = resBroadcast;
    this.resHDFSPath = resHDFSPath;
  }

  @Override
  public void onQueryStarted(QueryStartedEvent event) {
  }

  @Override
  public void onQueryProgress(QueryProgressEvent queryProgress) {
    try {
      // Reload the resource data from HDFS.
      SparkConf conf = SparkHelper.getInstance().getSparkConf();
      HashMap<Long, Map<String, String>> resourceData =
          new HashMap<>(SparkConfig.getInstance().getRes(conf, sparkSession, resHDFSPath));
      JavaSparkContext jsc =
          JavaSparkContext.fromSparkContext(sparkSession.sparkContext());

      // Drop the executor copies of the old broadcast, then broadcast the fresh data.
      resBroadcast.unpersist(true);
      resBroadcast = jsc.broadcast(resourceData);

      System.out.println("reload resource size: " + resBroadcast.getValue().size());
    } catch (Exception e) {
      System.out.println("reload resource error: " + this.resHDFSPath + e);
    }
  }

  @Override
  public void onQueryTerminated(QueryTerminatedEvent event) {
  }
}
{code}
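One note on this rebroadcast pattern (a sketch only, not something verified to 
change the observed growth): Broadcast.unpersist(true) removes the persisted 
copies of the broadcast data from the executors, while Broadcast.destroy() 
additionally releases the driver-side state of the broadcast. A variant of the 
swap in onQueryProgress() using destroy() would look like this:
{code:java}
// Hypothetical variant: create the new broadcast first, then fully destroy
// the old one instead of only unpersisting it.
Broadcast<HashMap<Long, Map<String, String>>> oldBroadcast = resBroadcast;
resBroadcast = jsc.broadcast(resourceData); // broadcast the freshly loaded data
oldBroadcast.destroy();                     // release blocks and driver metadata
{code}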
 

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-27648
>                 URL: https://issues.apache.org/jira/browse/SPARK-27648
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: tommy duan
>            Priority: Major
>         Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read a topic from Kafka, aggregate the streaming data, and then write the 
> result out to another Kafka topic.
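> A minimal sketch of a job with this shape (the broker address, topic names, 
> checkpoint path, and the aggregation itself are placeholders here, not the 
> real business logic; sparkSession is the session from the main code above):
> {code:java}
> // Sketch only: read one Kafka topic, aggregate, write to another topic.
> Dataset<Row> source = sparkSession.readStream()
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "broker1:9092") // placeholder brokers
>     .option("subscribe", "input-topic")                // placeholder topic
>     .load();
> 
> // Placeholder aggregation: count records per key.
> Dataset<Row> agg = source
>     .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
>     .groupBy(functions.col("key"))
>     .agg(functions.count("value").alias("cnt"))
>     .selectExpr("key", "CAST(cnt AS STRING) AS value");
> 
> // The Kafka sink requires a value column (and optionally key/topic columns).
> StreamingQuery query = agg.writeStream()
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "broker1:9092")
>     .option("topic", "output-topic")                      // placeholder topic
>     .option("checkpointLocation", "/tmp/agg-checkpoint")  // placeholder path
>     .outputMode("update")
>     .start();
> {code}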
> *Problem Description:*
>  *1) Using Spark Structured Streaming in a CDH environment (Spark 2.2)*, 
> memory overflow problems often occur, because too many versions of state 
> are kept in memory (this bug was fixed in Spark 2.4).
> {code:java}
> /spark-submit \
>  --conf "spark.yarn.executor.memoryOverhead=4096M" \
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G
> {code}
> Executor memory exceptions occur when running with these submit resources 
> under Spark 2.2, and the job normally does not survive more than one day. 
> The workaround is to set the executor memory much larger than before; my 
> spark-submit script is then as follows:
> {code:java}
> /spark-submit \
>  --conf "spark.yarn.executor.memoryOverhead=4096M" \
>  --num-executors 15 \
>  --executor-memory 46G \
>  --executor-cores 3 \
>  --driver-memory 6G \
>  ...
> {code}
> With this configuration the Spark program runs stably for a long time, and 
> the executor storage memory stays below 10 MB (it has now been running 
> stably for more than 20 days).
> *2) The upgrade notes for Spark 2.4 say that the problem of large memory 
> consumption by state storage has been solved in Spark 2.4.* 
>  So we upgraded to Spark 2.4 under CDH, ran the Spark program again, and 
> found that memory use was indeed reduced.
>  But a new problem arises: as the running time increases, the executor 
> storage memory keeps growing (see Executors -> Storage Memory in the Spark 
> UI on the YARN Resource Manager).
>  The program has now been running for 14 days (under Spark 2.2, with the 
> same submit resources, it would not run for more than one day before 
> executor memory problems occurred).
>  The script submitted under Spark 2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf "spark.yarn.executor.memoryOverhead=4096M" \
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G
> {code}
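> (As far as I can tell, the Spark 2.4 change referred to above is the one 
> that bounds how many state-store versions are cached in executor memory; in 
> 2.4 this is controlled by spark.sql.streaming.maxBatchesToRetainInMemory, 
> which defaults to 2. A sketch of pinning it explicitly at session creation; 
> the value here is arbitrary, and the config name and default are my 
> understanding, to be verified:)
> {code:java}
> // Sketch: cap the number of state-store versions cached in executor memory.
> // Config introduced in Spark 2.4; default is 2 (assumption to verify).
> SparkSession sparkSession = SparkSession.builder()
>     .config("spark.sql.streaming.maxBatchesToRetainInMemory", 1)
>     .getOrCreate();
> {code}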
> Under Spark 2.4, I tracked the executor storage memory over time while the 
> Spark program ran (the growth rate is simply used memory divided by run 
> time, e.g. 41.6 MB / 23.5 h ≈ 1.77 MB/hour):
> |Run time (hours)|Storage memory (used / total)|Growth rate (MB/hour)|
> |23.5|41.6 MB / 1.5 GB|1.77|
> |108.4|460.2 MB / 1.5 GB|4.25|
> |131.7|559.1 MB / 1.5 GB|4.25|
> |135.4|575 MB / 1.5 GB|4.25|
> |153.6|641.2 MB / 1.5 GB|4.17|
> |219|888.1 MB / 1.5 GB|4.06|
> |263|1126.4 MB / 1.5 GB|4.28|
> |309|1228.8 MB / 1.5 GB|3.98|


