CAMEL-9515 refactor tests to their relevant test classes.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e92c63a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e92c63a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e92c63a Branch: refs/heads/master Commit: 0e92c63a7c5ddaee96c13229320b08a7b9be63ae Parents: 8f6dfa4 Author: Candle <[email protected]> Authored: Tue Jan 19 14:54:45 2016 +0000 Committer: Claus Ibsen <[email protected]> Committed: Thu Jan 21 10:10:25 2016 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamConsumer.java | 6 +- .../aws/ddbstream/DdbStreamConsumerTest.java | 138 +------------- .../aws/ddbstream/ShardIteratorHandlerTest.java | 179 +++++++++++++++++++ 3 files changed, 189 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0e92c63a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index 4bdb5c6..c407df6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -40,8 +40,12 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { private final ShardIteratorHandler shardIteratorHandler; public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) { + this(endpoint, processor, new ShardIteratorHandler(endpoint)); + } + + DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor, ShardIteratorHandler shardIteratorHandler) { super(endpoint, 
processor); - shardIteratorHandler = new ShardIteratorHandler(endpoint); + this.shardIteratorHandler = shardIteratorHandler; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/0e92c63a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java index 0cbf9ca..13e6d92 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java @@ -25,18 +25,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; -import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; -import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; -import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; -import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; -import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; -import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; import com.amazonaws.services.dynamodbv2.model.Record; import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; -import com.amazonaws.services.dynamodbv2.model.Stream; -import com.amazonaws.services.dynamodbv2.model.StreamDescription; import com.amazonaws.services.dynamodbv2.model.StreamRecord; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -66,48 +58,16 @@ public class DdbStreamConsumerTest { @Mock private 
AmazonDynamoDBStreams amazonDynamoDBStreams; @Mock private AsyncProcessor processor; + @Mock private ShardIteratorHandler shardIteratorHandler; private final CamelContext context = new DefaultCamelContext(); private final DdbStreamComponent component = new DdbStreamComponent(context); - private DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component); - - private final String[] seqNums = new String[]{"2", "9", "11", "13", "14", "21", "25", "30", "35", "40"}; + private final DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component); @Before public void setup() throws Exception { endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams); - when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn( - new ListStreamsResult() - .withStreams(new Stream() - .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp") - ) - ); - - when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn( - new DescribeStreamResult() - .withStreamDescription( - new StreamDescription() - .withTableName("table_name") - .withShards( - ShardListTest.createShardsWithSequenceNumbers(null, - "a", "1", "5", - "b", "8", "15", - "c", "16", "16", - "d", "20", null - ) - ) - ) - ); - - when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() { - @Override - public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable { - return new GetShardIteratorResult() - .withShardIterator("shard_iterator_" - + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId() - + "_000"); - } - }); + undertest = new DdbStreamConsumer(endpoint, processor, shardIteratorHandler); final Map<String, String> shardIterators = new HashMap<>(); shardIterators.put("shard_iterator_a_000", "shard_iterator_a_001"); @@ -159,98 +119,10 @@ public class DdbStreamConsumerTest { } @Test - public void 
latestOnlyUsesTheLastShard() throws Exception { - endpoint.setIteratorType(ShardIteratorType.LATEST); - undertest = new DdbStreamConsumer(endpoint, processor); - - undertest.poll(); - - ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); - verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); - assertThat(getIteratorCaptor.getValue().getShardId(), is("d")); - } - - @Test - public void latestWithTwoPolls() throws Exception { - endpoint.setIteratorType(ShardIteratorType.LATEST); - undertest = new DdbStreamConsumer(endpoint, processor); - - undertest.poll(); - undertest.poll(); - - ArgumentCaptor<GetRecordsRequest> getRecordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); - verify(amazonDynamoDBStreams, times(2)).getRecords(getRecordsCaptor.capture()); - assertThat(getRecordsCaptor.getAllValues().get(0).getShardIterator(), is("shard_iterator_d_000")); - assertThat(getRecordsCaptor.getAllValues().get(1).getShardIterator(), is("shard_iterator_d_001")); - } - - @Test - public void trimHorizonStartsWithTheFirstShard() throws Exception { - endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); - undertest = new DdbStreamConsumer(endpoint, processor); - - undertest.poll(); - - ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); - verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); - assertThat(getIteratorCaptor.getValue().getShardId(), is("a")); - } - - @Test - public void trimHorizonWalksAllShards() throws Exception { - endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); - undertest = new DdbStreamConsumer(endpoint, processor); - - for (int i = 0; i < 9; ++i) { - undertest.poll(); - } - - ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); - verify(amazonDynamoDBStreams, 
times(4)).getShardIterator(getIteratorCaptor.capture()); - assertThat(getIteratorCaptor.getAllValues().get(0).getShardId(), is("a")); - assertThat(getIteratorCaptor.getAllValues().get(1).getShardId(), is("b")); - assertThat(getIteratorCaptor.getAllValues().get(2).getShardId(), is("c")); - assertThat(getIteratorCaptor.getAllValues().get(3).getShardId(), is("d")); - - ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); - verify(processor, times(seqNums.length)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); - - for (int i = 0; i < seqNums.length; ++i) { - assertThat(exchangeCaptor.getAllValues().get(i).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is(seqNums[i])); - } - } - - @Test - public void atSeqNumber12StartsWithShardB() throws Exception { - endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); - endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12")); - undertest = new DdbStreamConsumer(endpoint, processor); - - undertest.poll(); - - ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); - verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); - assertThat(getIteratorCaptor.getValue().getShardId(), is("b")); - } - - @Test - public void afterSeqNumber16StartsWithShardC() throws Exception { - endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); - endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16")); - undertest = new DdbStreamConsumer(endpoint, processor); - - undertest.poll(); - - ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); - verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); - assertThat(getIteratorCaptor.getValue().getShardId(), is("c")); - } - - @Test public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception { 
endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); - undertest = new DdbStreamConsumer(endpoint, processor); + when(shardIteratorHandler.getShardIterator()).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); @@ -267,7 +139,7 @@ public class DdbStreamConsumerTest { public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception { endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); - undertest = new DdbStreamConsumer(endpoint, processor); + when(shardIteratorHandler.getShardIterator()).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); http://git-wip-us.apache.org/repos/asf/camel/blob/0e92c63a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java new file mode 100644 index 0000000..43a4c2e --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; +import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; +import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; +import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; +import com.amazonaws.services.dynamodbv2.model.Stream; +import com.amazonaws.services.dynamodbv2.model.StreamDescription; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class ShardIteratorHandlerTest { + + private ShardIteratorHandler undertest; + + @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams; + private final CamelContext context = new DefaultCamelContext(); + private final DdbStreamComponent component = new DdbStreamComponent(context); + private 
final DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component); + + @Before + public void setup() throws Exception { + endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams); + + + undertest = new ShardIteratorHandler(endpoint); + + when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn( + new ListStreamsResult() + .withStreams(new Stream() + .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp") + ) + ); + + when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn( + new DescribeStreamResult() + .withStreamDescription( + new StreamDescription() + .withTableName("table_name") + .withShards( + ShardListTest.createShardsWithSequenceNumbers(null, + "a", "1", "5", + "b", "8", "15", + "c", "16", "16", + "d", "20", null + ) + ) + ) + ); + + when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() { + @Override + public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable { + return new GetShardIteratorResult() + .withShardIterator("shard_iterator_" + + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId() + + "_000"); + } + }); + } + + + @Test + public void latestOnlyUsesTheLastShard() throws Exception { + endpoint.setIteratorType(ShardIteratorType.LATEST); + + String shardIterator = undertest.getShardIterator(); + + ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); + assertThat(getIteratorCaptor.getValue().getShardId(), is("d")); + assertThat(shardIterator, is("shard_iterator_d_000")); + } + + @Test + public void cachesRecentShardId() throws Exception { + endpoint.setIteratorType(ShardIteratorType.LATEST); + + undertest.updateShardIterator("bar"); + String shardIterator = undertest.getShardIterator(); + 
+ verify(amazonDynamoDBStreams, times(0)).getShardIterator(any(GetShardIteratorRequest.class)); + assertThat(shardIterator, is("bar")); + } + + @Test + public void trimHorizonStartsWithTheFirstShard() throws Exception { + endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); + + String shardIterator = undertest.getShardIterator(); + + ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); + assertThat(getIteratorCaptor.getValue().getShardId(), is("a")); + assertThat(shardIterator, is("shard_iterator_a_000")); + } + + + @Test + public void trimHorizonWalksAllShards() throws Exception { + endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); + + String[] shardIterators = new String[4]; + + for (int i = 0; i < shardIterators.length; ++i) { + shardIterators[i] = undertest.getShardIterator(); + undertest.updateShardIterator(null); + } + + ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture()); + String[] shards = new String[]{"a", "b", "c", "d"}; + for (int i = 0; i < shards.length; ++i) { + assertThat(getIteratorCaptor.getAllValues().get(i).getShardId(), is(shards[i])); + } + assertThat(shardIterators, is(new String[]{"shard_iterator_a_000", "shard_iterator_b_000", "shard_iterator_c_000", "shard_iterator_d_000"})); + + } + + @Test + public void atSeqNumber12StartsWithShardB() throws Exception { + endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); + endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12")); + + String shardIterator = undertest.getShardIterator(); + + ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + 
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); + assertThat(getIteratorCaptor.getValue().getShardId(), is("b")); + assertThat(shardIterator, is("shard_iterator_b_000")); + } + + @Test + public void afterSeqNumber16StartsWithShardC() throws Exception { + endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); + endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16")); + + String shardIterator = undertest.getShardIterator(); + + ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); + assertThat(getIteratorCaptor.getValue().getShardId(), is("c")); + assertThat(shardIterator, is("shard_iterator_c_000")); + } + +} \ No newline at end of file
