[ https://issues.apache.org/jira/browse/SPARK-24970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-24970:
------------------------------------

Assignee: (was: Apache Spark)

Spark Kinesis streaming application fails to recover from streaming checkpoint due to ProvisionedThroughputExceededException
-----------------------------------------------------------------------------------------------------------------------------

                 Key: SPARK-24970
                 URL: https://issues.apache.org/jira/browse/SPARK-24970
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.2.0
            Reporter: bruce_zhao
            Priority: Major
              Labels: kinesis

We're using Spark Streaming to consume Kinesis data, and we found that it re-reads data from Kinesis and easily hits ProvisionedThroughputExceededException *when it recovers from a streaming checkpoint*.

Normally this is only a WARN in the Spark log. But when multiple streaming applications (e.g., 5 applications) consume the same Kinesis stream, the situation becomes serious: *the application fails to recover, due to the following exception in the driver*, and one application's failure also affects the other running applications.

{panel:title=Exception}
org.apache.spark.SparkException: Job aborted due to stage failure: {color:#ff0000}*Task 5 in stage 7.0 failed 4 times, most recent failure:*{color} Lost task 5.3 in stage 7.0 (TID 128, ip-172-31-14-36.ap-northeast-1.compute.internal, executor 1): org.apache.spark.SparkException: Gave up after 3 retries while getting records using shard iterator, last exception:
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:223)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:207)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: *{color:#ff0000}com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000000 in stream rellfsstream-an under account 1111111111111{color}.* (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: d3520677-060e-14c4-8014-2886b6b75f03)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2219)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2195)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1004)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:980)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
{panel}

After checking the source code, we found that recovery calls getBlockFromKinesis(), which reads the data back from Kinesis directly. Because every partition of the BlockRDD accesses Kinesis, and a Kinesis shard supports at most 5 read transactions per second, it is easy to hit ProvisionedThroughputExceededException. Even though the code retries a few times, it still fails easily when contention is heavy (the retry settings look tunable; see the second sketch at the end of this description).

{code:java}
// KinesisBackedBlockRDD.scala
def getBlockFromKinesis(): Iterator[T] = {
  val credentials = kinesisCreds.provider.getCredentials
  partition.seqNumberRanges.ranges.iterator.flatMap { range =>
    // Re-reads the sequence-number range from Kinesis via GetRecords.
    new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
      range, kinesisReadConfigs).map(messageHandler)
  }
}

if (partition.isBlockIdValid) {
  // Falls back to Kinesis whenever the block is gone from the BlockManager.
  getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
} else {
  getBlockFromKinesis()
}
{code}

Why do we need to re-read the data from Kinesis directly? Is there any way to avoid it under the current design?

Mostly, when we use Spark Streaming, we enable the WAL (write-ahead log). Then we can recover the data from the WAL instead of re-reading it from Kinesis directly; a sketch of that setup follows.

I'd like to make a change for this. It may not be a perfect fix, but at least it provides a choice to avoid this problem under the current design.
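To make the WAL proposal concrete, here is a minimal sketch of the configuration a user would turn on, using the KinesisInputDStream builder from the 2.2 kinesis-asl module. The app name, stream name, region, and checkpoint directory are placeholders, not values from this issue; the proposed change would make the recovery path prefer the WAL over getBlockFromKinesis() when this setup is in place.

{code:java}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

object KinesisWalExample {
  // Placeholder path; must live on a fault-tolerant filesystem such as HDFS.
  val checkpointDir = "hdfs:///checkpoints/kinesis-wal-example"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("kinesis-wal-example") // placeholder
      // Persist received blocks to a write-ahead log, so that recovery can
      // replay them from the WAL instead of re-issuing GetRecords calls.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val records = KinesisInputDStream.builder
      .streamingContext(ssc)
      .checkpointAppName("kinesis-wal-example") // KCL DynamoDB app name, placeholder
      .streamName("my-kinesis-stream")          // placeholder
      .endpointUrl("https://kinesis.ap-northeast-1.amazonaws.com")
      .regionName("ap-northeast-1")
      .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
      .checkpointInterval(Seconds(10))
      // With the WAL enabled, replicated storage is redundant; the streaming
      // docs suggest serialized memory-and-disk instead.
      .storageLevel(StorageLevel.MEMORY_AND_DISK_SER)
      .build()

    records.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart this restores the context from the streaming checkpoint;
    // with the proposed change, received blocks would come back from the WAL.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}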
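Separately, as a stopgap under the current design, the recovery path's retry behaviour appears to be configurable: if I read KinesisBackedBlockRDD.scala correctly, the keys below feed the kinesisReadConfigs used in the code above (defaults: 3 attempts, 100 ms wait). Treat the key names as an assumption to verify against the source. Stretching them lets concurrent replays back off instead of giving up:

{code:java}
import org.apache.spark.SparkConf

// A minimal sketch: lengthen the recovery-path retries so that several
// applications replaying the same shard back off instead of exhausting
// the default 3 attempts. Key names assumed from KinesisReadConfigurations
// in the Spark 2.2 source -- please verify before relying on them.
val conf = new SparkConf()
  .set("spark.streaming.kinesis.retry.waitTime", "2000ms") // default: 100ms
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")  // default: 3
{code}

This only spreads the load out; with enough applications replaying the same shards at once, the WAL-based recovery sketched above is still the more robust option.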