[ https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhaobo Yu updated SPARK-25721:
------------------------------
Description:
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
        checkpointAppName,
        streamName,
        kinesisProvider,
        dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
        cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
        workerId)
      .withKinesisEndpoint(endpointUrl)
      .withInitialPositionInStream(initialPositionInStream)
      .withTaskBackoffTimeMillis(500)
      .withRegionName(regionName)

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default of 10000:

    public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting that is less than
10k. Worse still, it causes ProvisionedThroughputExceededException from
Kinesis, especially when the streaming application is restarted.

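One possible direction, shown only as a minimal sketch: derive the per-fetch
cap from the receiver's rate limit instead of relying on the default, and pass
the result to withMaxRecords() in the builder chain quoted above. The class
MaxRecordsSketch, the helper maxRecordsFor() and the fetchIntervalMillis
parameter are hypothetical names, not Spark or KCL API. The sketch also assumes,
as with spark.streaming.receiver.maxRate, that a rate of 0 or less means "no
limit".

```java
// Hypothetical sketch: cap the records requested per fetch by the rate limit.
// Not Spark or KCL code; names and parameters are illustrative assumptions.
public final class MaxRecordsSketch {

    // Same value as KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS quoted above.
    static final int DEFAULT_MAX_RECORDS = 10000;

    /**
     * Returns how many records one fetch may request so that a single batch
     * does not exceed maxRatePerSecond over fetchIntervalMillis.
     * A maxRatePerSecond of 0 or less is treated as "no limit" (assumption).
     */
    static int maxRecordsFor(long maxRatePerSecond, long fetchIntervalMillis) {
        if (fetchIntervalMillis <= 0) {
            throw new IllegalArgumentException("fetchIntervalMillis must be positive");
        }
        if (maxRatePerSecond <= 0) {
            return DEFAULT_MAX_RECORDS;
        }
        long allowed;
        try {
            allowed = Math.multiplyExact(maxRatePerSecond, fetchIntervalMillis) / 1000;
        } catch (ArithmeticException overflow) {
            // A very large rate (e.g. Long.MAX_VALUE) overflows; treat it as unbounded.
            allowed = Long.MAX_VALUE;
        }
        // Request at least one record and never more than the KCL default.
        return (int) Math.max(1L, Math.min((long) DEFAULT_MAX_RECORDS, allowed));
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsFor(1, 1000));              // 1
        System.out.println(maxRecordsFor(20000, 100));           // 2000
        System.out.println(maxRecordsFor(Long.MAX_VALUE, 1000)); // 10000
    }
}
```

With 1 record per second and a 1-second fetch interval the cap drops to 1,
whereas the current code always requests up to 10000. Whether a smaller
GetRecords batch alone keeps the receiver within maxRate is not verified here.
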
See the attached rate_violation.png: we have a Spark Streaming application with
40 receivers, each configured to consume 1 record per second. Over any 5-minute
window the application should therefore take no more than 12k records
(40 * 60 * 5 = 12,000), but CloudWatch metrics show it consuming nearly 22k
records per 5 minutes.
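
The expected ceiling above can be checked with a few lines of Java. RateCheck
is an illustrative name, and the 22,000 figure is only the approximate value
quoted from rate_violation.png, not a new measurement.

```java
import java.util.Locale;

// Worked check of the expected ceiling from the report (illustrative only).
public final class RateCheck {

    // receivers * records per second per receiver * seconds in the window
    static long expectedCeiling(int receivers, long ratePerReceiverPerSecond, long windowSeconds) {
        return receivers * ratePerReceiverPerSecond * windowSeconds;
    }

    public static void main(String[] args) {
        long ceiling = expectedCeiling(40, 1, 5 * 60);
        long observed = 22_000; // approximate CloudWatch figure quoted in the report
        System.out.println("ceiling = " + ceiling); // ceiling = 12000
        System.out.printf(Locale.ROOT, "observed / ceiling = %.2f%n",
                (double) observed / ceiling);       // observed / ceiling = 1.83
    }
}
```

In other words, the application consumed roughly 1.8 times what the configured
rate allows.
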
was:
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
        checkpointAppName,
        streamName,
        kinesisProvider,
        dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
        cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
        workerId)
      .withKinesisEndpoint(endpointUrl)
      .withInitialPositionInStream(initialPositionInStream)
      .withTaskBackoffTimeMillis(500)
      .withRegionName(regionName)

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default of 10000:

    public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure.
Worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

See the attached rate_violation.png: we have a Spark Streaming application with
40 receivers, each configured to consume 1 record per second. Over any 5-minute
window the application should therefore take no more than 12k records
(40 * 60 * 5 = 12,000), but CloudWatch metrics show it consuming nearly 22k
records per 5 minutes.
> maxRate configuration not being used in Kinesis receiver
> --------------------------------------------------------
>
> Key: SPARK-25721
> URL: https://issues.apache.org/jira/browse/SPARK-25721
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.2.0
> Reporter: Zhaobo Yu
> Priority: Major
> Attachments: rate_violation.png
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)