[
https://issues.apache.org/jira/browse/SPARK-24970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
bruce_zhao updated SPARK-24970:
-------------------------------
Description:
We're using Spark Streaming to consume Kinesis data, and found that it reads
more data from Kinesis and easily hits
ProvisionedThroughputExceededException *when it recovers from a streaming
checkpoint*.
Normally, this is only a WARN in the Spark log. But when multiple streaming
applications (e.g., 5 applications) consume the same Kinesis stream, the
situation becomes serious. *The application fails to recover because of the
following exception in the driver.* A failure in one application also affects
the other running applications.
{panel:title=Exception}
org.apache.spark.SparkException: Job aborted due to stage failure:
{color:#ff0000}*Task 5 in stage 7.0 failed 4 times, most recent
failure*:{color} Lost task 5.3 in stage 7.0 (TID 128,
ip-172-31-14-36.ap-northeast-1.compute.internal, executor 1):
org.apache.spark.SparkException: Gave up after 3 retries while getting records
using shard iterator, last exception:
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:223)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:207)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
  at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
  at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by:
*{color:#ff0000}com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException:
Rate exceeded for shard shardId-000000000000 in stream rellfsstream-an under
account 1111111111111{color}.* (Service: AmazonKinesis; Status Code: 400; Error
Code: ProvisionedThroughputExceededException; Request ID:
d3520677-060e-14c4-8014-2886b6b75f03)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647)
  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2219)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2195)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1004)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:980)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
  at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
{panel}
After checking the source code, we found that it calls getBlockFromKinesis() to
recover data, and that this function reads from Kinesis directly. Because every
partition in the BlockRDD accesses Kinesis, and AWS Kinesis supports only 5
read transactions per second per shard, it easily hits
ProvisionedThroughputExceededException. Although the code retries a few times,
it still fails easily under heavy contention.
{code:java}
// KinesisBackedBlockRDD.scala (excerpt)

// Re-reads the partition's sequence-number ranges directly from Kinesis.
def getBlockFromKinesis(): Iterator[T] = {
  val credentials = kinesisCreds.provider.getCredentials
  partition.seqNumberRanges.ranges.iterator.flatMap { range =>
    new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
      range, kinesisReadConfigs).map(messageHandler)
  }
}

// Falls back to Kinesis whenever the block is not found in the block manager.
if (partition.isBlockIdValid) {
  getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
} else {
  getBlockFromKinesis()
}
{code}
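To illustrate the retry behaviour discussed above, here is a minimal sketch of retrying a throttled read with capped exponential backoff and random jitter, so that many recovering partitions do not retry in lockstep. This is not the code in KinesisBackedBlockRDD; ThrottledException, backoffMillis, and retryWithBackoff are hypothetical names used only for this example, and ThrottledException merely stands in for the SDK's ProvisionedThroughputExceededException.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Random, Success, Try}

object BackoffRetry {
  // Hypothetical stand-in for ProvisionedThroughputExceededException.
  final class ThrottledException(msg: String) extends RuntimeException(msg)

  /** Delay before the retry following failed attempt `attempt` (0-based):
    * a random value in [0, min(capMs, baseMs * 2^attempt)). */
  def backoffMillis(attempt: Int, baseMs: Long, capMs: Long, rnd: Random): Long = {
    // Clamp the shift so the multiplication cannot overflow for large attempts.
    val ceiling = math.min(capMs, baseMs * (1L << math.min(attempt, 20)))
    (rnd.nextDouble() * ceiling).toLong
  }

  /** Runs `op`, retrying up to `maxRetries` times when it is throttled.
    * Any other exception, or a throttle after the last retry, is rethrown. */
  def retryWithBackoff[A](
      maxRetries: Int,
      baseMs: Long,
      capMs: Long,
      sleep: Long => Unit = (ms: Long) => Thread.sleep(ms),
      rnd: Random = new Random())(op: => A): A = {
    @tailrec
    def loop(attempt: Int): A = Try(op) match {
      case Success(value) => value
      case Failure(_: ThrottledException) if attempt < maxRetries =>
        sleep(backoffMillis(attempt, baseMs, capMs, rnd))
        loop(attempt + 1)
      case Failure(e) => throw e
    }
    loop(0)
  }
}
```

The jitter spreads retries from different tasks over time, which may reduce repeated collisions on the same shard, but it cannot raise the per-shard read limit itself.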
Why do we need to re-read the data from Kinesis directly? Is there any way to
avoid it under the current design?
In most cases, we enable the WAL (write-ahead log) when using Spark Streaming,
so the data could be recovered from the WAL instead of being re-read from
Kinesis directly.
I'd like to make a change for this. It may not be a perfect fix, but it would at
least provide an option to avoid this problem under the current design.
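As a rough illustration of the kind of option proposed here, the sketch below tries the block manager first, then the WAL (only when enabled), and reads from Kinesis only as a last resort. It is not the actual patch and does not use Spark's internal APIs: recoverBlock, walEnabled, and the three source functions are hypothetical names chosen for this example.

```scala
object RecoveryOrder {
  /** Returns the block's records from the first source that has them.
    * Later sources are evaluated only when earlier ones return None, so
    * Kinesis is not contacted if the block manager or the WAL has the data. */
  def recoverBlock[T](
      walEnabled: Boolean,
      fromBlockManager: () => Option[Iterator[T]],
      fromWriteAheadLog: () => Option[Iterator[T]],
      fromKinesis: () => Iterator[T]): Iterator[T] =
    fromBlockManager()
      // Consult the WAL only when the (hypothetical) option is switched on.
      .orElse(if (walEnabled) fromWriteAheadLog() else None)
      // Last resort: re-read the sequence-number ranges from Kinesis.
      .getOrElse(fromKinesis())
}
```

With walEnabled set to false, the sketch behaves like the current code path: anything missing from the block manager is re-read from Kinesis.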
> Spark Kinesis streaming application fails to recover from streaming
> checkpoint due to ProvisionedThroughputExceededException
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-24970
> URL: https://issues.apache.org/jira/browse/SPARK-24970
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.2.0
> Reporter: bruce_zhao
> Priority: Major
> Labels: kinesis
>