[ https://issues.apache.org/jira/browse/SPARK-24970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24970:
------------------------------------

    Assignee:     (was: Apache Spark)

> Spark Kinesis streaming application fails to recover from streaming 
> checkpoint due to ProvisionedThroughputExceededException
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24970
>                 URL: https://issues.apache.org/jira/browse/SPARK-24970
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: bruce_zhao
>            Priority: Major
>              Labels: kinesis
>
>  
> We're using Spark Streaming to consume Kinesis data, and found that it re-reads 
> data from Kinesis and easily hits ProvisionedThroughputExceededException *when 
> it recovers from a streaming checkpoint*. 
>  
> Normally this is only a WARN in the Spark log. But when multiple streaming 
> applications (e.g., 5 applications) consume the same Kinesis stream, the 
> situation becomes serious: *the application fails to recover because of the 
> following exception in the driver*, and one application's failure also affects 
> the other running applications. 
>   
> {panel:title=Exception}
> org.apache.spark.SparkException: Job aborted due to stage failure: {color:#ff0000}*Task 5 in stage 7.0 failed 4 times, most recent failure*:{color} Lost task 5.3 in stage 7.0 (TID 128, ip-172-31-14-36.ap-northeast-1.compute.internal, executor 1): org.apache.spark.SparkException: Gave up after 3 retries while getting records using shard iterator, last exception:
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:223)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:207)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
>   at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
>   at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> Caused by: *{color:#ff0000}com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000000 in stream rellfsstream-an under account 1111111111111{color}.* (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: d3520677-060e-14c4-8014-2886b6b75f03)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647)
>   at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511)
>   at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2219)
>   at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2195)
>   at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1004)
>   at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:980)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
>   at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
> {panel}
>  
> After checking the source code, we found that recovery calls 
> getBlockFromKinesis(), which reads the data directly from Kinesis. Since every 
> partition of the BlockRDD accesses Kinesis, and Kinesis only allows 5 read 
> transactions per shard per second, ProvisionedThroughputExceededException is 
> hit easily: with several applications recovering at the same time, their 
> GetRecords calls against the same shard quickly add up to far more than that 
> limit. Even though the code retries a few times, it still fails easily when 
> contention is heavy. 
>   
> {code:java}
> // KinesisBackedBlockRDD.scala
> def getBlockFromKinesis(): Iterator[T] = {
>   val credentials = kinesisCreds.provider.getCredentials
>   partition.seqNumberRanges.ranges.iterator.flatMap { range =>
>     new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
>       range, kinesisReadConfigs).map(messageHandler)
>   }
> }
> if (partition.isBlockIdValid) {
>   getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
> } else {
>   getBlockFromKinesis()
> }
> {code}
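> 
> As a rough back-of-envelope illustration of why the limit is exceeded (the 
> numbers below are assumptions for illustration, not measurements from our 
> cluster), the GetRecords rate during recovery scales with the number of 
> applications recovering at the same time and with the number of BlockRDD 
> partitions that re-read the same shard:
> {code:java}
> // Rough estimate only; "applications" and "partitionsPerShard" are assumed values.
> object RecoveryLoadEstimate {
>   def main(args: Array[String]): Unit = {
>     val applications = 5          // streaming applications recovering concurrently
>     val partitionsPerShard = 4    // BlockRDD partitions re-reading one shard (assumed)
>     val estimatedReadsPerSecond = applications * partitionsPerShard
>     val kinesisLimitPerShard = 5  // GetRecords transactions allowed per shard per second
>     println(s"~$estimatedReadsPerSecond reads/sec vs a limit of $kinesisLimitPerShard/sec per shard")
>   }
> }
> {code}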
>   
> Why do we need to re-read the data directly from Kinesis? Is there any way to 
> avoid it under the current design?
> In most deployments of Spark Streaming we enable the WAL, so the data could be 
> recovered from the WAL instead of being re-read from Kinesis directly.
> I'd like to make a change for this (see the sketch below). It may not be a 
> perfect fix, but at least it provides a choice to avoid this problem under the 
> current design. 
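> 
> To make the proposal concrete, here is a minimal sketch (placeholder stream 
> name, application name, and checkpoint path; this is not the actual patch) of 
> the kind of setup we have in mind: streaming checkpointing plus 
> spark.streaming.receiver.writeAheadLog.enable=true. The idea is that, for such 
> a setup, recovery could optionally replay the received blocks from the WAL 
> instead of going through getBlockFromKinesis():
> {code:java}
> // Minimal sketch with placeholder names; assumes Spark 2.2 with the
> // spark-streaming-kinesis-asl artifact on the classpath.
> import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
> import org.apache.spark.SparkConf
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kinesis.KinesisInputDStream
> 
> object KinesisWalRecoveryExample {
>   // Placeholder checkpoint location (must be fault-tolerant storage).
>   val checkpointDir = "s3://my-bucket/checkpoints/kinesis-app"
> 
>   def createContext(): StreamingContext = {
>     val conf = new SparkConf()
>       .setAppName("kinesis-wal-example")
>       // Write received blocks to the WAL under the checkpoint directory.
>       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     val ssc = new StreamingContext(conf, Seconds(10))
>     ssc.checkpoint(checkpointDir)
> 
>     val stream = KinesisInputDStream.builder
>       .streamingContext(ssc)
>       .streamName("my-kinesis-stream")          // placeholder stream name
>       .endpointUrl("https://kinesis.ap-northeast-1.amazonaws.com")
>       .regionName("ap-northeast-1")
>       .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>       .checkpointAppName("kinesis-wal-example") // DynamoDB table used for KCL checkpoints
>       .checkpointInterval(Seconds(10))
>       .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
>       .build()
> 
>     stream.count().print()
>     ssc
>   }
> 
>   def main(args: Array[String]): Unit = {
>     // Recover from the streaming checkpoint if one exists, otherwise start fresh.
>     val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}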



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
