[
https://issues.apache.org/jira/browse/SPARK-18020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15719774#comment-15719774
]
Takeshi Yamamuro commented on SPARK-18020:
------------------------------------------
I tried to checkpoint SHARD_END by calling
`IRecordProcessorCheckpointer#checkpoint(ExtendedSequenceNumber.SHARD_END.toString())`,
but I got an IllegalArgumentException with a message like "Sequence
number must be numeric, but was SHARD_END". Since I'm not sure whether this is
expected behaviour, I have asked the AWS developers here:
https://forums.aws.amazon.com/thread.jspa?threadID=244218
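
A toy model may make the question concrete. The Python sketch below is not
KCL code: `ToyCheckpointer`, `mark_shard_end`, and the `shutdown` function are
hypothetical stand-ins. It assumes, without confirmation from AWS in this
thread, that once a shard has ended the argument-less `checkpoint()` is what
records SHARD_END, while an explicit sequence number must be numeric (which
matches the error message quoted above).

```python
SHARD_END = "SHARD_END"


class ToyCheckpointer:
    """Hypothetical stand-in for a KCL checkpointer; not the real API."""

    def __init__(self, last_delivered):
        # Highest value the caller is currently allowed to checkpoint.
        self.largest_permitted = last_delivered
        self.checkpointed = None

    def mark_shard_end(self):
        # Assumption: at shard end the library raises the permitted value
        # to SHARD_END before it calls the processor's shutdown hook.
        self.largest_permitted = SHARD_END

    def checkpoint(self, sequence_number=None):
        if sequence_number is None:
            # No argument: checkpoint at the largest permitted value.
            self.checkpointed = self.largest_permitted
            return
        if not sequence_number.isdigit():
            # Mirrors the message reported in the comment above.
            raise ValueError(
                "Sequence number must be numeric, but was " + sequence_number
            )
        self.checkpointed = sequence_number


def shutdown(checkpointer, reason):
    """Sketch of a shutdown hook that checkpoints only when the shard ended."""
    if reason == "TERMINATE":
        checkpointer.checkpoint()


cp = ToyCheckpointer("49566573572821264975247582655142547856950135436343247330")
cp.mark_shard_end()
shutdown(cp, "TERMINATE")
print(cp.checkpointed)
```

If the assumption holds, the record processor would call `checkpoint()` with
no arguments from its shutdown hook when the shard has ended, instead of
passing the SHARD_END string explicitly.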
> Kinesis receiver does not snapshot when shard completes
> -------------------------------------------------------
>
> Key: SPARK-18020
> URL: https://issues.apache.org/jira/browse/SPARK-18020
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.0
> Reporter: Yonathan Randolph
> Priority: Minor
> Labels: kinesis
>
> When a Kinesis shard is split or merged and the old shard ends, the Amazon
> Kinesis Client Library [calls
> IRecordProcessor.shutdown|https://github.com/awslabs/amazon-kinesis-client/blob/v1.7.0/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java#L100]
> and expects {{IRecordProcessor.shutdown}} to checkpoint the sequence
> number {{ExtendedSequenceNumber.SHARD_END}} before returning. Unfortunately,
> Spark’s
> [KinesisRecordProcessor|https://github.com/apache/spark/blob/v2.0.1/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala]
> sometimes does not checkpoint SHARD_END. This results in an error message,
> and Spark is then blocked indefinitely from processing any items from the
> child shards.
> This issue has also been raised on StackOverflow: [resharding while spark
> running on kinesis
> stream|http://stackoverflow.com/questions/38898691/resharding-while-spark-running-on-kinesis-stream]
> Exception that is logged:
> {code}
> 16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception.
> java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000030
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Command used to split shard:
> {code}
> aws kinesis --region us-west-1 split-shard --stream-name my-stream \
>     --shard-to-split shardId-000000000030 \
>     --new-starting-hash-key 5316911983139663491615228241121378303
> {code}
> After the spark-streaming job has hung, examining the DynamoDB table
> indicates that the parent shard processor has not reached
> {{ExtendedSequenceNumber.SHARD_END}} and the child shards are still at
> {{ExtendedSequenceNumber.TRIM_HORIZON}} waiting for the parent to finish:
> {code}
> aws kinesis --region us-west-1 describe-stream --stream-name my-stream
> {
>   "StreamDescription": {
>     "RetentionPeriodHours": 24,
>     "StreamName": "my-stream",
>     "Shards": [
>       {
>         "ShardId": "shardId-000000000030",
>         "HashKeyRange": {
>           "EndingHashKey": "10633823966279326983230456482242756606",
>           "StartingHashKey": "0"
>         },
>         ...
>       },
>       {
>         "ShardId": "shardId-000000000062",
>         "HashKeyRange": {
>           "EndingHashKey": "5316911983139663491615228241121378302",
>           "StartingHashKey": "0"
>         },
>         "ParentShardId": "shardId-000000000030",
>         "SequenceNumberRange": {
>           "StartingSequenceNumber": "49566806087883755242230188435465744452396445937434624994"
>         }
>       },
>       {
>         "ShardId": "shardId-000000000063",
>         "HashKeyRange": {
>           "EndingHashKey": "10633823966279326983230456482242756606",
>           "StartingHashKey": "5316911983139663491615228241121378303"
>         },
>         "ParentShardId": "shardId-000000000030",
>         "SequenceNumberRange": {
>           "StartingSequenceNumber": "49566806087906055987428719058607280170669094298940605426"
>         }
>       },
>       ...
>     ],
>     "StreamStatus": "ACTIVE"
>   }
> }
> aws dynamodb --region us-west-1 scan --table-name my-processor
> {
>   "Items": [
>     {
>       "leaseOwner": {
>         "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
>       },
>       "leaseCounter": {
>         "N": "49318"
>       },
>       "ownerSwitchesSinceCheckpoint": {
>         "N": "62"
>       },
>       "checkpointSubSequenceNumber": {
>         "N": "0"
>       },
>       "checkpoint": {
>         "S": "49566573572821264975247582655142547856950135436343247330"
>       },
>       "parentShardId": {
>         "SS": [
>           "shardId-000000000014"
>         ]
>       },
>       "leaseKey": {
>         "S": "shardId-000000000030"
>       }
>     },
>     {
>       "leaseOwner": {
>         "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
>       },
>       "leaseCounter": {
>         "N": "25439"
>       },
>       "ownerSwitchesSinceCheckpoint": {
>         "N": "69"
>       },
>       "checkpointSubSequenceNumber": {
>         "N": "0"
>       },
>       "checkpoint": {
>         "S": "TRIM_HORIZON"
>       },
>       "parentShardId": {
>         "SS": [
>           "shardId-000000000030"
>         ]
>       },
>       "leaseKey": {
>         "S": "shardId-000000000062"
>       }
>     },
>     {
>       "leaseOwner": {
>         "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83"
>       },
>       "leaseCounter": {
>         "N": "25443"
>       },
>       "ownerSwitchesSinceCheckpoint": {
>         "N": "59"
>       },
>       "checkpointSubSequenceNumber": {
>         "N": "0"
>       },
>       "checkpoint": {
>         "S": "TRIM_HORIZON"
>       },
>       "parentShardId": {
>         "SS": [
>           "shardId-000000000030"
>         ]
>       },
>       "leaseKey": {
>         "S": "shardId-000000000063"
>       }
>     },
>     ...
>   ]
> }
> {code}
> Workaround: I manually edited the DynamoDB table to delete the checkpoints
> for the parent shards. The child shards were then able to begin processing.
> I’m not sure whether this resulted in a few items being lost though.
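
The lease-table check described in the quoted report can be sketched in
Python. This is only an illustration operating on the `Items` list of a
DynamoDB scan like the one above; `blocked_children` is a hypothetical helper,
not part of Spark or the KCL. It assumes that a parent with no lease row
counts as finished, which is consistent with the workaround of deleting the
parent checkpoints.

```python
def blocked_children(items):
    """Map each child lease key to its parents that have not reached SHARD_END."""
    # Index the checkpoint value of every lease by its shard id.
    checkpoints = {item["leaseKey"]["S"]: item["checkpoint"]["S"] for item in items}
    blocked = {}
    for item in items:
        parents = item.get("parentShardId", {}).get("SS", [])
        # Assumption: a parent without a lease row is treated as finished.
        unfinished = sorted(
            p for p in parents
            if p in checkpoints and checkpoints[p] != "SHARD_END"
        )
        if unfinished:
            blocked[item["leaseKey"]["S"]] = unfinished
    return blocked


# Only the fields used by the helper, copied from the scan output above.
items = [
    {
        "leaseKey": {"S": "shardId-000000000030"},
        "checkpoint": {"S": "49566573572821264975247582655142547856950135436343247330"},
        "parentShardId": {"SS": ["shardId-000000000014"]},
    },
    {
        "leaseKey": {"S": "shardId-000000000062"},
        "checkpoint": {"S": "TRIM_HORIZON"},
        "parentShardId": {"SS": ["shardId-000000000030"]},
    },
    {
        "leaseKey": {"S": "shardId-000000000063"},
        "checkpoint": {"S": "TRIM_HORIZON"},
        "parentShardId": {"SS": ["shardId-000000000030"]},
    },
]

for child, parents in sorted(blocked_children(items).items()):
    print(child, "is waiting on", ", ".join(parents))
```

For the table shown in the report, this lists shardId-000000000062 and
shardId-000000000063 as waiting on shardId-000000000030.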