[
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930763#comment-16930763
]
Harichandan Pulagam edited comment on SPARK-27648 at 9/16/19 6:38 PM:
----------------------------------------------------------------------
Here's a code example that reproduces the above issue, using the Apache
distribution of Spark 2.4.3 and the local filesystem for checkpointing:
{noformat}
import java.sql.Timestamp

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
// Already in scope in spark-shell; needed elsewhere for .as[UserEvent]
import spark.implicits._

case class UserEvent(
  name: String,
  age: Int,
  eventTime: Timestamp)

// Schema of the JSON payload carried in the Kafka message value
val schema: StructType = StructType(
  StructField("timestamp", StringType, true) ::
  StructField("name", StringType, true) ::
  StructField("age", IntegerType, true) :: Nil)

val kafkaProperties: Map[String, String] =
  Map(
    "subscribe" -> "user",
    "startingOffsets" -> "earliest",
    "kafka.bootstrap.servers" -> <kafka broker list>)

// Read the "user" topic and parse each message into a UserEvent
val ds = spark.readStream
  .format("kafka")
  .options(kafkaProperties)
  .load
  .selectExpr("CAST (value AS STRING) AS JSON")
  .select(from_json(col("json"), schema).as("data"))
  .select(
    col("data.name").as("name"),
    col("data.age").as("age"),
    col("data.timestamp").as("timestamp"))
  .withColumn("eventTime", col("timestamp").cast(TimestampType))
  .drop("timestamp")
  .as[UserEvent]

// Count events per age in 1-minute windows sliding every 30 seconds
val countDS =
  ds.withWatermark("eventTime", "1 minute")
    .groupBy(
      window(
        col("eventTime"), "1 minute", "30 seconds"),
      col("age"))
    .count

// After the aggregation only window, age and count remain ("name" is gone),
// so the output value is built from those columns.
countDS.withColumn("topic", lit("user_out"))
  .selectExpr("topic", "to_json(struct(window, age, count)) AS value")
  .writeStream
  .outputMode("append")
  .format("kafka")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", "file:///tmp")
  .option("kafka.bootstrap.servers", <kafka broker list>)
  .start
{noformat}
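A side note on the aggregation in the example: with a 1-minute window and a 30-second slide, every event falls into two overlapping windows, so each incoming row touches two aggregation groups per `age` value. Below is a minimal sketch of that assignment in plain Scala (no Spark required). It assumes windows are aligned to the epoch, as with a window start offset of zero. `SlidingWindows` and `windowsFor` are hypothetical names used only for illustration; they are not part of the Spark API.

```scala
object SlidingWindows {
  // Returns the [start, end) bounds, in epoch seconds, of every sliding
  // window that contains the given event time. Hypothetical helper.
  def windowsFor(eventSec: Long, windowSec: Long, slideSec: Long): List[(Long, Long)] = {
    // Start of the latest window that begins at or before the event.
    val lastStart = Math.floorDiv(eventSec, slideSec) * slideSec
    Iterator.iterate(lastStart)(_ - slideSec)
      .takeWhile(start => start + windowSec > eventSec) // window still covers the event
      .map(start => (start, start + windowSec))
      .toList
      .reverse // earliest window first
  }

  def main(args: Array[String]): Unit = {
    // An event 70 seconds after the epoch, 60 s window, 30 s slide.
    windowsFor(70L, 60L, 30L).foreach { case (s, e) => println(s"[$s, $e)") }
  }
}
```

For an event at second 70 this yields the windows [30, 90) and [60, 120), which is why the number of window/age groups kept in state is roughly twice what a tumbling 1-minute window would need.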
> In Spark 2.4 Structured Streaming: The executor storage memory increasing over
> time
> ---------------------------------------------------------------------------------
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: tommy duan
> Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv,
> houragg_with_state1_state2.csv, houragg_with_state1_state2.xlsx,
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png,
> image-2019-05-24-10-20-25-723.png, image-2019-05-27-10-10-30-460.png,
> image-2019-06-02-19-43-21-652.png
>
>
> *Spark program business logic:*
> Read a Kafka topic, aggregate the streaming data, and write the result to
> another Kafka topic.
> *Problem Description:*
> *1) Using Spark Structured Streaming in a CDH environment (Spark 2.2)*,
> out-of-memory problems often occur (because too many versions of state are
> kept in memory; this bug has been fixed in Spark 2.4).
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> Executor memory exceptions occur when running with these submit resources
> under Spark 2.2, and the job normally does not run for more than one day.
> The workaround is to set the executor memory larger than before.
> My spark-submit script is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 46G \
> --executor-cores 3 \
> --driver-memory 6G \
> ...{code}
> With these settings, the Spark program runs stably for a long time, and the
> executor storage memory stays below 10 MB (it has been running stably for
> more than 20 days).
> *2) According to the Spark 2.4 upgrade information, the problem of large
> memory consumption by state storage has been solved in Spark 2.4.*
> So we upgraded to Spark 2.4 under CDH, ran the Spark program, and found that
> memory usage was reduced.
> However, a new problem arises: as the running time increases, the executor
> storage memory keeps growing (see Executors -> Storage Memory in the Spark on
> YARN Resource Manager UI).
> This program has been running for 14 days (under Spark 2.2, with the same
> submit resources, it normally ran for no more than one day before executor
> memory exceptions occurred).
> The script used to submit the program under Spark 2.4 is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> Under Spark 2.4, I recorded the executor storage memory over time while the
> Spark program was running:
> |Run time (hours)|Storage memory (used/total)|Average memory growth rate (MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|
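
The growth-rate column in the table above appears to be the storage memory in use divided by the total run time, i.e. a cumulative average. Here is a minimal Scala sketch that reproduces that arithmetic from the table rows and also prints the rate between consecutive samples. The `Sample` case class, the `GrowthRate` object and its function names are hypothetical, introduced only for illustration; the figures are copied from the table.

```scala
// Hypothetical helper type: one row of the table (run time, memory in use).
final case class Sample(hours: Double, usedMb: Double)

object GrowthRate {
  // Cumulative average: memory in use divided by total run time (MB/hour).
  def averageRate(s: Sample): Double = s.usedMb / s.hours

  // Rate between two samples (MB/hour).
  def intervalRate(a: Sample, b: Sample): Double =
    (b.usedMb - a.usedMb) / (b.hours - a.hours)

  // Figures copied from the table in the issue description.
  val samples: List[Sample] = List(
    Sample(23.5, 41.6), Sample(108.4, 460.2), Sample(131.7, 559.1),
    Sample(135.4, 575.0), Sample(153.6, 641.2), Sample(219.0, 888.1),
    Sample(263.0, 1126.4), Sample(309.0, 1228.8))

  def main(args: Array[String]): Unit = {
    samples.foreach { s =>
      println(f"${s.hours}%.1f h: average ${averageRate(s)}%.3f MB/hour")
    }
    // Pair each sample with its successor to get per-interval rates.
    samples.zip(samples.tail).foreach { case (a, b) =>
      println(f"${a.hours}%.1f h -> ${b.hours}%.1f h: ${intervalRate(a, b)}%.3f MB/hour")
    }
  }
}
```

The per-interval rate is a useful cross-check: a cumulative average that stays near 4 MB/hour can hide intervals where growth sped up or slowed down.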