[ 
https://issues.apache.org/jira/browse/SPARK-18020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yonathan Randolph updated SPARK-18020:
--------------------------------------
    Description: 
When a kinesis shard is split or combined and the old shard ends, the Kinesis 
Client Library [calls 
IRecordProcessor.shutdown|https://github.com/awslabs/amazon-kinesis-client/blob/v1.7.0/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java#L100]
 and expects that {{IRecordProcessor.shutdown}} must checkpoint the sequence 
number {{ExtendedSequenceNumber.SHARD_END}} before returning. Unfortunately, 
spark’s KinesisRecordProcessor sometimes does not checkpoint SHARD_END. This 
results in an error message, and spark is then blocked indefinitely from 
processing any items from the child shards.

This issue has also been raised on StackOverflow: [resharding while spark 
running on kinesis 
stream|http://stackoverflow.com/questions/38898691/resharding-while-spark-running-on-kinesis-stream]

Exception that is logged:
{code}
16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception. 
java.lang.IllegalArgumentException: Application didn't checkpoint at end of 
shard shardId-000000000030
        at 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
        at 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
        at 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

Command used to split shard:

{code}
aws kinesis --region us-west-1 split-shard --stream-name my-stream 
--shard-to-split shardId-000000000030 --new-starting-hash-key 
5316911983139663491615228241121378303
{code}

After the spark-streaming job has hung, examining the DynamoDB table indicates 
that the parent shard processor has not reached 
{{ExtendedSequenceNumber.SHARD_END}} and the child shards are still at 
{{ExtendedSequenceNumber.TRIM_HORIZON}} waiting for the parent to finish:

{code}
aws kinesis --region us-west-1 describe-stream --stream-name my-stream
{
    "StreamDescription": {
        "RetentionPeriodHours": 24, 
        "StreamName": "my-stream", 
        "Shards": [
            {
                "ShardId": "shardId-000000000030", 
                "HashKeyRange": {
                    "EndingHashKey": "10633823966279326983230456482242756606", 
                    "StartingHashKey": "0"
                },
                ...
            }, 
            {
                "ShardId": "shardId-000000000062", 
                "HashKeyRange": {
                    "EndingHashKey": "5316911983139663491615228241121378302", 
                    "StartingHashKey": "0"
                }, 
                "ParentShardId": "shardId-000000000030", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": 
"49566806087883755242230188435465744452396445937434624994"
                }
            }, 
            {
                "ShardId": "shardId-000000000063", 
                "HashKeyRange": {
                    "EndingHashKey": "10633823966279326983230456482242756606", 
                    "StartingHashKey": "5316911983139663491615228241121378303"
                }, 
                "ParentShardId": "shardId-000000000030", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": 
"49566806087906055987428719058607280170669094298940605426"
                }
            },
            ...
        ],
        "StreamStatus": "ACTIVE"
    }
}

aws dynamodb --region us-west-1 scan --table-name my-processor
{
    "Items": [
        {
            "leaseOwner": {
                "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
            }, 
            "leaseCounter": {
                "N": "49318"
            }, 
            "ownerSwitchesSinceCheckpoint": {
                "N": "62"
            }, 
            "checkpointSubSequenceNumber": {
                "N": "0"
            }, 
            "checkpoint": {
                "S": "49566573572821264975247582655142547856950135436343247330"
            }, 
            "parentShardId": {
                "SS": [
                    "shardId-000000000014"
                ]
            }, 
            "leaseKey": {
                "S": "shardId-000000000030"
            }
        }, 
        {
            "leaseOwner": {
                "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
            }, 
            "leaseCounter": {
                "N": "25439"
            }, 
            "ownerSwitchesSinceCheckpoint": {
                "N": "69"
            }, 
            "checkpointSubSequenceNumber": {
                "N": "0"
            }, 
            "checkpoint": {
                "S": "TRIM_HORIZON"
            }, 
            "parentShardId": {
                "SS": [
                    "shardId-000000000030"
                ]
            }, 
            "leaseKey": {
                "S": "shardId-000000000062"
            }
        }, 
        {
            "leaseOwner": {
                "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83"
            }, 
            "leaseCounter": {
                "N": "25443"
            }, 
            "ownerSwitchesSinceCheckpoint": {
                "N": "59"
            }, 
            "checkpointSubSequenceNumber": {
                "N": "0"
            }, 
            "checkpoint": {
                "S": "TRIM_HORIZON"
            }, 
            "parentShardId": {
                "SS": [
                    "shardId-000000000030"
                ]
            }, 
            "leaseKey": {
                "S": "shardId-000000000063"
            }
        },
        ...
    ]
}
{code}

Workaround: I manually edited the DynamoDB table to delete the checkpoints for 
the parent shards. The child shards were then able to begin processing. I’m not 
sure whether this resulted in a few items being lost though.

  was:
When a kinesis shard is split or combined and the old shard ends, the Kinesis 
Client Library [calls 
IRecordProcessor.shutdown|https://github.com/awslabs/amazon-kinesis-client/blob/v1.7.0/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java#L100]
 and expects that {{IRecordProcessor.shutdown}} must checkpoint the sequence 
number {{ExtendedSequenceNumber.SHARD_END}} before returning. Unfortunately, 
spark’s KinesisRecordProcessor sometimes does not checkpoint SHARD_END. This 
results in an error message, and spark is then blocked indefinitely from 
processing any items from the child shards.

This issue has also been raised on StackOverflow: [resharding while spark 
running on kinesis 
stream|http://stackoverflow.com/questions/38898691/resharding-while-spark-running-on-kinesis-stream]

Exception that is logged:
{code}
16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception. 
java.lang.IllegalArgumentException: Application didn't checkpoint at end of 
shard shardId-000000000030
        at 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
        at 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
        at 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

Command used to split shard:

{code}
aws kinesis --region us-west-1 split-shard --stream-name my-stream 
--shard-to-split shardId-000000000030 --new-starting-hash-key 
5316911983139663491615228241121378303
{code}

After the spark-streaming job has hung, examining the DynamoDB table indicates 
that the parent shard processor has not reached 
{{ExtendedSequenceNumber.SHARD_END}} and the child shards are still at 
{{ExtendedSequenceNumber.TRIM_HORIZON}} waiting for the parent to finish:

{code}
aws kinesis --region us-west-1 describe-stream --stream-name my-stream
\{
    "StreamDescription": \{
        "RetentionPeriodHours": 24, 
        "StreamName": "my-stream", 
        "Shards": [
            \{
                "ShardId": "shardId-000000000030", 
                "HashKeyRange": \{
                    "EndingHashKey": "10633823966279326983230456482242756606", 
                    "StartingHashKey": "0"
                \},
                ...
            \}, 
            \{
                "ShardId": "shardId-000000000062", 
                "HashKeyRange": \{
                    "EndingHashKey": "5316911983139663491615228241121378302", 
                    "StartingHashKey": "0"
                \}, 
                "ParentShardId": "shardId-000000000030", 
                "SequenceNumberRange": \{
                    "StartingSequenceNumber": 
"49566806087883755242230188435465744452396445937434624994"
                \}
            \}, 
            \{
                "ShardId": "shardId-000000000063", 
                "HashKeyRange": \{
                    "EndingHashKey": "10633823966279326983230456482242756606", 
                    "StartingHashKey": "5316911983139663491615228241121378303"
                \}, 
                "ParentShardId": "shardId-000000000030", 
                "SequenceNumberRange": \{
                    "StartingSequenceNumber": 
"49566806087906055987428719058607280170669094298940605426"
                \}
            \},
            ...
        ],
        "StreamStatus": "ACTIVE"
    \}
\}

aws dynamodb --region us-west-1 scan --table-name my-processor
\{
    "Items": [
        \{
            "leaseOwner": \{
                "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
            \}, 
            "leaseCounter": \{
                "N": "49318"
            \}, 
            "ownerSwitchesSinceCheckpoint": \{
                "N": "62"
            \}, 
            "checkpointSubSequenceNumber": \{
                "N": "0"
            \}, 
            "checkpoint": \{
                "S": "49566573572821264975247582655142547856950135436343247330"
            \}, 
            "parentShardId": \{
                "SS": [
                    "shardId-000000000014"
                ]
            \}, 
            "leaseKey": \{
                "S": "shardId-000000000030"
            \}
        \}, 
        \{
            "leaseOwner": \{
                "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
            \}, 
            "leaseCounter": \{
                "N": "25439"
            \}, 
            "ownerSwitchesSinceCheckpoint": \{
                "N": "69"
            \}, 
            "checkpointSubSequenceNumber": \{
                "N": "0"
            \}, 
            "checkpoint": \{
                "S": "TRIM_HORIZON"
            \}, 
            "parentShardId": \{
                "SS": [
                    "shardId-000000000030"
                ]
            \}, 
            "leaseKey": \{
                "S": "shardId-000000000062"
            \}
        \}, 
        \{
            "leaseOwner": \{
                "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83"
            \}, 
            "leaseCounter": \{
                "N": "25443"
            \}, 
            "ownerSwitchesSinceCheckpoint": \{
                "N": "59"
            \}, 
            "checkpointSubSequenceNumber": \{
                "N": "0"
            \}, 
            "checkpoint": \{
                "S": "TRIM_HORIZON"
            \}, 
            "parentShardId": \{
                "SS": [
                    "shardId-000000000030"
                ]
            \}, 
            "leaseKey": \{
                "S": "shardId-000000000063"
            \}
        \},
        ...
    ]
\}
{code}



> Kinesis receiver does not snapshot when shard completes
> -------------------------------------------------------
>
>                 Key: SPARK-18020
>                 URL: https://issues.apache.org/jira/browse/SPARK-18020
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Yonathan Randolph
>            Priority: Minor
>              Labels: kinesis
>
> When a kinesis shard is split or combined and the old shard ends, the Kinesis 
> Client Library [calls 
> IRecordProcessor.shutdown|https://github.com/awslabs/amazon-kinesis-client/blob/v1.7.0/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java#L100]
>  and expects that {{IRecordProcessor.shutdown}} must checkpoint the sequence 
> number {{ExtendedSequenceNumber.SHARD_END}} before returning. Unfortunately, 
> spark’s KinesisRecordProcessor sometimes does not checkpoint SHARD_END. This 
> results in an error message, and spark is then blocked indefinitely from 
> processing any items from the child shards.
> This issue has also been raised on StackOverflow: [resharding while spark 
> running on kinesis 
> stream|http://stackoverflow.com/questions/38898691/resharding-while-spark-running-on-kinesis-stream]
> Exception that is logged:
> {code}
> 16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception. 
> java.lang.IllegalArgumentException: Application didn't checkpoint at end of 
> shard shardId-000000000030
>         at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
>         at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>         at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Command used to split shard:
> {code}
> aws kinesis --region us-west-1 split-shard --stream-name my-stream 
> --shard-to-split shardId-000000000030 --new-starting-hash-key 
> 5316911983139663491615228241121378303
> {code}
> After the spark-streaming job has hung, examining the DynamoDB table 
> indicates that the parent shard processor has not reached 
> {{ExtendedSequenceNumber.SHARD_END}} and the child shards are still at 
> {{ExtendedSequenceNumber.TRIM_HORIZON}} waiting for the parent to finish:
> {code}
> aws kinesis --region us-west-1 describe-stream --stream-name my-stream
> {
>     "StreamDescription": {
>         "RetentionPeriodHours": 24, 
>         "StreamName": "my-stream", 
>         "Shards": [
>             {
>                 "ShardId": "shardId-000000000030", 
>                 "HashKeyRange": {
>                     "EndingHashKey": 
> "10633823966279326983230456482242756606", 
>                     "StartingHashKey": "0"
>                 },
>                 ...
>             }, 
>             {
>                 "ShardId": "shardId-000000000062", 
>                 "HashKeyRange": {
>                     "EndingHashKey": "5316911983139663491615228241121378302", 
>                     "StartingHashKey": "0"
>                 }, 
>                 "ParentShardId": "shardId-000000000030", 
>                 "SequenceNumberRange": {
>                     "StartingSequenceNumber": 
> "49566806087883755242230188435465744452396445937434624994"
>                 }
>             }, 
>             {
>                 "ShardId": "shardId-000000000063", 
>                 "HashKeyRange": {
>                     "EndingHashKey": 
> "10633823966279326983230456482242756606", 
>                     "StartingHashKey": "5316911983139663491615228241121378303"
>                 }, 
>                 "ParentShardId": "shardId-000000000030", 
>                 "SequenceNumberRange": {
>                     "StartingSequenceNumber": 
> "49566806087906055987428719058607280170669094298940605426"
>                 }
>             },
>             ...
>         ],
>         "StreamStatus": "ACTIVE"
>     }
> }
> aws dynamodb --region us-west-1 scan --table-name my-processor
> {
>     "Items": [
>         {
>             "leaseOwner": {
>                 "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
>             }, 
>             "leaseCounter": {
>                 "N": "49318"
>             }, 
>             "ownerSwitchesSinceCheckpoint": {
>                 "N": "62"
>             }, 
>             "checkpointSubSequenceNumber": {
>                 "N": "0"
>             }, 
>             "checkpoint": {
>                 "S": 
> "49566573572821264975247582655142547856950135436343247330"
>             }, 
>             "parentShardId": {
>                 "SS": [
>                     "shardId-000000000014"
>                 ]
>             }, 
>             "leaseKey": {
>                 "S": "shardId-000000000030"
>             }
>         }, 
>         {
>             "leaseOwner": {
>                 "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
>             }, 
>             "leaseCounter": {
>                 "N": "25439"
>             }, 
>             "ownerSwitchesSinceCheckpoint": {
>                 "N": "69"
>             }, 
>             "checkpointSubSequenceNumber": {
>                 "N": "0"
>             }, 
>             "checkpoint": {
>                 "S": "TRIM_HORIZON"
>             }, 
>             "parentShardId": {
>                 "SS": [
>                     "shardId-000000000030"
>                 ]
>             }, 
>             "leaseKey": {
>                 "S": "shardId-000000000062"
>             }
>         }, 
>         {
>             "leaseOwner": {
>                 "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83"
>             }, 
>             "leaseCounter": {
>                 "N": "25443"
>             }, 
>             "ownerSwitchesSinceCheckpoint": {
>                 "N": "59"
>             }, 
>             "checkpointSubSequenceNumber": {
>                 "N": "0"
>             }, 
>             "checkpoint": {
>                 "S": "TRIM_HORIZON"
>             }, 
>             "parentShardId": {
>                 "SS": [
>                     "shardId-000000000030"
>                 ]
>             }, 
>             "leaseKey": {
>                 "S": "shardId-000000000063"
>             }
>         },
>         ...
>     ]
> }
> {code}
> Workaround: I manually edited the DynamoDB table to delete the checkpoints 
> for the parent shards. The child shards were then able to begin processing. 
> I’m not sure whether this resulted in a few items being lost though.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to