[ https://issues.apache.org/jira/browse/SPARK-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-19364.
----------------------------------
Resolution: Incomplete
> Stream Blocks in Storage Persist Forever when Kinesis Checkpoints are enabled and an exception is thrown
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-19364
> URL: https://issues.apache.org/jira/browse/SPARK-19364
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Ubuntu Linux
> Spark 2.0.2
> Java application
> Reporter: Andrew Milkowski
> Priority: Major
> Labels: bulk-closed
>
> -- update -- We found that the situation below occurs when we encounter
> "com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException:
> Can't update checkpoint - instance doesn't hold the lease for this shard"
> (see https://github.com/awslabs/amazon-kinesis-client/issues/108).
> We use an S3 directory (and DynamoDB) to store checkpoints. When this
> exception occurs, blocks should not get stuck; they should continue to be
> evicted gracefully from memory. The race condition in the Kinesis client
> library is obviously a problem unto itself.
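>
> For context, here is a minimal sketch (not the reporter's code and not
> Spark's internal KinesisCheckpointer) of the KCL call pattern behind the
> exception shown below: IRecordProcessorCheckpointer.checkpoint() throws
> ShutdownException once the worker no longer holds the lease for the shard,
> and callers are expected to skip that checkpoint rather than fail.
>
> import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
> import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
>
> class CheckpointSketch {
>     // Illustrative only; class and method names are placeholders.
>     static void tryCheckpoint(IRecordProcessorCheckpointer checkpointer) throws Exception {
>         try {
>             checkpointer.checkpoint();
>         } catch (ShutdownException e) {
>             // The lease for this shard now belongs to another worker; skip
>             // this checkpoint instead of failing the stream.
>         }
>     }
> }
>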
> -- exception leading to a block not being freed up --
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException: Caught shutdown exception, skipping checkpoint.
> com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
>         at scala.Option.foreach(Option.scala:257)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
>         at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
>         at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
>         at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
>
> We run standard Kinesis stream ingestion with a Java Spark app that creates a
> DStream. After running for some time, some stream blocks seem to persist
> forever and are never cleaned up, which eventually leads to memory depletion
> on the workers.
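>
> (Hedged sketch, not the reporter's application: the kind of Java setup being
> described, using the Spark 2.0.x KinesisUtils.createStream API. App name,
> stream name, region, paths, and intervals are illustrative placeholders.)
>
> import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
> import org.apache.spark.SparkConf;
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kinesis.KinesisUtils;
>
> public class KinesisIngestSketch {
>     public static void main(String[] args) throws Exception {
>         JavaStreamingContext jssc = new JavaStreamingContext(
>                 new SparkConf().setAppName("kinesis-ingest"), Durations.seconds(10));
>         // Spark streaming checkpoints go to the configured directory (S3 here);
>         // the KCL keeps its shard checkpoints in a DynamoDB table named after
>         // the application.
>         jssc.checkpoint("s3://my-bucket/checkpoints/");           // placeholder path
>         JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
>                 jssc,
>                 "my-kinesis-app",                                 // DynamoDB lease table name
>                 "my-stream",                                      // Kinesis stream name
>                 "https://kinesis.us-east-1.amazonaws.com",
>                 "us-east-1",
>                 InitialPositionInStream.LATEST,
>                 Durations.seconds(10),                            // KCL checkpoint interval
>                 StorageLevel.MEMORY_AND_DISK_2());                // storage level for received blocks
>         stream.count().print();                                   // minimal output so batches run
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
>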
> We even tried cleaning the RDDs with the following ("filtered" is the DStream
> we process; the cleaner is org.apache.spark.ContextCleaner):
> // Grab Spark's ContextCleaner and force-clean each batch RDD after use.
> final ContextCleaner cleaner = ssc.sparkContext().sc().cleaner().get();
> filtered.foreachRDD(new VoidFunction<JavaRDD<String>>() {
>     @Override
>     public void call(JavaRDD<String> rdd) throws Exception {
>         // Blocking unpersist of this batch's RDD by id.
>         cleaner.doCleanupRDD(rdd.id(), true);
>     }
> });
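>
> (Hedged note for context: ContextCleaner.doCleanupRDD() only unpersists the
> blocks stored under the RDD's own block ids; the input-0-* entries below are
> receiver stream blocks, which Spark normally drops only after the old batches
> that reference them are cleared, so this workaround would not be expected to
> release them.)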
> Despite the above, the blocks still persist. This can be seen in the Spark
> admin UI, for instance:
> input-0-1485362233945 | 1 | ip-<>:34245 | Memory Serialized | 1442.5 KB
> The block above stays and is never cleaned up.