CAMEL-9515 Tests passing with resuming from a sequence number after an expired shard iterator.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/258f9c9c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/258f9c9c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/258f9c9c Branch: refs/heads/master Commit: 258f9c9ca808d6f97eced2b0b42db4e2cc5463f4 Parents: 9c99648 Author: Candle <[email protected]> Authored: Wed Jan 20 11:26:37 2016 +0000 Committer: Claus Ibsen <[email protected]> Committed: Thu Jan 21 10:10:26 2016 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/ShardIteratorHandler.java | 55 +++++++++++--------- .../aws/ddbstream/ShardIteratorHandlerTest.java | 23 ++++++-- 2 files changed, 50 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/258f9c9c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java index 685907b..1d20f8b 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java @@ -44,6 +44,14 @@ class ShardIteratorHandler { } String getShardIterator(String resumeFromSequenceNumber) { + ShardIteratorType iteratorType = getEndpoint().getIteratorType(); + String sequenceNumber = getEndpoint().getSequenceNumber(); + if (resumeFromSequenceNumber != null) { + iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; + currentShard = null; + currentShardIterator = null; + sequenceNumber = resumeFromSequenceNumber; + } // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { ListStreamsRequest req0 = new ListStreamsRequest() @@ -57,34 +65,20 @@ class ShardIteratorHandler { LOG.trace("Current shard is: {} (in {})", currentShard, shardList); if (currentShard == null) { - switch(getEndpoint().getIteratorType()) { - case AFTER_SEQUENCE_NUMBER: - currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber()); - break; - case AT_SEQUENCE_NUMBER: - currentShard = shardList.atSeq(getEndpoint().getSequenceNumber()); - break; - case TRIM_HORIZON: - currentShard = shardList.first(); - break; - case LATEST: - default: - currentShard = shardList.last(); - break; - } + currentShard = resolveNewShard(iteratorType, resumeFromSequenceNumber); } else { currentShard = shardList.nextAfter(currentShard); } shardList.removeOlderThan(currentShard); LOG.trace("Next shard is: {} (in {})", currentShard, shardList); - GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamArn(streamArn) .withShardId(currentShard.getShardId()) - .withShardIteratorType(getEndpoint().getIteratorType()); - switch(getEndpoint().getIteratorType()) { - case AFTER_SEQUENCE_NUMBER: - case AT_SEQUENCE_NUMBER: + .withShardIteratorType(iteratorType); + if (getEndpoint().getIteratorType() == ShardIteratorType.AFTER_SEQUENCE_NUMBER + || getEndpoint().getIteratorType() == ShardIteratorType.AFTER_SEQUENCE_NUMBER + || resumeFromSequenceNumber != null + ) { // if you request with a sequence number that is LESS than the // start of the shard, you get a HTTP 400 from AWS. // So only add the sequence number if the endpoints @@ -94,16 +88,15 @@ class ShardIteratorHandler { // because we get a 400 when we use one of the // {at,after}_sequence_number iterator types and don't supply // a sequence number. + if (BigIntComparisons.Conditions.LTEQ.matches( new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), - new BigInteger(getEndpoint().getSequenceNumber()) + new BigInteger(sequenceNumber) )) { - req = req.withSequenceNumber(getEndpoint().getSequenceNumber()); + req = req.withSequenceNumber(sequenceNumber); } else { req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); } - break; - default: } GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); @@ -112,6 +105,20 @@ class ShardIteratorHandler { return currentShardIterator; } + private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) { + switch(type) { + case AFTER_SEQUENCE_NUMBER: + return shardList.afterSeq(resumeFrom != null ? resumeFrom : getEndpoint().getSequenceNumber()); + case AT_SEQUENCE_NUMBER: + return shardList.atSeq(getEndpoint().getSequenceNumber()); + case TRIM_HORIZON: + return shardList.first(); + case LATEST: + default: + return shardList.last(); + } + } + void updateShardIterator(String nextShardIterator) { this.currentShardIterator = nextShardIterator; } http://git-wip-us.apache.org/repos/asf/camel/blob/258f9c9c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java index c54dbef..126a641 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java @@ -28,6 +28,7 @@ import com.amazonaws.services.dynamodbv2.model.Stream; import com.amazonaws.services.dynamodbv2.model.StreamDescription; import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultCamelContext; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -160,15 +161,29 @@ public class ShardIteratorHandlerTest { } @Test - public void afterSeqNumber16StartsWithShardC() throws Exception { - endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); + public void afterSeqNumber16StartsWithShardD() throws Exception { + endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16")); String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); - assertThat(getIteratorCaptor.getValue().getShardId(), is("c")); - assertThat(shardIterator, is("shard_iterator_c_000")); + assertThat(getIteratorCaptor.getValue().getShardId(), is("d")); + assertThat(shardIterator, is("shard_iterator_d_000")); + } + + @Test + public void resumingFromSomewhereActuallyUsesTheAfterSequenceNumber() throws Exception { + endpoint.setIteratorType(ShardIteratorType.LATEST); + + String shardIterator = undertest.getShardIterator("12"); + + ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); + assertThat(getIteratorCaptor.getValue().getShardId(), is("b")); + assertThat(shardIterator, is("shard_iterator_b_000")); + assertThat(getIteratorCaptor.getValue().getShardIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER.name())); + assertThat(getIteratorCaptor.getValue().getSequenceNumber(), is("12")); } } \ No newline at end of file
