CAMEL-9515 Tests passing with resuming from a sequence number after an expired 
shard iterator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/258f9c9c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/258f9c9c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/258f9c9c

Branch: refs/heads/master
Commit: 258f9c9ca808d6f97eced2b0b42db4e2cc5463f4
Parents: 9c99648
Author: Candle <[email protected]>
Authored: Wed Jan 20 11:26:37 2016 +0000
Committer: Claus Ibsen <[email protected]>
Committed: Thu Jan 21 10:10:26 2016 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/ShardIteratorHandler.java     | 55 +++++++++++---------
 .../aws/ddbstream/ShardIteratorHandlerTest.java | 23 ++++++--
 2 files changed, 50 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/258f9c9c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
index 685907b..1d20f8b 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
@@ -44,6 +44,14 @@ class ShardIteratorHandler {
     }
 
     String getShardIterator(String resumeFromSequenceNumber) {
+        ShardIteratorType iteratorType = getEndpoint().getIteratorType();
+        String sequenceNumber = getEndpoint().getSequenceNumber();
+        if (resumeFromSequenceNumber != null) {
+            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+            currentShard = null;
+            currentShardIterator = null;
+            sequenceNumber = resumeFromSequenceNumber;
+        }
         // either return a cached one or get a new one via a GetShardIterator 
request.
         if (currentShardIterator == null) {
             ListStreamsRequest req0 = new ListStreamsRequest()
@@ -57,34 +65,20 @@ class ShardIteratorHandler {
 
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
-                switch(getEndpoint().getIteratorType()) {
-                case AFTER_SEQUENCE_NUMBER:
-                    currentShard = 
shardList.afterSeq(getEndpoint().getSequenceNumber());
-                    break;
-                case AT_SEQUENCE_NUMBER:
-                    currentShard = 
shardList.atSeq(getEndpoint().getSequenceNumber());
-                    break;
-                case TRIM_HORIZON:
-                    currentShard = shardList.first();
-                    break;
-                case LATEST:
-                default:
-                    currentShard = shardList.last();
-                    break;
-                }
+                currentShard = resolveNewShard(iteratorType, 
resumeFromSequenceNumber);
             } else {
                 currentShard = shardList.nextAfter(currentShard);
             }
             shardList.removeOlderThan(currentShard);
             LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
-
             GetShardIteratorRequest req = new GetShardIteratorRequest()
                     .withStreamArn(streamArn)
                     .withShardId(currentShard.getShardId())
-                    .withShardIteratorType(getEndpoint().getIteratorType());
-            switch(getEndpoint().getIteratorType()) {
-            case AFTER_SEQUENCE_NUMBER:
-            case AT_SEQUENCE_NUMBER:
+                    .withShardIteratorType(iteratorType);
+            if (getEndpoint().getIteratorType() == 
ShardIteratorType.AFTER_SEQUENCE_NUMBER
+                    || getEndpoint().getIteratorType() == 
ShardIteratorType.AFTER_SEQUENCE_NUMBER
+                    || resumeFromSequenceNumber != null
+                    ) {
                 // if you request with a sequence number that is LESS than the
                 // start of the shard, you get a HTTP 400 from AWS.
                 // So only add the sequence number if the endpoints
@@ -94,16 +88,15 @@ class ShardIteratorHandler {
                 // because we get a 400 when we use one of the
                 // {at,after}_sequence_number iterator types and don't supply
                 // a sequence number.
+
                 if (BigIntComparisons.Conditions.LTEQ.matches(
                         new 
BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
-                        new BigInteger(getEndpoint().getSequenceNumber())
+                        new BigInteger(sequenceNumber)
                 )) {
-                    req = 
req.withSequenceNumber(getEndpoint().getSequenceNumber());
+                    req = req.withSequenceNumber(sequenceNumber);
                 } else {
                     req = 
req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
                 }
-                break;
-            default:
             }
             GetShardIteratorResult result = getClient().getShardIterator(req);
             currentShardIterator = result.getShardIterator();
@@ -112,6 +105,20 @@ class ShardIteratorHandler {
         return currentShardIterator;
     }
 
+    private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) {
+        switch(type) {
+        case AFTER_SEQUENCE_NUMBER:
+            return shardList.afterSeq(resumeFrom != null ? resumeFrom : 
getEndpoint().getSequenceNumber());
+        case AT_SEQUENCE_NUMBER:
+            return shardList.atSeq(getEndpoint().getSequenceNumber());
+        case TRIM_HORIZON:
+            return shardList.first();
+        case LATEST:
+        default:
+            return shardList.last();
+        }
+    }
+
     void updateShardIterator(String nextShardIterator) {
         this.currentShardIterator = nextShardIterator;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/258f9c9c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
index c54dbef..126a641 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
@@ -28,6 +28,7 @@ import com.amazonaws.services.dynamodbv2.model.Stream;
 import com.amazonaws.services.dynamodbv2.model.StreamDescription;
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -160,15 +161,29 @@ public class ShardIteratorHandlerTest {
     }
 
     @Test
-    public void afterSeqNumber16StartsWithShardC() throws Exception {
-        endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+    public void afterSeqNumber16StartsWithShardD() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
         endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("16"));
 
         String shardIterator = undertest.getShardIterator(null);
 
         ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
         
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
-        assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
-        assertThat(shardIterator, is("shard_iterator_c_000"));
+        assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
+        assertThat(shardIterator, is("shard_iterator_d_000"));
+    }
+
+    @Test
+    public void resumingFromSomewhereActuallyUsesTheAfterSequenceNumber() 
throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+
+        String shardIterator = undertest.getShardIterator("12");
+
+        ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+        
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+        assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
+        assertThat(shardIterator, is("shard_iterator_b_000"));
+        assertThat(getIteratorCaptor.getValue().getShardIteratorType(), 
is(ShardIteratorType.AFTER_SEQUENCE_NUMBER.name()));
+        assertThat(getIteratorCaptor.getValue().getSequenceNumber(), is("12"));
     }
 }
\ No newline at end of file

Reply via email to