[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622198#comment-16622198 ]
Sahil Aggarwal commented on SPARK-23682:
----------------------------------------

I am facing the same issue: most of the memory is consumed by HDFSBackedStateStoreProvider. The memory keeps growing and keeps getting promoted to the old generation, leading to constant full GCs. Is there any workaround to avoid this? In my job I am doing just a windowed groupBy with a watermark of 5 minutes, and I was expecting the memory to drop after the data is written to the output sink. Thanks
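For reference, a minimal sketch of the kind of windowed groupBy with a 5-minute watermark described above, assuming a CSV file source with a hypothetical schema and hypothetical S3 paths (none of these names come from this ticket):

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class WindowedAggregationSketch {
    public static void main(String[] args) throws Exception {
        SparkSession session = SparkSession.builder()
                .appName("windowed-aggregation-sketch")
                .getOrCreate();

        // Hypothetical input schema: a key column and an event-time column.
        StructType schema = new StructType()
                .add("key", DataTypes.StringType)
                .add("eventTime", DataTypes.TimestampType);

        Dataset<Row> input = session
                .readStream()
                .schema(schema)
                .csv("s3://example-bucket/input");   // hypothetical source path

        // 5-minute watermark: state for windows older than
        // (max observed event time - 5 minutes) becomes eligible for eviction.
        Dataset<Row> counts = input
                .withWatermark("eventTime", "5 minutes")
                .groupBy(
                        functions.window(functions.col("eventTime"), "5 minutes"),
                        functions.col("key"))
                .count();

        counts.writeStream()
                .format("parquet")
                .option("path", "s3://example-bucket/output")                    // hypothetical sink path
                .option("checkpointLocation", "s3://example-bucket/checkpoint")  // hypothetical checkpoint path
                .outputMode(OutputMode.Append())
                .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
                .start()
                .awaitTermination();
    }
}
{code}

With the watermark in place, windows older than the watermark should be finalized in Append mode and their state evicted, which is why the memory was expected to shrink after each sink write.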
> Memory issue with Spark structured streaming
> --------------------------------------------
>
>                  Key: SPARK-23682
>                  URL: https://issues.apache.org/jira/browse/SPARK-23682
>              Project: Spark
>           Issue Type: Bug
>           Components: SQL, Structured Streaming
>     Affects Versions: 2.2.0
>          Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>             Reporter: Yuriy Bondaruk
>             Priority: Major
>               Labels: Memory, memory, memory-leak
>          Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, screen_shot_2018-03-20_at_15.23.29.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream with aggregation (dropDuplicates()) and data partitioning constantly increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     .dropDuplicates("testId", "testName")
>     .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .partitionBy("year")
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .queryName("test-stream")
>     .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, which is referenced from [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming files in the source folder so that they would be picked up by Spark again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption shouldn't increase, but that is not the case. Moreover, GC time took more than 30% of the total processing time.
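To make the state-store growth described above easier to observe per trigger, here is a minimal sketch of polling the state operator metrics that Structured Streaming reports through the query progress; the helper class name is hypothetical and {{query}} is assumed to be the StreamingQuery returned by {{start()}}:

{code:java}
import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

// Hypothetical helper: call periodically on the driver while the query runs.
public final class StateStoreMonitor {

    public static void logStateMetrics(StreamingQuery query) {
        StreamingQueryProgress progress = query.lastProgress();
        if (progress == null) {
            return; // no batch has completed yet
        }
        for (StateOperatorProgress op : progress.stateOperators()) {
            // If state were bounded by a watermark, numRowsTotal should plateau;
            // the behaviour reported in this ticket is that it keeps growing.
            System.out.println("batch=" + progress.batchId()
                    + " stateRowsTotal=" + op.numRowsTotal()
                    + " stateRowsUpdated=" + op.numRowsUpdated()
                    + " stateMemoryUsedBytes=" + op.memoryUsedBytes());
        }
    }

    private StateStoreMonitor() {}
}
{code}

If {{numRowsTotal}} and {{memoryUsedBytes}} keep increasing while only duplicate records are replayed, that matches the growth seen in the heap dump above.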