[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Bondaruk updated SPARK-23682:
-----------------------------------
    Description: 
There appears to be a memory issue in Structured Streaming. A stream with 
aggregation ({{dropDuplicates()}}) and data partitioning steadily increases 
memory usage until the executors fail with exit code 137:

{{ExecutorLostFailure (executor 2 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_1520214726510_0001_01_000003 on 
host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal}}

Stream creation looks something like this:

{{session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    .withColumn("year",
        functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "yyyy"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();}}
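
For comparison, a minimal sketch of the same pipeline with the deduplication 
state bounded by an event-time watermark is shown below. This is not the code 
under test: it assumes {{testTimestamp}} is a timestamp column usable as event 
time, and the 24-hour threshold is only an example.

{{// Sketch only: with a watermark, dropDuplicates() can drop state older than
// the threshold instead of keeping every (testId, testName) key forever.
Dataset<TestRecord> parsed = session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class));

Dataset<TestRecord> deduplicated = parsed
    .withWatermark("testTimestamp", "24 hours")              // example threshold
    .dropDuplicates("testId", "testName", "testTimestamp");  // event-time column included
// ...then the same writeStream() configuration as above.}}

This would not explain growth beyond the size of the key set, but it does give 
{{dropDuplicates()}} an upper bound on how long state has to be retained.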

Analyzing the heap dump, I found that most of the memory is used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, 
which is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
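
As a cross-check that does not require a heap dump, the state metrics reported 
after every trigger can be logged with a listener. A minimal sketch, assuming 
the {{memoryUsedBytes}} field of {{StateOperatorProgress}} is available in this 
Spark version:

{{import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Sketch only: print the number of state rows and the reported state memory
// per trigger, to see whether the deduplication state itself keeps growing.
session.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        for (StateOperatorProgress state : event.progress().stateOperators()) {
            System.out.println("state rows=" + state.numRowsTotal()
                + ", state memory=" + state.memoryUsedBytes() + " bytes");
        }
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
});}}

If {{numRowsTotal}} stays flat while executor memory keeps climbing, the growth 
is not in the number of aggregation keys themselves.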
 

At first glance this looks normal, since that is how Spark keeps aggregation 
keys in memory. However, I ran my tests by renaming the files in the source 
folder so that Spark would pick them up again. Since the input records are the 
same, all subsequent rows should be rejected as duplicates and memory 
consumption should not grow, but that is not what happens. Moreover, GC time 
took more than 30% of the total processing time.
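
One setting that might be worth experimenting with: {{HDFSBackedStateStoreProvider}} 
caches recent versions of the state map in executor memory, and the number of 
retained versions appears to be tied to {{spark.sql.streaming.minBatchesToRetain}} 
(default 100). Whether lowering it actually shrinks the in-memory cache on 2.2 is 
an assumption that would need to be verified against the code; the sketch only 
shows where the knob would go.

{{// Sketch only: reduce the number of retained state/checkpoint versions from
// the default of 100. The effect on HDFSBackedStateStoreProvider's in-memory
// cache in Spark 2.2 is an assumption to verify; fewer retained batches also
// means fewer checkpoint versions are kept for recovery.
SparkSession session = SparkSession.builder()
    .appName("test-stream")
    .config("spark.sql.streaming.minBatchesToRetain", 10)
    .getOrCreate();}}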


> Memory issue with Spark structured streaming
> --------------------------------------------
>
>                 Key: SPARK-23682
>                 URL: https://issues.apache.org/jira/browse/SPARK-23682
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Memory, memory, memory-leak



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
