[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16838540#comment-16838540
 ] 

tommy duan edited comment on SPARK-27648 at 5/13/19 1:34 PM:
-------------------------------------------------------------

Hi [~kabhwan] & [~gsomogyi],

Yes, the code includes only one sink:

 
{code:java}
public void start(SparkSession sparkSession) throws Exception {
  init(sparkSession);
  Dataset<Row> dsSourceData = loadSourceData(sparkSession);                 // read the Kafka source topic
  Dataset<Row> dsPreDropDupl = procSrcDataPreDropDupl(dsSourceData);        // pass-through, does nothing
  Dataset<Row> dsDropDuplicates = dropDuplicates(sparkSession, dsPreDropDupl);
  Dataset<Row> dsAfterDropDupl = procSrcDataAfterDropDupl(dsDropDuplicates); // pass-through, does nothing
  if (this._DropDuplType.equals("keepfirst") || this._DropDuplType.equals("no")) {
    Dataset<Row> dsAgg = executeAgg(sparkSession, dsAfterDropDupl);
    writeResult(sparkSession, dsAgg);
  }
  if (this._DropDuplType.equals("keeplast")) {
    writeResult(sparkSession, dsAfterDropDupl);
  }
}
{code}
 

The property "_DropDuplType" equals "keepfirst", so there is only one sink.
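
For reference, a minimal sketch of what that single sink (writeResult) could look like, assuming the output goes back to a Kafka topic as the issue description says; the output topic, broker address and checkpoint path here are hypothetical, not taken from the real code:
{code:java}
private void writeResult(SparkSession sparkSession, Dataset<Row> dsResult) {
  // Hypothetical sketch only: serialize each row to JSON and write it to an
  // output Kafka topic. Topic name, brokers and checkpoint path are assumptions.
  dsResult
      .selectExpr("CAST(int_id AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("topic", "houragg_output")
      .option("checkpointLocation", "/tmp/checkpoint/houragg")
      .outputMode("update")
      .queryName("writeResult")
      .start();
}
{code}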

There are two stateful operators: *{color:#ff0000}dropDuplKeepFirst{color}* and 
*{color:#ff0000}executeAgg{color}*.

The *first stateful operator* reads the Kafka source topic and deduplicates it; 
the result then flows on to the sink:

 
{code:java}
private Dataset<Row> dropDuplKeepFirst(SparkSession sparkSession, Dataset<Row> dsRows,
    String watermarkMinutes, String queryName) throws AnalysisException {
  if (watermarkMinutes.equals("0")) {
    return dsRows;
  }
  Dataset<Row> dsDropDuplicates = dsRows.withWatermark("timestamp", watermarkMinutes + " minutes");
  dsDropDuplicates = dsDropDuplicates.dropDuplicates("int_id", "timestamp");
  return dsDropDuplicates;
}
{code}
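
The Kafka read itself (loadSourceData) is not shown above; a minimal sketch of it, assuming a plain Structured Streaming Kafka source where the broker address and input topic name are hypothetical placeholders, could look like this:
{code:java}
private Dataset<Row> loadSourceData(SparkSession sparkSession) {
  // Hypothetical sketch only: subscribe to the input topic and keep the Kafka
  // record timestamp. The real code presumably also parses the message value
  // into columns such as int_id, which dropDuplicates and executeAgg rely on.
  return sparkSession
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "houragg_input")
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "timestamp");
}
{code}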
 

The *second stateful operator* (executeAgg) is just the aggregation.
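
Since executeAgg is not shown, here is a minimal sketch of what an aggregation of that shape might look like; the 1-hour window and everything except the int_id/timestamp columns are assumptions, not the real logic:
{code:java}
// requires: import static org.apache.spark.sql.functions.*;
private Dataset<Row> executeAgg(SparkSession sparkSession, Dataset<Row> dsRows) {
  // Hypothetical sketch only: windowed count per int_id. The real aggregation
  // columns, window length and output columns are not visible in this comment.
  return dsRows
      .groupBy(window(col("timestamp"), "1 hour"), col("int_id"))
      .count();
}
{code}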

The functions {{procSrcDataPreDropDupl}} and {{procSrcDataAfterDropDupl}} do nothing.

 



> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-27648
>                 URL: https://issues.apache.org/jira/browse/SPARK-27648
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: tommy duan
>            Priority: Major
>         Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read a topic from Kafka, aggregate the streaming data, and then output the 
> result to another Kafka topic.
> *Problem Description:*
>  *1) Using Spark Structured Streaming in a CDH environment (Spark 2.2)*, memory 
> overflow problems often occur (because too many versions of state are kept 
> in memory; this bug was fixed in Spark 2.4).
> {code:java}
> /spark-submit \
>   --conf "spark.yarn.executor.memoryOverhead=4096M" \
>   --num-executors 15 \
>   --executor-memory 3G \
>   --executor-cores 2 \
>   --driver-memory 6G
> {code}
> Executor memory exceptions occur when running with these submit resources under 
> Spark 2.2, and the job normally does not run for more than one day.
> The workaround was to set the executor memory much larger than before.
> My spark-submit script was as follows:
> {code:java}
> /spark-submit \
>   --conf "spark.yarn.executor.memoryOverhead=4096M" \
>   --num-executors 15 \
>   --executor-memory 46G \
>   --executor-cores 3 \
>   --driver-memory 6G \
>   ...{code}
> In this case, the Spark program runs stably for a long time, and the executor 
> storage memory stays below 10 MB (it has been running stably for more than 
> 20 days).
> *2) From the Spark 2.4 upgrade notes, we can see that the problem of large 
> memory consumption by state storage has been solved in Spark 2.4.* 
>  So we upgraded to Spark 2.4 under CDH, ran the Spark program again, and found 
> that memory usage was indeed reduced.
>  But a new problem arises: as the running time increases, the storage memory of 
> the executors keeps growing (see Executors -> Storage Memory in the Spark on 
> YARN Resource Manager UI).
>  This program has now been running for 14 days (under Spark 2.2, running with 
> these submit resources, the normal running time was not more than one day 
> before executor memory abnormalities occurred).
>  The submit script used under Spark 2.4 is as follows:
> {code:java}
> /spark-submit \
>   --conf "spark.yarn.executor.memoryOverhead=4096M" \
>   --num-executors 15 \
>   --executor-memory 3G \
>   --executor-cores 2 \
>   --driver-memory 6G
> {code}
> Under Spark 2.4, I tracked the executor storage memory over time while the 
> program was running:
> |Run time (hours)|Storage Memory (used/total)|Growth rate (MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|


