[ https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930763#comment-16930763 ]
Harichandan Pulagam commented on SPARK-27648:
---------------------------------------------

Here's a code example that reproduces the issue above, using the Apache distribution of Spark 2.4.3 and the local filesystem for checkpointing:

{noformat}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // encoder for .as[UserEvent]; already imported in spark-shell
import java.sql.Timestamp

case class UserEvent(name: String, age: Int, eventTime: Timestamp)

val schema: StructType = StructType(
  StructField("timestamp", StringType, true) ::
  StructField("name", StringType, true) ::
  StructField("age", IntegerType, true) :: Nil)

val kafkaProperties: Map[String, String] = Map(
  "subscribe" -> "user",
  "startingOffsets" -> "earliest",
  "kafka.bootstrap.servers" -> <kafka broker list>)

// Parse the JSON payload of each Kafka record into typed UserEvent rows
val ds = spark.readStream
  .format("kafka")
  .options(kafkaProperties)
  .load
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select(
    col("data.name").as("name"),
    col("data.age").as("age"),
    col("data.timestamp").as("timestamp"))
  .withColumn("eventTime", col("timestamp").cast(TimestampType))
  .drop("timestamp")
  .as[UserEvent]

// 1-minute windows sliding every 30 seconds; the 1-minute watermark bounds the state
val countDS = ds.withWatermark("eventTime", "1 minute")
  .groupBy(
    window(col("eventTime"), "1 minute", "30 seconds"),
    col("name")) // the schema above has no organization_id column, so group by name
  .count

// After the aggregation only window, name and count remain, so serialize those
countDS.withColumn("topic", lit("user_out"))
  .selectExpr("topic", "to_json(struct(`window`, name, `count`)) AS value")
  .writeStream
  .outputMode("append")
  .format("kafka")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", "file:///tmp")
  .option("kafka.bootstrap.servers", <kafka broker list>)
  .start
{noformat}

> In Spark2.4 Structured Streaming:The executor storage memory increasing over time
> ---------------------------------------------------------------------------------
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: tommy duan
> Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, houragg_with_state1_state2.csv, houragg_with_state1_state2.xlsx, image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png, image-2019-05-24-10-20-25-723.png, image-2019-05-27-10-10-30-460.png, image-2019-06-02-19-43-21-652.png
>
>
> *Spark program business logic:*
> Read a topic from Kafka, aggregate the streaming data, and write the result to another Kafka topic.
>
> *Problem description:*
> *1) Using Spark Structured Streaming in a CDH environment (Spark 2.2)*, out-of-memory errors occur frequently (because too many versions of the state are kept in memory; this bug has been fixed in Spark 2.4).
> {code:bash}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> With these resources under Spark 2.2, executor memory errors occur, and the job normally fails within one day.
> The workaround is to give the executors much more memory. My spark-submit script is as follows:
> {code:bash}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 46G \
> --executor-cores 3 \
> --driver-memory 6G \
> ...
> {code}
> With this configuration the Spark program runs stably for a long time (more than 20 days so far), and executor storage memory stays below 10 MB.
> *2) According to the Spark 2.4 upgrade notes, the large memory consumption of state storage has been fixed in Spark 2.4.*
> So we upgraded Spark to 2.4 under CDH, ran the program again, and found that memory usage was lower.
> However, a new problem appeared: as the running time increases, executor storage memory keeps growing (see Executors -> Storage Memory in the Spark UI, opened from the YARN ResourceManager).
> This program has been running for 14 days. (Under Spark 2.2 with the same resources, executor memory errors occur and the job normally fails within one day.)
> The submit script used under Spark 2.4 is as follows:
> {code:bash}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> Under Spark 2.4, I recorded the executor storage memory at several points while the program was running:
> ||Run time (hours)||Storage memory used / total||Average growth since start (MB/hour)||
> |23.5|41.6 MB / 1.5 GB|1.77|
> |108.4|460.2 MB / 1.5 GB|4.25|
> |131.7|559.1 MB / 1.5 GB|4.25|
> |135.4|575 MB / 1.5 GB|4.25|
> |153.6|641.2 MB / 1.5 GB|4.17|
> |219|888.1 MB / 1.5 GB|4.06|
> |263|1126.4 MB / 1.5 GB|4.28|
> |309|1228.8 MB / 1.5 GB|3.98|
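The last column of the storage memory table is each row's storage memory divided by its run time, i.e. the average growth since the job started rather than the growth between measurements. The Scala snippet below is a rough sketch (not part of the original report) that recomputes that column, the rate between consecutive measurements, and a linear projection of when storage memory would reach its limit. `StorageGrowth` and its methods are hypothetical names; the sketch assumes the UI's "1.5 GB" total means 1536 MB and that growth stays linear.

```scala
// Hypothetical helper: re-derives growth figures from the measurements in the table above.
object StorageGrowth {
  // (run time in hours, storage memory used in MB), copied from the table
  val samples: Seq[(Double, Double)] = Seq(
    (23.5, 41.6), (108.4, 460.2), (131.7, 559.1), (135.4, 575.0),
    (153.6, 641.2), (219.0, 888.1), (263.0, 1126.4), (309.0, 1228.8))

  // Assumption: the UI's "1.5 GB" total is 1.5 * 1024 MB
  val capacityMb: Double = 1.5 * 1024

  // Average rate since the job started (how the table's last column was computed)
  def averageRates: Seq[Double] =
    samples.map { case (hours, usedMb) => usedMb / hours }

  // Rate between each pair of consecutive measurements
  def intervalRates: Seq[Double] =
    samples.zip(samples.tail).map { case ((h1, m1), (h2, m2)) => (m2 - m1) / (h2 - h1) }

  // Hours until storage memory reaches capacity, assuming growth stays linear
  def hoursUntilFull(usedMb: Double, rateMbPerHour: Double): Double =
    (capacityMb - usedMb) / rateMbPerHour

  def main(args: Array[String]): Unit = {
    samples.zip(averageRates).foreach { case ((hours, usedMb), rate) =>
      println(f"$hours%6.1f h  $usedMb%7.1f MB  avg $rate%.2f MB/h")
    }
    println(intervalRates.map(r => f"$r%.2f").mkString("interval rates (MB/h): ", ", ", ""))
    val lastRate = averageRates.last
    val eta = hoursUntilFull(samples.last._2, lastRate)
    println(f"at $lastRate%.2f MB/h, storage memory is full after about $eta%.0f more hours")
  }
}
```

In this data the interval rates vary between roughly 2.2 and 5.4 MB/hour, so the projection (at the last average rate of about 3.98 MB/hour, the remaining ~307 MB would be used up in roughly 77 more hours) is only an order-of-magnitude estimate.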