[ 
https://issues.apache.org/jira/browse/CAMEL-16594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415420#comment-17415420
 ] 

Pierre-Yves Bigourdan commented on CAMEL-16594:
-----------------------------------------------

> The current Camel ddbstream implementation seems to incorrectly apply the 
> concept of {{ShardIteratorType}} to the list of shards forming a DynamoDB 
> stream rather than each shard individually.

 

I've been thinking about this quite a bit lately. My first attempt is to keep 
support for all Camel URI parameters to ensure backwards compatibility, so I'm 
trying to extend the concept of {{ShardIteratorType}}, which in theory should 
only apply at a single shard level, to a tree of shards. The four possible 
values of {{ShardIteratorType}} are the following:
h5. LATEST

Those would be the leaves of the tree, i.e. the shards that have a 
{{StartingSequenceNumber}}, but no {{EndingSequenceNumber}} (shards 3, 4, 5 and 
6).

The behaviour of {{LATEST}} applied to a whole tree sounds reasonable in my 
opinion.
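
As a rough illustration, here is a minimal Python sketch of that leaf selection. It assumes each shard is a dict shaped like one entry of the {{Shards}} list in the attached AWS CLI JSON; the {{shard}} helper, the shard ids and the sequence numbers are invented for the example and are not taken from the attachment.

```python
def shard(shard_id, parent, start, end=None):
    """Build a dict shaped like one Shards entry (all values here are invented)."""
    seq_range = {"StartingSequenceNumber": start}
    if end is not None:
        seq_range["EndingSequenceNumber"] = end
    entry = {"ShardId": shard_id, "SequenceNumberRange": seq_range}
    if parent is not None:
        entry["ParentShardId"] = parent
    return entry


def leaf_shards(shards):
    """Shards that are still open: a StartingSequenceNumber but no EndingSequenceNumber."""
    return [
        s for s in shards
        if "EndingSequenceNumber" not in s["SequenceNumberRange"]
    ]


# Same shape as the seven-shard tree from the issue description.
TREE = [
    shard("shard-0", None, "100", "199"),
    shard("shard-1", "shard-0", "200", "299"),
    shard("shard-2", "shard-0", "201", "298"),
    shard("shard-3", "shard-1", "300"),
    shard("shard-4", "shard-1", "301"),
    shard("shard-5", "shard-2", "303"),
    shard("shard-6", "shard-2", "302"),
]

print([s["ShardId"] for s in leaf_shards(TREE)])
```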
h5. TRIM_HORIZON

Those would be the roots of the tree, i.e. the shards that have no 
{{ParentShardId}}. I'm not entirely sure what would happen if DynamoDB decided 
to drop Shard 0 (or if that's even possible), so a slightly more general/safer 
definition would be shards that either have no {{ParentShardId}} or have a 
{{ParentShardId}} that can no longer be found in the tree. In my above 
example, if DynamoDB decided to drop Shard 0, Shards 1 and 2 would be the roots 
of two small subtrees.

The behaviour of {{TRIM_HORIZON}} applied to a whole tree sounds reasonable in 
my opinion.
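
The more general root definition could be sketched along these lines in Python. Again this assumes dicts shaped like the {{Shards}} entries of the AWS CLI output; the {{shard}} helper and the shard ids are made up for illustration.

```python
def shard(shard_id, parent=None):
    """Build a minimal Shards-like entry (ids are invented for the example)."""
    entry = {"ShardId": shard_id}
    if parent is not None:
        entry["ParentShardId"] = parent
    return entry


def root_shards(shards):
    """Shards with no ParentShardId, or whose parent is no longer in the list."""
    known_ids = {s["ShardId"] for s in shards}
    # s.get(...) is None when there is no parent, and None is never a known id.
    return [s for s in shards if s.get("ParentShardId") not in known_ids]


TREE = [
    shard("shard-0"),
    shard("shard-1", "shard-0"),
    shard("shard-2", "shard-0"),
    shard("shard-3", "shard-1"),
    shard("shard-4", "shard-1"),
    shard("shard-5", "shard-2"),
    shard("shard-6", "shard-2"),
]

print([s["ShardId"] for s in root_shards(TREE)])
# Simulate Shard 0 having been dropped: Shards 1 and 2 become roots.
print([s["ShardId"] for s in root_shards(TREE[1:])])
```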
h5. AT_SEQUENCE_NUMBER

This is where things become more troublesome. Consider the attached JSON. Take 
sequence number 105800000000033207000000 for example. This sequence number 
definitely doesn't belong to Shards 0, 1 and 2.

If you apply the same logic as the current implementation 
([https://github.com/apache/camel/blob/6119fdc379db343030bd25b191ab88bbec34d6b6/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardList.java#L106]),
 you would end up with shard 5 because it has the biggest 
{{StartingSequenceNumber}}. However, the sequence number we've picked, 
105800000000033207000000, is smaller than the {{StartingSequenceNumber}} of 
shard 5, 105800000000033207048658. The current logic is incorrect and will 
probably result in a 400 being returned by AWS.

If we refine our logic and look for a shard that has a 
{{StartingSequenceNumber}} smaller than the one we've picked, shards 3, 4, 6 
are all candidates. We've narrowed things down a little, but if we tried to 
iterate over the three shards with that target sequence number, we'd still 
likely get two 400 errors returned by AWS.

The most likely answer is shard 6: its {{StartingSequenceNumber}}, 
105800000000025199618049, is smaller than 105800000000033207000000 and is 
numerically the closest to it amongst the {{StartingSequenceNumbers}} of 
shards 3, 4 and 6.

We've come up with the following algorithm:
 * iterate through all non-leaf shards in the tree and try to find a shard that 
has a {{StartingSequenceNumber}} smaller and an {{EndingSequenceNumber}} 
greater than the target sequence number.
 * if none are found, iterate through leaf shards and pick the one that has a 
{{StartingSequenceNumber}} smaller than the target sequence number but 
numerically closest.
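
The two steps above can be sketched in Python as follows. This is only an illustration under assumptions: shards are dicts shaped like the attached AWS CLI JSON, sequence numbers are compared as integers, and the bounds are treated as inclusive. Only the target and the {{StartingSequenceNumber}} values of shards 5 and 6 come from this comment; every other number and all shard ids are invented.

```python
def _start(shard):
    return int(shard["SequenceNumberRange"]["StartingSequenceNumber"])


def _end(shard):
    value = shard["SequenceNumberRange"].get("EndingSequenceNumber")
    return None if value is None else int(value)


def pick_shard(shards, target_sequence_number):
    """Heuristic shard lookup for a sequence number; returns None if nothing fits."""
    target = int(target_sequence_number)
    # Step 1: a closed (non-leaf) shard whose range contains the target.
    for s in shards:
        end = _end(s)
        if end is not None and _start(s) <= target <= end:
            return s
    # Step 2: among open (leaf) shards starting at or before the target,
    # take the one whose start is numerically closest, i.e. the largest start.
    candidates = [s for s in shards if _end(s) is None and _start(s) <= target]
    return max(candidates, key=_start, default=None)


def _shard(shard_id, start, end=None):
    seq_range = {"StartingSequenceNumber": start}
    if end is not None:
        seq_range["EndingSequenceNumber"] = end
    return {"ShardId": shard_id, "SequenceNumberRange": seq_range}


SHARDS = [
    _shard("shard-0", "105800000000000000000001", "105800000000010000000000"),  # invented
    _shard("shard-1", "105800000000010000000001", "105800000000020000000000"),  # invented
    _shard("shard-2", "105800000000010000000002", "105800000000020000000001"),  # invented
    _shard("shard-3", "105800000000020000000003"),  # invented
    _shard("shard-4", "105800000000020000000004"),  # invented
    _shard("shard-5", "105800000000033207048658"),  # from the comment
    _shard("shard-6", "105800000000025199618049"),  # from the comment
]

print(pick_shard(SHARDS, "105800000000033207000000")["ShardId"])  # shard-6
```

Note that step 1 simply returns the first closed shard whose range contains the target, even when the ranges of sibling shards overlap and more than one would match.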

However, I don't think we can say for sure that it will be shard 6:
 * what if the sequence number within shard 3 had suddenly jumped and had 
reached 105800000000033207000000? The [documentation of 
Kinesis|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#terminology],
 which I believe underpins DynamoDB streams, states that "sequence numbers for 
the same partition key generally increase over time". That is very vague: even 
comparing numbers within a given shard is an approximation, let alone across 
multiple shards.
 * what if the same sequence number is present in multiple shards? The Kinesis 
documentation states that "each data record has a sequence number that is 
unique per partition-key within its shard". Uniqueness across multiple shards 
is not guaranteed.
 * even if we did manage to select the correct shard(s), users would never get 
future updates from the other three leaf shards. Is that really what they'd 
want?

I feel that anything we come up with in this area would be a potentially 
confusing heuristic, rather than a rigorous algorithm. Using 
{{AT_SEQUENCE_NUMBER}} really only makes sense if you provide the specific 
shard identifier with it (which the AWS CLI forces you to do).
h5. AFTER_SEQUENCE_NUMBER

The same concerns apply as for {{AT_SEQUENCE_NUMBER}}.

 
----
 

Taking a step back, if I had the opportunity to redesign things from scratch, I 
would only allow Camel users to define two modes of iteration through a 
DynamoDB stream, {{FROM_LATEST}} and {{FROM_START}}, intentionally stepping 
away from the {{ShardIteratorType}} wording. You'd simply pick either the roots 
or leaves of the tree, and not let users select random sequence numbers which 
could belong to any shard.
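
To make the idea concrete, here is a hypothetical Python sketch of such a two-mode selection. Nothing like this exists in Camel today: the {{IterationMode}} and {{starting_points}} names are invented, and pairing {{FROM_START}} with {{TRIM_HORIZON}} on the roots and {{FROM_LATEST}} with {{LATEST}} on the leaves is my assumption about how the modes would map onto per-shard iterators.

```python
from enum import Enum


class IterationMode(Enum):
    """Hypothetical user-facing modes, deliberately not named after ShardIteratorType."""
    FROM_START = "FROM_START"
    FROM_LATEST = "FROM_LATEST"


def starting_points(shards, mode):
    """Return (shard id, per-shard ShardIteratorType) pairs to start reading from."""
    if mode is IterationMode.FROM_START:
        # Roots: no parent, or a parent that is no longer listed.
        known_ids = {s["ShardId"] for s in shards}
        return [
            (s["ShardId"], "TRIM_HORIZON")
            for s in shards
            if s.get("ParentShardId") not in known_ids
        ]
    # Leaves: shards that are still open (no EndingSequenceNumber).
    return [
        (s["ShardId"], "LATEST")
        for s in shards
        if "EndingSequenceNumber" not in s["SequenceNumberRange"]
    ]


def shard(shard_id, parent, start, end=None):
    """Build a Shards-like entry; ids and sequence numbers are invented."""
    seq_range = {"StartingSequenceNumber": start}
    if end is not None:
        seq_range["EndingSequenceNumber"] = end
    entry = {"ShardId": shard_id, "SequenceNumberRange": seq_range}
    if parent is not None:
        entry["ParentShardId"] = parent
    return entry


TREE = [
    shard("shard-0", None, "100", "199"),
    shard("shard-1", "shard-0", "200", "299"),
    shard("shard-2", "shard-0", "201", "298"),
    shard("shard-3", "shard-1", "300"),
    shard("shard-4", "shard-1", "301"),
    shard("shard-5", "shard-2", "303"),
    shard("shard-6", "shard-2", "302"),
]

print(starting_points(TREE, IterationMode.FROM_START))
print(starting_points(TREE, IterationMode.FROM_LATEST))
```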

> DynamoDB stream updates are missed when there are more than one active shards
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-16594
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16594
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-aws
>            Reporter: Pierre-Yves Bigourdan
>            Assignee: Andrea Cosentino
>            Priority: Major
>         Attachments: shards.json
>
>
> The current Camel ddbstream implementation seems to incorrectly apply the 
> concept of {{ShardIteratorType}} to the list of shards forming a DynamoDB 
> stream rather than each shard individually.
> According to the [AWS 
> documentation|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-ShardIteratorType]:
> {noformat}
> ShardIteratorType determines how the shard iterator is used to start reading 
> stream records from the shard.
> {noformat}
> For example, for a given shard, when {{ShardIteratorType}} is equal to 
> {{LATEST}}, the AWS SDK will read the most recent data in that particular 
> shard. However, when {{ShardIteratorType}} is equal to {{LATEST}}, Camel will 
> additionally use {{ShardIteratorType}} to determine which shard it considers 
> amongst all the available ones in the stream: 
> https://github.com/apache/camel/blob/6119fdc379db343030bd25b191ab88bbec34d6b6/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java#L132
> If my understanding is correct, shards in DynamoDB are modelled as a tree, 
> with the child leaf nodes being the shards that are still active, i.e. the 
> ones where new stream data will appear. These child shards will have a 
> {{StartingSequenceNumber}}, but no {{EndingSequenceNumber}}.
> The most common case is to have a single shard, or a single branch of parent 
> and child nodes:
> {noformat}
> Shard0
>    |
> Shard1
> {noformat}
> In the above case, new data will be added to {{Shard1}}, and the Camel 
> implementation, which looks only at the last shard when {{ShardIteratorType}} 
> is equal to {{LATEST}}, will be correct.
> However, the tree can also look like this (see related example in the 
> attached JSON output from the AWS CLI, where the shard number matches the 
> index in the JSON list):
> {noformat}
>              Shard0
>             /      \
>      Shard1          Shard2
>     /      \        /      \ 
> Shard3   Shard4  Shard5   Shard6
> {noformat}
> In this case, Camel will only consider Shard6, even though new data may be 
> added to any of Shard3, Shard4, Shard5 or Shard6. This leads to updates being 
> missed.
> As far as I can tell, DynamoDB will split into multiple shards depending on 
> the number of table partitions, which will either grow for a table with huge 
> amounts of data, or when an existing table with provisioned capacity is 
> migrated to on-demand provisioning.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
