[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930763#comment-16930763
 ] 

Harichandan Pulagam commented on SPARK-27648:
---------------------------------------------

Here's a code example that reproduces the above issue, using the Apache 
distribution of Spark 2.4.3, and local filesystem for checkpointing:

{noformat}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import java.sql.Timestamp

case class UserEvent(
   name: String,
   age: Int,
   eventTime: Timestamp)

val schema: StructType = StructType(
  StructField("timestamp", StringType, true) ::
  StructField("name", StringType, true) ::
  StructField("age", IntegerType, true) :: Nil)

val kafkaProperties: Map[String, String] =
   Map(
     "subscribe" -> "user",
     "startingOffsets" -> "earliest",
     "kafka.bootstrap.servers" -> <kafka broker list>)

val ds = spark.readStream
 .format("kafka")
 .options(kafkaProperties)
 .load
 .selectExpr("CAST (value AS STRING) AS JSON")
 .select(from_json(col("json"), schema).as("data"))
 .select(
   col("data.name").as("name"),
   col("data.age").as("age"),
   col("data.timestamp".as("timestamp"))
 .withColumn("eventTime", col("timestamp").cast(TimestampType))
 .drop("timestamp")
 .as[UserEvent]

val countDS =
  ds.withWatermark("eventTime", "1 minute")
    .groupBy(
      window(
        col("eventTime"), "1 minute", "30 seconds"),
        col("organization_id"))
    .count

countDS.withColumn("topic", lit("user_out"))
  .selectExpr("topic", s"to_json(struct(name, age)) AS value")
  .writeStream
  .outputMode("append")
  .format("kafka")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", "file:///tmp")
  .option("kafka.bootstrap.servers", <kafka broker list>)
  .start
{noformat}

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-27648
>                 URL: https://issues.apache.org/jira/browse/SPARK-27648
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: tommy duan
>            Priority: Major
>         Attachments: houragg(1).out, houragg_filter.csv, 
> houragg_with_state1_state2.csv, houragg_with_state1_state2.xlsx, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png, 
> image-2019-05-24-10-20-25-723.png, image-2019-05-27-10-10-30-460.png, 
> image-2019-06-02-19-43-21-652.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit\
> conf "spark. yarn. executor. memoryOverhead = 4096M"
> num-executors 15\
> executor-memory 46G\
> executor-cores 3\
> driver-memory 6G\
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the upgrade information of Spark 2.4, we can see that the problem of 
> large memory consumption of state storage has been solved in Spark 2.4.* 
>  So we upgraded spark to SPARK 2.4 under CDH, tried to run the spark program, 
> and found that the use of memory was reduced.
>  But a problem arises, as the running time increases, the storage memory of 
> executor is growing (see Executors - > Storage Memory from the Spark on Yarn 
> Resource Manager UI).
>  This program has been running for 14 days (under SPARK 2.2, running with 
> this submit resource, the normal running time is not more than one day, 
> Executor memory abnormalities will occur).
>  The script submitted by the program under spark2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf “spark.yarn.executor.memoryOverhead=4096M”
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G 
> {code}
> Under Spark 2.4, I counted the size of executor memory as time went by during 
> the running of the spark program:
> |Run-time(hour)|Storage Memory size(MB)|Memory growth rate(MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to