Yonathan Randolph created SPARK-18020:
-----------------------------------------
Summary: Kinesis receiver does not snapshot when shard completes
Key: SPARK-18020
URL: https://issues.apache.org/jira/browse/SPARK-18020
Project: Spark
Issue Type: Bug
Components: Streaming
Affects Versions: 2.0.0
Reporter: Yonathan Randolph
Priority: Minor
When a Kinesis shard is split or merged and the old shard ends, the Kinesis
Client Library [calls
IRecordProcessor.shutdown|https://github.com/awslabs/amazon-kinesis-client/blob/v1.7.0/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java#L100]
and expects {{IRecordProcessor.shutdown}} to checkpoint the sequence
number {{ExtendedSequenceNumber.SHARD_END}} before returning. Unfortunately,
Spark's KinesisRecordProcessor sometimes does not checkpoint SHARD_END. The
Kinesis Client Library then logs an error, and Spark is blocked indefinitely
from processing any records from the child shards.
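To make the contract concrete, here is a minimal Python sketch of the check that the linked ShutdownTask appears to perform. It is a simplified model, not the real KCL or Spark code: {{Checkpointer}}, {{run_shutdown_task}} and both shutdown functions are hypothetical names, and the assumption is that the check only applies when the shutdown reason is TERMINATE (the shard has ended).

```python
# Simplified, hypothetical model of the KCL end-of-shard contract.
# None of these names are the real KCL or Spark API.
SHARD_END = "SHARD_END"


class Checkpointer:
    """Stand-in for the KCL checkpointer; remembers the last checkpoint."""

    def __init__(self, last_checkpoint):
        self.last_checkpoint = last_checkpoint

    def checkpoint_shard_end(self):
        self.last_checkpoint = SHARD_END


def run_shutdown_task(shard_id, shutdown, checkpointer, reason):
    """Call the processor's shutdown hook, then enforce the contract."""
    shutdown(checkpointer, reason)
    # Assumption: the check is only made when the shard has ended (TERMINATE).
    if reason == "TERMINATE" and checkpointer.last_checkpoint != SHARD_END:
        raise ValueError(
            "Application didn't checkpoint at end of shard " + shard_id)


def shutdown_without_checkpoint(checkpointer, reason):
    """Models the reported behavior: returns without checkpointing."""


def shutdown_with_checkpoint(checkpointer, reason):
    """Models the expected behavior: checkpoint once the shard has ended."""
    if reason == "TERMINATE":
        checkpointer.checkpoint_shard_end()
```

In this model, passing {{shutdown_without_checkpoint}} raises an error with the same message as the exception reported in this issue, while {{shutdown_with_checkpoint}} lets the task complete.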
This issue has also been raised on Stack Overflow: [resharding while spark
running on kinesis
stream|http://stackoverflow.com/questions/38898691/resharding-while-spark-running-on-kinesis-stream]
Exception that is logged:
{code}
16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000030
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
Command used to split shard:
{code}
aws kinesis --region us-west-1 split-shard --stream-name my-stream \
    --shard-to-split shardId-000000000030 \
    --new-starting-hash-key 5316911983139663491615228241121378303
{code}
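For reference, the {{--new-starting-hash-key}} value used here is the midpoint of the parent shard's hash key range (0 to 10633823966279326983230456482242756606). A small Python sketch of that arithmetic follows; {{midpoint_split}} is a hypothetical helper name, not part of any AWS tooling.

```python
def midpoint_split(starting_hash_key: str, ending_hash_key: str):
    """Split a hash key range in half.

    Returns (new_starting_hash_key, first_child_range, second_child_range),
    with all keys as decimal strings, as the Kinesis API reports them.
    """
    start, end = int(starting_hash_key), int(ending_hash_key)
    new_start = (start + end) // 2
    first_child = (str(start), str(new_start - 1))
    second_child = (str(new_start), str(end))
    return str(new_start), first_child, second_child


# Parent shard shardId-000000000030, using the range from this issue.
key, first, second = midpoint_split(
    "0", "10633823966279326983230456482242756606")
```

With the parent range from this issue, {{key}} equals the value passed to {{split-shard}}, and the two child ranges match the {{HashKeyRange}} values that {{describe-stream}} reports for shardId-000000000062 and shardId-000000000063.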
After the Spark Streaming job has hung, examining the DynamoDB table shows
that the parent shard's checkpoint has not reached
{{ExtendedSequenceNumber.SHARD_END}} and the child shards are still at
{{ExtendedSequenceNumber.TRIM_HORIZON}}, waiting for the parent to finish:
{code}
aws kinesis --region us-west-1 describe-stream --stream-name my-stream
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamName": "my-stream",
        "Shards": [
            {
                "ShardId": "shardId-000000000030",
                "HashKeyRange": {
                    "EndingHashKey": "10633823966279326983230456482242756606",
                    "StartingHashKey": "0"
                },
                ...
            },
            {
                "ShardId": "shardId-000000000062",
                "HashKeyRange": {
                    "EndingHashKey": "5316911983139663491615228241121378302",
                    "StartingHashKey": "0"
                },
                "ParentShardId": "shardId-000000000030",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49566806087883755242230188435465744452396445937434624994"
                }
            },
            {
                "ShardId": "shardId-000000000063",
                "HashKeyRange": {
                    "EndingHashKey": "10633823966279326983230456482242756606",
                    "StartingHashKey": "5316911983139663491615228241121378303"
                },
                "ParentShardId": "shardId-000000000030",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49566806087906055987428719058607280170669094298940605426"
                }
            },
            ...
        ],
        "StreamStatus": "ACTIVE"
    }
}
aws dynamodb --region us-west-1 scan --table-name my-processor
{
    "Items": [
        {
            "leaseOwner": {
                "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
            },
            "leaseCounter": {
                "N": "49318"
            },
            "ownerSwitchesSinceCheckpoint": {
                "N": "62"
            },
            "checkpointSubSequenceNumber": {
                "N": "0"
            },
            "checkpoint": {
                "S": "49566573572821264975247582655142547856950135436343247330"
            },
            "parentShardId": {
                "SS": [
                    "shardId-000000000014"
                ]
            },
            "leaseKey": {
                "S": "shardId-000000000030"
            }
        },
        {
            "leaseOwner": {
                "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
            },
            "leaseCounter": {
                "N": "25439"
            },
            "ownerSwitchesSinceCheckpoint": {
                "N": "69"
            },
            "checkpointSubSequenceNumber": {
                "N": "0"
            },
            "checkpoint": {
                "S": "TRIM_HORIZON"
            },
            "parentShardId": {
                "SS": [
                    "shardId-000000000030"
                ]
            },
            "leaseKey": {
                "S": "shardId-000000000062"
            }
        },
        {
            "leaseOwner": {
                "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83"
            },
            "leaseCounter": {
                "N": "25443"
            },
            "ownerSwitchesSinceCheckpoint": {
                "N": "59"
            },
            "checkpointSubSequenceNumber": {
                "N": "0"
            },
            "checkpoint": {
                "S": "TRIM_HORIZON"
            },
            "parentShardId": {
                "SS": [
                    "shardId-000000000030"
                ]
            },
            "leaseKey": {
                "S": "shardId-000000000063"
            }
        },
        ...
    ]
}
{code}
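The stuck state can also be detected mechanically from the scan output. The following Python sketch lists child leases whose parent lease has not been checkpointed at SHARD_END. It assumes the scan result has been loaded as a dictionary (for example with {{json.load}}) with any elided items removed; {{blocked_child_leases}} is a hypothetical helper, and the sample data is a trimmed copy of the items shown in this issue.

```python
def blocked_child_leases(scan_result):
    """Map each child lease key to its parents that have not reached SHARD_END.

    Parents that have no lease row in the table are ignored, since their
    state cannot be determined from the scan alone.
    """
    checkpoints = {
        item["leaseKey"]["S"]: item["checkpoint"]["S"]
        for item in scan_result["Items"]
    }
    blocked = {}
    for item in scan_result["Items"]:
        parents = item.get("parentShardId", {}).get("SS", [])
        unfinished = sorted(
            p for p in parents
            if p in checkpoints and checkpoints[p] != "SHARD_END"
        )
        if unfinished:
            blocked[item["leaseKey"]["S"]] = unfinished
    return blocked


# Trimmed copy of the lease rows from this issue (only the fields used above).
sample_scan = {
    "Items": [
        {
            "leaseKey": {"S": "shardId-000000000030"},
            "checkpoint": {
                "S": "49566573572821264975247582655142547856950135436343247330"
            },
            "parentShardId": {"SS": ["shardId-000000000014"]},
        },
        {
            "leaseKey": {"S": "shardId-000000000062"},
            "checkpoint": {"S": "TRIM_HORIZON"},
            "parentShardId": {"SS": ["shardId-000000000030"]},
        },
        {
            "leaseKey": {"S": "shardId-000000000063"},
            "checkpoint": {"S": "TRIM_HORIZON"},
            "parentShardId": {"SS": ["shardId-000000000030"]},
        },
    ]
}

blocked = blocked_child_leases(sample_scan)
```

For the sample rows, both child leases are reported as blocked by shardId-000000000030, which matches the hang described above.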