http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java index 49e806d..4b2190f 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java @@ -25,8 +25,10 @@ import static org.mockito.Matchers.anyListOf; import static org.mockito.Mockito.when; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; + import java.io.IOException; import java.util.Collections; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -40,112 +42,114 @@ import org.mockito.stubbing.Answer; */ @RunWith(MockitoJUnitRunner.class) public class ShardRecordsIteratorTest { - private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR"; - private static final String SECOND_ITERATOR = "SECOND_ITERATOR"; - private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR"; - private static final String THIRD_ITERATOR = "THIRD_ITERATOR"; - private static final String STREAM_NAME = "STREAM_NAME"; - private static final String SHARD_ID = "SHARD_ID"; - - @Mock - private SimplifiedKinesisClient kinesisClient; - @Mock - private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint; - @Mock - private GetKinesisRecordsResult firstResult, secondResult, thirdResult; - @Mock - private KinesisRecord a, b, c, d; - @Mock - private RecordFilter recordFilter; - - private ShardRecordsIterator iterator; - - @Before - public void setUp() throws IOException, TransientKinesisException { - when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR); - when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME); - when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID); - - when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint); - when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint); - when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME); - when(aCheckpoint.getShardId()).thenReturn(SHARD_ID); - when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint); - when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME); - when(bCheckpoint.getShardId()).thenReturn(SHARD_ID); - when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint); - when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME); - when(cCheckpoint.getShardId()).thenReturn(SHARD_ID); - when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME); - when(dCheckpoint.getShardId()).thenReturn(SHARD_ID); - - when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID)) - .thenReturn(firstResult); - when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)) - .thenReturn(secondResult); - when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID)) - .thenReturn(thirdResult); - - when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR); - when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR); - when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR); - - when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); - when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); - when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); - - when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint - .class))).thenAnswer(new IdentityAnswer()); - - iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter); - } - - @Test - public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException { - assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); - assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); - assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); - } - - @Test - public void goesThroughAvailableRecords() throws IOException, TransientKinesisException { - when(firstResult.getRecords()).thenReturn(asList(a, b, c)); - when(secondResult.getRecords()).thenReturn(singletonList(d)); - - assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint); - assertThat(iterator.next()).isEqualTo(CustomOptional.of(a)); - assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint); - assertThat(iterator.next()).isEqualTo(CustomOptional.of(b)); - assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint); - assertThat(iterator.next()).isEqualTo(CustomOptional.of(c)); - assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint); - assertThat(iterator.next()).isEqualTo(CustomOptional.of(d)); - assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint); - assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); - assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint); - } - - @Test - public void refreshesExpiredIterator() throws IOException, TransientKinesisException { - when(firstResult.getRecords()).thenReturn(singletonList(a)); - when(secondResult.getRecords()).thenReturn(singletonList(b)); - - when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)) - .thenThrow(ExpiredIteratorException.class); - when(aCheckpoint.getShardIterator(kinesisClient)) - .thenReturn(SECOND_REFRESHED_ITERATOR); - when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID)) - .thenReturn(secondResult); - - assertThat(iterator.next()).isEqualTo(CustomOptional.of(a)); - assertThat(iterator.next()).isEqualTo(CustomOptional.of(b)); - assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); - } - private static class IdentityAnswer implements Answer<Object> { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - return invocation.getArguments()[0]; - } + private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR"; + private static final String SECOND_ITERATOR = "SECOND_ITERATOR"; + private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR"; + private static final String THIRD_ITERATOR = "THIRD_ITERATOR"; + private static final String STREAM_NAME = "STREAM_NAME"; + private static final String SHARD_ID = "SHARD_ID"; + + @Mock + private SimplifiedKinesisClient kinesisClient; + @Mock + private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint; + @Mock + private GetKinesisRecordsResult firstResult, secondResult, thirdResult; + @Mock + private KinesisRecord a, b, c, d; + @Mock + private RecordFilter recordFilter; + + private ShardRecordsIterator iterator; + + @Before + public void setUp() throws IOException, TransientKinesisException { + when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR); + when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID); + + when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint); + when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint); + when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(aCheckpoint.getShardId()).thenReturn(SHARD_ID); + when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint); + when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(bCheckpoint.getShardId()).thenReturn(SHARD_ID); + when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint); + when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(cCheckpoint.getShardId()).thenReturn(SHARD_ID); + when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(dCheckpoint.getShardId()).thenReturn(SHARD_ID); + + when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(firstResult); + when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(secondResult); + when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(thirdResult); + + when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR); + when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR); + when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR); + + when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); + when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); + when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); + + when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint + .class))).thenAnswer(new IdentityAnswer()); + + iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter); + } + + @Test + public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException { + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + } + + @Test + public void goesThroughAvailableRecords() throws IOException, TransientKinesisException { + when(firstResult.getRecords()).thenReturn(asList(a, b, c)); + when(secondResult.getRecords()).thenReturn(singletonList(d)); + + assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(a)); + assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(b)); + assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(c)); + assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(d)); + assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint); + } + + @Test + public void refreshesExpiredIterator() throws IOException, TransientKinesisException { + when(firstResult.getRecords()).thenReturn(singletonList(a)); + when(secondResult.getRecords()).thenReturn(singletonList(b)); + + when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenThrow(ExpiredIteratorException.class); + when(aCheckpoint.getShardIterator(kinesisClient)) + .thenReturn(SECOND_REFRESHED_ITERATOR); + when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(secondResult); + + assertThat(iterator.next()).isEqualTo(CustomOptional.of(a)); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(b)); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + } + + private static class IdentityAnswer implements Answer<Object> { + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return invocation.getArguments()[0]; } + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java index 96434fd..2f8757c 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java @@ -34,7 +34,9 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamDescription; + import java.util.List; + import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,179 +48,180 @@ import org.mockito.runners.MockitoJUnitRunner; */ @RunWith(MockitoJUnitRunner.class) public class SimplifiedKinesisClientTest { - private static final String STREAM = "stream"; - private static final String SHARD_1 = "shard-01"; - private static final String SHARD_2 = "shard-02"; - private static final String SHARD_3 = "shard-03"; - private static final String SHARD_ITERATOR = "iterator"; - private static final String SEQUENCE_NUMBER = "abc123"; - - @Mock - private AmazonKinesis kinesis; - @InjectMocks - private SimplifiedKinesisClient underTest; - - @Test - public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { - given(kinesis.getShardIterator(new GetShardIteratorRequest() - .withStreamName(STREAM) - .withShardId(SHARD_1) - .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) - .withStartingSequenceNumber(SEQUENCE_NUMBER) - )).willReturn(new GetShardIteratorResult() - .withShardIterator(SHARD_ITERATOR)); - - String stream = underTest.getShardIterator(STREAM, SHARD_1, - ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null); - - assertThat(stream).isEqualTo(SHARD_ITERATOR); - } - - @Test - public void shouldReturnIteratorStartingWithTimestamp() throws Exception { - Instant timestamp = Instant.now(); - given(kinesis.getShardIterator(new GetShardIteratorRequest() - .withStreamName(STREAM) - .withShardId(SHARD_1) - .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) - .withTimestamp(timestamp.toDate()) - )).willReturn(new GetShardIteratorResult() - .withShardIterator(SHARD_ITERATOR)); - - String stream = underTest.getShardIterator(STREAM, SHARD_1, - ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp); - - assertThat(stream).isEqualTo(SHARD_ITERATOR); - } - - @Test - public void shouldHandleExpiredIterationExceptionForGetShardIterator() { - shouldHandleGetShardIteratorError(new ExpiredIteratorException(""), - ExpiredIteratorException.class); - } - - @Test - public void shouldHandleLimitExceededExceptionForGetShardIterator() { - shouldHandleGetShardIteratorError(new LimitExceededException(""), - TransientKinesisException.class); - } - - @Test - public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() { - shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""), - TransientKinesisException.class); - } - - @Test - public void shouldHandleServiceErrorForGetShardIterator() { - shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service), - TransientKinesisException.class); - } - - @Test - public void shouldHandleClientErrorForGetShardIterator() { - shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client), - RuntimeException.class); - } - - @Test - public void shouldHandleUnexpectedExceptionForGetShardIterator() { - shouldHandleGetShardIteratorError(new NullPointerException(), - RuntimeException.class); - } - - private void shouldHandleGetShardIteratorError( - Exception thrownException, - Class<? extends Exception> expectedExceptionClass) { - GetShardIteratorRequest request = new GetShardIteratorRequest() - .withStreamName(STREAM) - .withShardId(SHARD_1) - .withShardIteratorType(ShardIteratorType.LATEST); - - given(kinesis.getShardIterator(request)).willThrow(thrownException); - - try { - underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null); - failBecauseExceptionWasNotThrown(expectedExceptionClass); - } catch (Exception e) { - assertThat(e).isExactlyInstanceOf(expectedExceptionClass); - } finally { - reset(kinesis); - } - } - - @Test - public void shouldListAllShards() throws Exception { - Shard shard1 = new Shard().withShardId(SHARD_1); - Shard shard2 = new Shard().withShardId(SHARD_2); - Shard shard3 = new Shard().withShardId(SHARD_3); - given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult() - .withStreamDescription(new StreamDescription() - .withShards(shard1, shard2) - .withHasMoreShards(true))); - given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult() - .withStreamDescription(new StreamDescription() - .withShards(shard3) - .withHasMoreShards(false))); - - List<Shard> shards = underTest.listShards(STREAM); - - assertThat(shards).containsOnly(shard1, shard2, shard3); - } - - @Test - public void shouldHandleExpiredIterationExceptionForShardListing() { - shouldHandleShardListingError(new ExpiredIteratorException(""), - ExpiredIteratorException.class); - } - - @Test - public void shouldHandleLimitExceededExceptionForShardListing() { - shouldHandleShardListingError(new LimitExceededException(""), - TransientKinesisException.class); - } - - @Test - public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() { - shouldHandleShardListingError(new ProvisionedThroughputExceededException(""), - TransientKinesisException.class); - } - @Test - public void shouldHandleServiceErrorForShardListing() { - shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service), - TransientKinesisException.class); - } - - @Test - public void shouldHandleClientErrorForShardListing() { - shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client), - RuntimeException.class); - } - - @Test - public void shouldHandleUnexpectedExceptionForShardListing() { - shouldHandleShardListingError(new NullPointerException(), - RuntimeException.class); - } - - private void shouldHandleShardListingError( - Exception thrownException, - Class<? extends Exception> expectedExceptionClass) { - given(kinesis.describeStream(STREAM, null)).willThrow(thrownException); - try { - underTest.listShards(STREAM); - failBecauseExceptionWasNotThrown(expectedExceptionClass); - } catch (Exception e) { - assertThat(e).isExactlyInstanceOf(expectedExceptionClass); - } finally { - reset(kinesis); - } - } - - private AmazonServiceException newAmazonServiceException(ErrorType errorType) { - AmazonServiceException exception = new AmazonServiceException(""); - exception.setErrorType(errorType); - return exception; - } + private static final String STREAM = "stream"; + private static final String SHARD_1 = "shard-01"; + private static final String SHARD_2 = "shard-02"; + private static final String SHARD_3 = "shard-03"; + private static final String SHARD_ITERATOR = "iterator"; + private static final String SEQUENCE_NUMBER = "abc123"; + + @Mock + private AmazonKinesis kinesis; + @InjectMocks + private SimplifiedKinesisClient underTest; + + @Test + public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { + given(kinesis.getShardIterator(new GetShardIteratorRequest() + .withStreamName(STREAM) + .withShardId(SHARD_1) + .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .withStartingSequenceNumber(SEQUENCE_NUMBER) + )).willReturn(new GetShardIteratorResult() + .withShardIterator(SHARD_ITERATOR)); + + String stream = underTest.getShardIterator(STREAM, SHARD_1, + ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null); + + assertThat(stream).isEqualTo(SHARD_ITERATOR); + } + + @Test + public void shouldReturnIteratorStartingWithTimestamp() throws Exception { + Instant timestamp = Instant.now(); + given(kinesis.getShardIterator(new GetShardIteratorRequest() + .withStreamName(STREAM) + .withShardId(SHARD_1) + .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .withTimestamp(timestamp.toDate()) + )).willReturn(new GetShardIteratorResult() + .withShardIterator(SHARD_ITERATOR)); + + String stream = underTest.getShardIterator(STREAM, SHARD_1, + ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp); + + assertThat(stream).isEqualTo(SHARD_ITERATOR); + } + + @Test + public void shouldHandleExpiredIterationExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new ExpiredIteratorException(""), + ExpiredIteratorException.class); + } + + @Test + public void shouldHandleLimitExceededExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new LimitExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleServiceErrorForGetShardIterator() { + shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service), + TransientKinesisException.class); + } + + @Test + public void shouldHandleClientErrorForGetShardIterator() { + shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client), + RuntimeException.class); + } + + @Test + public void shouldHandleUnexpectedExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new NullPointerException(), + RuntimeException.class); + } + + private void shouldHandleGetShardIteratorError( + Exception thrownException, + Class<? extends Exception> expectedExceptionClass) { + GetShardIteratorRequest request = new GetShardIteratorRequest() + .withStreamName(STREAM) + .withShardId(SHARD_1) + .withShardIteratorType(ShardIteratorType.LATEST); + + given(kinesis.getShardIterator(request)).willThrow(thrownException); + + try { + underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null); + failBecauseExceptionWasNotThrown(expectedExceptionClass); + } catch (Exception e) { + assertThat(e).isExactlyInstanceOf(expectedExceptionClass); + } finally { + reset(kinesis); + } + } + + @Test + public void shouldListAllShards() throws Exception { + Shard shard1 = new Shard().withShardId(SHARD_1); + Shard shard2 = new Shard().withShardId(SHARD_2); + Shard shard3 = new Shard().withShardId(SHARD_3); + given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult() + .withStreamDescription(new StreamDescription() + .withShards(shard1, shard2) + .withHasMoreShards(true))); + given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult() + .withStreamDescription(new StreamDescription() + .withShards(shard3) + .withHasMoreShards(false))); + + List<Shard> shards = underTest.listShards(STREAM); + + assertThat(shards).containsOnly(shard1, shard2, shard3); + } + + @Test + public void shouldHandleExpiredIterationExceptionForShardListing() { + shouldHandleShardListingError(new ExpiredIteratorException(""), + ExpiredIteratorException.class); + } + + @Test + public void shouldHandleLimitExceededExceptionForShardListing() { + shouldHandleShardListingError(new LimitExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() { + shouldHandleShardListingError(new ProvisionedThroughputExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleServiceErrorForShardListing() { + shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service), + TransientKinesisException.class); + } + + @Test + public void shouldHandleClientErrorForShardListing() { + shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client), + RuntimeException.class); + } + + @Test + public void shouldHandleUnexpectedExceptionForShardListing() { + shouldHandleShardListingError(new NullPointerException(), + RuntimeException.class); + } + + private void shouldHandleShardListingError( + Exception thrownException, + Class<? extends Exception> expectedExceptionClass) { + given(kinesis.describeStream(STREAM, null)).willThrow(thrownException); + try { + underTest.listShards(STREAM); + failBecauseExceptionWasNotThrown(expectedExceptionClass); + } catch (Exception e) { + assertThat(e).isExactlyInstanceOf(expectedExceptionClass); + } finally { + reset(kinesis); + } + } + + private AmazonServiceException newAmazonServiceException(ErrorType errorType) { + AmazonServiceException exception = new AmazonServiceException(""); + exception.setErrorType(errorType); + return exception; + } }
