http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java index 80c950f..3e3984a 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kinesis; + import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; @@ -30,11 +31,9 @@ import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamDescription; import com.google.common.collect.Lists; - import java.util.Date; import java.util.List; import java.util.concurrent.Callable; - import org.joda.time.Instant; /** @@ -42,121 +41,117 @@ import org.joda.time.Instant; * proper error handling. */ class SimplifiedKinesisClient { + private final AmazonKinesis kinesis; - private final AmazonKinesis kinesis; - - public SimplifiedKinesisClient(AmazonKinesis kinesis) { - this.kinesis = kinesis; - } - - public static SimplifiedKinesisClient from(KinesisClientProvider provider) { - return new SimplifiedKinesisClient(provider.get()); - } - - public String getShardIterator(final String streamName, final String shardId, - final ShardIteratorType shardIteratorType, - final String startingSequenceNumber, final Instant timestamp) - throws TransientKinesisException { - final Date date = timestamp != null ? timestamp.toDate() : null; - return wrapExceptions(new Callable<String>() { - - @Override - public String call() throws Exception { - return kinesis.getShardIterator(new GetShardIteratorRequest() - .withStreamName(streamName) - .withShardId(shardId) - .withShardIteratorType(shardIteratorType) - .withStartingSequenceNumber(startingSequenceNumber) - .withTimestamp(date) - ).getShardIterator(); - } - }); - } - - public List<Shard> listShards(final String streamName) throws TransientKinesisException { - return wrapExceptions(new Callable<List<Shard>>() { - - @Override - public List<Shard> call() throws Exception { - List<Shard> shards = Lists.newArrayList(); - String lastShardId = null; - - StreamDescription description; - do { - description = kinesis.describeStream(streamName, lastShardId) - .getStreamDescription(); + public SimplifiedKinesisClient(AmazonKinesis kinesis) { + this.kinesis = kinesis; + } - shards.addAll(description.getShards()); - lastShardId = shards.get(shards.size() - 1).getShardId(); - } while (description.getHasMoreShards()); + public static SimplifiedKinesisClient from(KinesisClientProvider provider) { + return new SimplifiedKinesisClient(provider.get()); + } - return shards; - } - }); - } + public String getShardIterator(final String streamName, final String shardId, + final ShardIteratorType shardIteratorType, + final String startingSequenceNumber, final Instant timestamp) + throws TransientKinesisException { + final Date date = timestamp != null ? timestamp.toDate() : null; + return wrapExceptions(new Callable<String>() { + @Override + public String call() throws Exception { + return kinesis.getShardIterator(new GetShardIteratorRequest() + .withStreamName(streamName) + .withShardId(shardId) + .withShardIteratorType(shardIteratorType) + .withStartingSequenceNumber(startingSequenceNumber) + .withTimestamp(date) + ).getShardIterator(); + } + }); + } - /** - * Gets records from Kinesis and deaggregates them if needed. - * - * @return list of deaggregated records - * @throws TransientKinesisException - in case of recoverable situation - */ - public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, - String shardId) throws TransientKinesisException { - return getRecords(shardIterator, streamName, shardId, null); - } + public List<Shard> listShards(final String streamName) throws TransientKinesisException { + return wrapExceptions(new Callable<List<Shard>>() { + @Override + public List<Shard> call() throws Exception { + List<Shard> shards = Lists.newArrayList(); + String lastShardId = null; + + StreamDescription description; + do { + description = kinesis.describeStream(streamName, lastShardId) + .getStreamDescription(); + + shards.addAll(description.getShards()); + lastShardId = shards.get(shards.size() - 1).getShardId(); + } while (description.getHasMoreShards()); + + return shards; + } + }); + } - /** - * Gets records from Kinesis and deaggregates them if needed. - * - * @return list of deaggregated records - * @throws TransientKinesisException - in case of recoverable situation - */ - public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName, - final String shardId, final Integer limit) - throws - TransientKinesisException { - return wrapExceptions(new Callable<GetKinesisRecordsResult>() { + /** + * Gets records from Kinesis and deaggregates them if needed. + * + * @return list of deaggregated records + * @throws TransientKinesisException - in case of recoverable situation + */ + public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, + String shardId) throws TransientKinesisException { + return getRecords(shardIterator, streamName, shardId, null); + } - @Override - public GetKinesisRecordsResult call() throws Exception { - GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest() - .withShardIterator(shardIterator) - .withLimit(limit)); - return new GetKinesisRecordsResult( - UserRecord.deaggregate(response.getRecords()), - response.getNextShardIterator(), - streamName, shardId); - } - }); - } + /** + * Gets records from Kinesis and deaggregates them if needed. + * + * @return list of deaggregated records + * @throws TransientKinesisException - in case of recoverable situation + */ + public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName, + final String shardId, final Integer limit) + throws + TransientKinesisException { + return wrapExceptions(new Callable<GetKinesisRecordsResult>() { + @Override + public GetKinesisRecordsResult call() throws Exception { + GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest() + .withShardIterator(shardIterator) + .withLimit(limit)); + return new GetKinesisRecordsResult( + UserRecord.deaggregate(response.getRecords()), + response.getNextShardIterator(), + streamName, shardId); + } + }); + } - /** - * Wraps Amazon specific exceptions into more friendly format. - * - * @throws TransientKinesisException - in case of recoverable situation, i.e. - * the request rate is too high, Kinesis remote service - * failed, network issue, etc. - * @throws ExpiredIteratorException - if iterator needs to be refreshed - * @throws RuntimeException - in all other cases - */ - private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException { - try { - return callable.call(); - } catch (ExpiredIteratorException e) { - throw e; - } catch (LimitExceededException | ProvisionedThroughputExceededException e) { - throw new TransientKinesisException( - "Too many requests to Kinesis. Wait some time and retry.", e); - } catch (AmazonServiceException e) { - if (e.getErrorType() == AmazonServiceException.ErrorType.Service) { - throw new TransientKinesisException( - "Kinesis backend failed. Wait some time and retry.", e); - } - throw new RuntimeException("Kinesis client side failure", e); - } catch (Exception e) { - throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e); + /** + * Wraps Amazon specific exceptions into more friendly format. + * + * @throws TransientKinesisException - in case of recoverable situation, i.e. + * the request rate is too high, Kinesis remote service + * failed, network issue, etc. + * @throws ExpiredIteratorException - if iterator needs to be refreshed + * @throws RuntimeException - in all other cases + */ + private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException { + try { + return callable.call(); + } catch (ExpiredIteratorException e) { + throw e; + } catch (LimitExceededException | ProvisionedThroughputExceededException e) { + throw new TransientKinesisException( + "Too many requests to Kinesis. Wait some time and retry.", e); + } catch (AmazonServiceException e) { + if (e.getErrorType() == AmazonServiceException.ErrorType.Service) { + throw new TransientKinesisException( + "Kinesis backend failed. Wait some time and retry.", e); + } + throw new RuntimeException("Kinesis client side failure", e); + } catch (Exception e) { + throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e); + } } - } }
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java index f9298fa..d8842c4 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java @@ -17,14 +17,13 @@ */ package org.apache.beam.sdk.io.kinesis; + import static com.google.common.base.Preconditions.checkNotNull; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.model.ShardIteratorType; - import java.io.Serializable; import java.util.Objects; - import org.joda.time.Instant; /** @@ -33,55 +32,54 @@ import org.joda.time.Instant; * in which case the reader will start reading at the specified point in time. */ class StartingPoint implements Serializable { + private final InitialPositionInStream position; + private final Instant timestamp; - private final InitialPositionInStream position; - private final Instant timestamp; - - public StartingPoint(InitialPositionInStream position) { - this.position = checkNotNull(position, "position"); - this.timestamp = null; - } - - public StartingPoint(Instant timestamp) { - this.timestamp = checkNotNull(timestamp, "timestamp"); - this.position = null; - } + public StartingPoint(InitialPositionInStream position) { + this.position = checkNotNull(position, "position"); + this.timestamp = null; + } - public InitialPositionInStream getPosition() { - return position; - } + public StartingPoint(Instant timestamp) { + this.timestamp = checkNotNull(timestamp, "timestamp"); + this.position = null; + } - public String getPositionName() { - return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name(); - } + public InitialPositionInStream getPosition() { + return position; + } - public Instant getTimestamp() { - return timestamp != null ? timestamp : null; - } + public String getPositionName() { + return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name(); + } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; + public Instant getTimestamp() { + return timestamp != null ? timestamp : null; } - if (o == null || getClass() != o.getClass()) { - return false; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StartingPoint that = (StartingPoint) o; + return position == that.position && Objects.equals(timestamp, that.timestamp); } - StartingPoint that = (StartingPoint) o; - return position == that.position && Objects.equals(timestamp, that.timestamp); - } - @Override - public int hashCode() { - return Objects.hash(position, timestamp); - } + @Override + public int hashCode() { + return Objects.hash(position, timestamp); + } - @Override - public String toString() { - if (timestamp == null) { - return position.toString(); - } else { - return "Starting at timestamp " + timestamp; + @Override + public String toString() { + if (timestamp == null) { + return position.toString(); + } else { + return "Starting at timestamp " + timestamp; + } } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java index 1ec865d..22dc973 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java @@ -23,21 +23,20 @@ import static com.google.common.base.Preconditions.checkNotNull; * Always returns the same instance of checkpoint. */ class StaticCheckpointGenerator implements CheckpointGenerator { + private final KinesisReaderCheckpoint checkpoint; - private final KinesisReaderCheckpoint checkpoint; + public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) { + checkNotNull(checkpoint, "checkpoint"); + this.checkpoint = checkpoint; + } - public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) { - checkNotNull(checkpoint, "checkpoint"); - this.checkpoint = checkpoint; - } + @Override + public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) { + return checkpoint; + } - @Override - public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) { - return checkpoint; - } - - @Override - public String toString() { - return checkpoint.toString(); - } + @Override + public String toString() { + return checkpoint.toString(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java index 68ca0d7..57ad8a8 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java @@ -23,8 +23,7 @@ import com.amazonaws.AmazonServiceException; * A transient exception thrown by Kinesis. */ class TransientKinesisException extends Exception { - - public TransientKinesisException(String s, AmazonServiceException e) { - super(s, e); - } + public TransientKinesisException(String s, AmazonServiceException e) { + super(s, e); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java index 994d6e3..046c9d9 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java @@ -66,12 +66,10 @@ import com.amazonaws.services.kinesis.model.SplitShardRequest; import com.amazonaws.services.kinesis.model.SplitShardResult; import com.amazonaws.services.kinesis.model.StreamDescription; import com.google.common.base.Function; - import java.io.Serializable; import java.nio.ByteBuffer; import java.util.List; import javax.annotation.Nullable; - import org.apache.commons.lang.builder.EqualsBuilder; import org.joda.time.Instant; @@ -80,301 +78,298 @@ import org.joda.time.Instant; */ class AmazonKinesisMock implements AmazonKinesis { - static class TestData implements Serializable { + static class TestData implements Serializable { + private final String data; + private final Instant arrivalTimestamp; + private final String sequenceNumber; + + public TestData(KinesisRecord record) { + this(new String(record.getData().array()), + record.getApproximateArrivalTimestamp(), + record.getSequenceNumber()); + } + + public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) { + this.data = data; + this.arrivalTimestamp = arrivalTimestamp; + this.sequenceNumber = sequenceNumber; + } + + public Record convertToRecord() { + return new Record(). + withApproximateArrivalTimestamp(arrivalTimestamp.toDate()). + withData(ByteBuffer.wrap(data.getBytes())). + withSequenceNumber(sequenceNumber). + withPartitionKey(""); + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + @Override + public int hashCode() { + return reflectionHashCode(this); + } + } + + static class Provider implements KinesisClientProvider { + + private final List<List<TestData>> shardedData; + private final int numberOfRecordsPerGet; + + public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) { + this.shardedData = shardedData; + this.numberOfRecordsPerGet = numberOfRecordsPerGet; + } + + @Override + public AmazonKinesis get() { + return new AmazonKinesisMock(transform(shardedData, + new Function<List<TestData>, List<Record>>() { + @Override + public List<Record> apply(@Nullable List<TestData> testDatas) { + return transform(testDatas, new Function<TestData, Record>() { + @Override + public Record apply(@Nullable TestData testData) { + return testData.convertToRecord(); + } + }); + } + }), numberOfRecordsPerGet); + } + } - private final String data; - private final Instant arrivalTimestamp; - private final String sequenceNumber; + private final List<List<Record>> shardedData; + private final int numberOfRecordsPerGet; - public TestData(KinesisRecord record) { - this(new String(record.getData().array()), - record.getApproximateArrivalTimestamp(), - record.getSequenceNumber()); + public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) { + this.shardedData = shardedData; + this.numberOfRecordsPerGet = numberOfRecordsPerGet; } - public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) { - this.data = data; - this.arrivalTimestamp = arrivalTimestamp; - this.sequenceNumber = sequenceNumber; + @Override + public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { + String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":"); + int shardId = parseInt(shardIteratorParts[0]); + int startingRecord = parseInt(shardIteratorParts[1]); + List<Record> shardData = shardedData.get(shardId); + + int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); + int fromIndex = min(startingRecord, toIndex); + return new GetRecordsResult(). + withRecords(shardData.subList(fromIndex, toIndex)). + withNextShardIterator(String.format("%s:%s", shardId, toIndex)); } - public Record convertToRecord() { - return new Record(). - withApproximateArrivalTimestamp(arrivalTimestamp.toDate()). - withData(ByteBuffer.wrap(data.getBytes())). - withSequenceNumber(sequenceNumber). - withPartitionKey(""); + @Override + public GetShardIteratorResult getShardIterator( + GetShardIteratorRequest getShardIteratorRequest) { + ShardIteratorType shardIteratorType = ShardIteratorType.fromValue( + getShardIteratorRequest.getShardIteratorType()); + + String shardIterator; + if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { + shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); + } else { + throw new RuntimeException("Not implemented"); + } + + return new GetShardIteratorResult().withShardIterator(shardIterator); } @Override - public boolean equals(Object obj) { - return EqualsBuilder.reflectionEquals(this, obj); + public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { + int nextShardId = 0; + if (exclusiveStartShardId != null) { + nextShardId = parseInt(exclusiveStartShardId) + 1; + } + boolean hasMoreShards = nextShardId + 1 < shardedData.size(); + + List<Shard> shards = newArrayList(); + if (nextShardId < shardedData.size()) { + shards.add(new Shard().withShardId(Integer.toString(nextShardId))); + } + + return new DescribeStreamResult().withStreamDescription( + new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards) + ); } @Override - public int hashCode() { - return reflectionHashCode(this); + public void setEndpoint(String endpoint) { + } - } - static class Provider implements KinesisClientProvider { + @Override + public void setRegion(Region region) { - private final List<List<TestData>> shardedData; - private final int numberOfRecordsPerGet; + } + + @Override + public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public CreateStreamResult createStream(String streamName, Integer shardCount) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod( + DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DeleteStreamResult deleteStream(String streamName) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DescribeStreamResult describeStream(String streamName) { + + throw new RuntimeException("Not implemented"); + } + + @Override + public DescribeStreamResult describeStream(String streamName, + Integer limit, String exclusiveStartShardId) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DisableEnhancedMonitoringResult disableEnhancedMonitoring( + DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public EnableEnhancedMonitoringResult enableEnhancedMonitoring( + EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) { + throw new RuntimeException("Not implemented"); + } - public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) { - this.shardedData = shardedData; - this.numberOfRecordsPerGet = numberOfRecordsPerGet; + @Override + public GetShardIteratorResult getShardIterator(String streamName, + String shardId, + String shardIteratorType) { + throw new RuntimeException("Not implemented"); + } + + @Override + public GetShardIteratorResult getShardIterator(String streamName, + String shardId, + String shardIteratorType, + String startingSequenceNumber) { + throw new RuntimeException("Not implemented"); + } + + @Override + public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod( + IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams() { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams(String exclusiveStartStreamName) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListTagsForStreamResult listTagsForStream( + ListTagsForStreamRequest listTagsForStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) { + throw new RuntimeException("Not implemented"); } @Override - public AmazonKinesis get() { - return new AmazonKinesisMock(transform(shardedData, - new Function<List<TestData>, List<Record>>() { + public MergeShardsResult mergeShards(String streamName, + String shardToMerge, String adjacentShardToMerge) { + throw new RuntimeException("Not implemented"); + } - @Override - public List<Record> apply(@Nullable List<TestData> testDatas) { - return transform(testDatas, new Function<TestData, Record>() { + @Override + public PutRecordResult putRecord(PutRecordRequest putRecordRequest) { + throw new RuntimeException("Not implemented"); + } - @Override - public Record apply(@Nullable TestData testData) { - return testData.convertToRecord(); - } - }); - } - }), numberOfRecordsPerGet); + @Override + public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) { + throw new RuntimeException("Not implemented"); } - } - private final List<List<Record>> shardedData; - private final int numberOfRecordsPerGet; + @Override + public PutRecordResult putRecord(String streamName, ByteBuffer data, + String partitionKey, String sequenceNumberForOrdering) { + throw new RuntimeException("Not implemented"); + } - public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) { - this.shardedData = shardedData; - this.numberOfRecordsPerGet = numberOfRecordsPerGet; - } + @Override + public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { + throw new RuntimeException("Not implemented"); + } - @Override - public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { - String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":"); - int shardId = parseInt(shardIteratorParts[0]); - int startingRecord = parseInt(shardIteratorParts[1]); - List<Record> shardData = shardedData.get(shardId); + @Override + public RemoveTagsFromStreamResult removeTagsFromStream( + RemoveTagsFromStreamRequest removeTagsFromStreamRequest) { + throw new RuntimeException("Not implemented"); + } - int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); - int fromIndex = min(startingRecord, toIndex); - return new GetRecordsResult(). - withRecords(shardData.subList(fromIndex, toIndex)). - withNextShardIterator(String.format("%s:%s", shardId, toIndex)); - } + @Override + public SplitShardResult splitShard(SplitShardRequest splitShardRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public SplitShardResult splitShard(String streamName, + String shardToSplit, String newStartingHashKey) { + throw new RuntimeException("Not implemented"); + } - @Override - public GetShardIteratorResult getShardIterator( - GetShardIteratorRequest getShardIteratorRequest) { - ShardIteratorType shardIteratorType = ShardIteratorType.fromValue( - getShardIteratorRequest.getShardIteratorType()); - - String shardIterator; - if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { - shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); - } else { - throw new RuntimeException("Not implemented"); - } - - return new GetShardIteratorResult().withShardIterator(shardIterator); - } - - @Override - public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { - int nextShardId = 0; - if (exclusiveStartShardId != null) { - nextShardId = parseInt(exclusiveStartShardId) + 1; - } - boolean hasMoreShards = nextShardId + 1 < shardedData.size(); - - List<Shard> shards = newArrayList(); - if (nextShardId < shardedData.size()) { - shards.add(new Shard().withShardId(Integer.toString(nextShardId))); - } - - return new DescribeStreamResult().withStreamDescription( - new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards) - ); - } - - @Override - public void setEndpoint(String endpoint) { - - } - - @Override - public void setRegion(Region region) { - - } - - @Override - public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public CreateStreamResult createStream(String streamName, Integer shardCount) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod( - DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DeleteStreamResult deleteStream(String streamName) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DescribeStreamResult describeStream(String streamName) { - - throw new RuntimeException("Not implemented"); - } - - @Override - public DescribeStreamResult describeStream(String streamName, - Integer limit, String exclusiveStartShardId) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DisableEnhancedMonitoringResult disableEnhancedMonitoring( - DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public EnableEnhancedMonitoringResult enableEnhancedMonitoring( - EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public GetShardIteratorResult getShardIterator(String streamName, - String shardId, - String shardIteratorType) { - throw new RuntimeException("Not implemented"); - } - - @Override - public GetShardIteratorResult getShardIterator(String streamName, - String shardId, - String shardIteratorType, - String startingSequenceNumber) { - throw new RuntimeException("Not implemented"); - } - - @Override - public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod( - IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListStreamsResult listStreams() { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListStreamsResult listStreams(String exclusiveStartStreamName) { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListTagsForStreamResult listTagsForStream( - ListTagsForStreamRequest listTagsForStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public MergeShardsResult mergeShards(String streamName, - String shardToMerge, String adjacentShardToMerge) { - throw new RuntimeException("Not implemented"); - } - - @Override - public PutRecordResult putRecord(PutRecordRequest putRecordRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) { - throw new RuntimeException("Not implemented"); - } - - @Override - public PutRecordResult putRecord(String streamName, ByteBuffer data, - String partitionKey, String sequenceNumberForOrdering) { - throw new RuntimeException("Not implemented"); - } - - @Override - public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public RemoveTagsFromStreamResult removeTagsFromStream( - RemoveTagsFromStreamRequest removeTagsFromStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public SplitShardResult splitShard(SplitShardRequest splitShardRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public SplitShardResult splitShard(String streamName, - String shardToSplit, String newStartingHashKey) { - throw new RuntimeException("Not implemented"); - } - - @Override - public void shutdown() { - - } - - @Override - public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { - throw new RuntimeException("Not implemented"); - } + @Override + public void shutdown() { + + } + + @Override + public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { + throw new RuntimeException("Not implemented"); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java index 0b16bb7..00acffe 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java @@ -18,27 +18,24 @@ package org.apache.beam.sdk.io.kinesis; import com.google.common.testing.EqualsTester; - import java.util.NoSuchElementException; - import org.junit.Test; /** * Tests {@link CustomOptional}. */ public class CustomOptionalTest { + @Test(expected = NoSuchElementException.class) + public void absentThrowsNoSuchElementExceptionOnGet() { + CustomOptional.absent().get(); + } - @Test(expected = NoSuchElementException.class) - public void absentThrowsNoSuchElementExceptionOnGet() { - CustomOptional.absent().get(); - } - - @Test - public void testEqualsAndHashCode() { - new EqualsTester() - .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent()) - .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3)) - .addEqualityGroup(CustomOptional.of(11)) - .addEqualityGroup(CustomOptional.of("3")).testEquals(); - } + @Test + public void testEqualsAndHashCode() { + new EqualsTester() + .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent()) + .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3)) + .addEqualityGroup(CustomOptional.of(11)) + .addEqualityGroup(CustomOptional.of("3")).testEquals(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java index 1bb9717..c92ac9a 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java @@ -28,29 +28,30 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; + /*** */ @RunWith(MockitoJUnitRunner.class) public class DynamicCheckpointGeneratorTest { - @Mock - private SimplifiedKinesisClient kinesisClient; - @Mock - private Shard shard1, shard2, shard3; + @Mock + private SimplifiedKinesisClient kinesisClient; + @Mock + private Shard shard1, shard2, shard3; - @Test - public void shouldMapAllShardsToCheckpoints() throws Exception { - given(shard1.getShardId()).willReturn("shard-01"); - given(shard2.getShardId()).willReturn("shard-02"); - given(shard3.getShardId()).willReturn("shard-03"); - given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3)); + @Test + public void shouldMapAllShardsToCheckpoints() throws Exception { + given(shard1.getShardId()).willReturn("shard-01"); + given(shard2.getShardId()).willReturn("shard-02"); + given(shard3.getShardId()).willReturn("shard-03"); + given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3)); - StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); - DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream", - startingPoint); + StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); + DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream", + startingPoint); - KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); + KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); - assertThat(checkpoint).hasSize(3); - } + assertThat(checkpoint).hasSize(3); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java index 44ad67d..567e25f 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java @@ -21,9 +21,7 @@ import static com.google.common.collect.Lists.newArrayList; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.google.common.collect.Iterables; - import java.util.List; - import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; @@ -38,60 +36,59 @@ import org.junit.Test; */ public class KinesisMockReadTest { - @Rule - public final transient TestPipeline p = TestPipeline.create(); - - @Test - public void readsDataFromMockKinesis() { - int noOfShards = 3; - int noOfEventsPerShard = 100; - List<List<AmazonKinesisMock.TestData>> testData = - provideTestData(noOfShards, noOfEventsPerShard); - - PCollection<AmazonKinesisMock.TestData> result = p - .apply( - KinesisIO.read() - .from("stream", InitialPositionInStream.TRIM_HORIZON) - .withClientProvider(new AmazonKinesisMock.Provider(testData, 10)) - .withMaxNumRecords(noOfShards * noOfEventsPerShard)) - .apply(ParDo.of(new KinesisRecordToTestData())); - PAssert.that(result).containsInAnyOrder(Iterables.concat(testData)); - p.run(); - } - - private static class KinesisRecordToTestData extends - DoFn<KinesisRecord, AmazonKinesisMock.TestData> { + @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Test + public void readsDataFromMockKinesis() { + int noOfShards = 3; + int noOfEventsPerShard = 100; + List<List<AmazonKinesisMock.TestData>> testData = + provideTestData(noOfShards, noOfEventsPerShard); + + PCollection<AmazonKinesisMock.TestData> result = p + .apply( + KinesisIO.read() + .from("stream", InitialPositionInStream.TRIM_HORIZON) + .withClientProvider(new AmazonKinesisMock.Provider(testData, 10)) + .withMaxNumRecords(noOfShards * noOfEventsPerShard)) + .apply(ParDo.of(new KinesisRecordToTestData())); + PAssert.that(result).containsInAnyOrder(Iterables.concat(testData)); + p.run(); + } - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - c.output(new AmazonKinesisMock.TestData(c.element())); + private static class KinesisRecordToTestData extends + DoFn<KinesisRecord, AmazonKinesisMock.TestData> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(new AmazonKinesisMock.TestData(c.element())); + } } - } - private List<List<AmazonKinesisMock.TestData>> provideTestData( - int noOfShards, - int noOfEventsPerShard) { + private List<List<AmazonKinesisMock.TestData>> provideTestData( + int noOfShards, + int noOfEventsPerShard) { - int seqNumber = 0; + int seqNumber = 0; - List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList(); - for (int i = 0; i < noOfShards; ++i) { - List<AmazonKinesisMock.TestData> shardData = newArrayList(); - shardedData.add(shardData); + List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList(); + for (int i = 0; i < noOfShards; ++i) { + List<AmazonKinesisMock.TestData> shardData = newArrayList(); + shardedData.add(shardData); - DateTime arrival = DateTime.now(); - for (int j = 0; j < noOfEventsPerShard; ++j) { - arrival = arrival.plusSeconds(1); + DateTime arrival = DateTime.now(); + for (int j = 0; j < noOfEventsPerShard; ++j) { + arrival = arrival.plusSeconds(1); - seqNumber++; - shardData.add(new AmazonKinesisMock.TestData( - Integer.toString(seqNumber), - arrival.toInstant(), - Integer.toString(seqNumber)) - ); - } - } + seqNumber++; + shardData.add(new AmazonKinesisMock.TestData( + Integer.toString(seqNumber), + arrival.toInstant(), + Integer.toString(seqNumber)) + ); + } + } - return shardedData; - } + return shardedData; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java index 1038a47..8c8da64 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java @@ -17,14 +17,13 @@ */ package org.apache.beam.sdk.io.kinesis; + import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.Iterables; - import java.util.Iterator; import java.util.List; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,34 +35,33 @@ import org.mockito.runners.MockitoJUnitRunner; */ @RunWith(MockitoJUnitRunner.class) public class KinesisReaderCheckpointTest { + @Mock + private ShardCheckpoint a, b, c; - @Mock - private ShardCheckpoint a, b, c; - - private KinesisReaderCheckpoint checkpoint; + private KinesisReaderCheckpoint checkpoint; - @Before - public void setUp() { - checkpoint = new KinesisReaderCheckpoint(asList(a, b, c)); - } + @Before + public void setUp() { + checkpoint = new KinesisReaderCheckpoint(asList(a, b, c)); + } - @Test - public void splitsCheckpointAccordingly() { - verifySplitInto(1); - verifySplitInto(2); - verifySplitInto(3); - verifySplitInto(4); - } + @Test + public void splitsCheckpointAccordingly() { + verifySplitInto(1); + verifySplitInto(2); + verifySplitInto(3); + verifySplitInto(4); + } - @Test(expected = UnsupportedOperationException.class) - public void isImmutable() { - Iterator<ShardCheckpoint> iterator = checkpoint.iterator(); - iterator.remove(); - } + @Test(expected = UnsupportedOperationException.class) + public void isImmutable() { + Iterator<ShardCheckpoint> iterator = checkpoint.iterator(); + iterator.remove(); + } - private void verifySplitInto(int size) { - List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size); - assertThat(Iterables.concat(split)).containsOnly(a, b, c); - assertThat(split).hasSize(Math.min(size, 3)); - } + private void verifySplitInto(int size) { + List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size); + assertThat(Iterables.concat(split)).containsOnly(a, b, c); + assertThat(split).hasSize(Math.min(size, 3)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java index 5781033..8eb6546 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java @@ -23,7 +23,6 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.assertj.core.api.Assertions.assertThat; import com.amazonaws.regions.Regions; - import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; @@ -32,7 +31,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -52,75 +50,72 @@ import org.junit.Test; * You need to provide all {@link KinesisTestOptions} in order to run this. */ public class KinesisReaderIT { - - private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10); - private ExecutorService singleThreadExecutor = newSingleThreadExecutor(); - - @Rule - public final transient TestPipeline p = TestPipeline.create(); - - @Ignore - @Test - public void readsDataFromRealKinesisStream() - throws IOException, InterruptedException, ExecutionException { - KinesisTestOptions options = readKinesisOptions(); - List<String> testData = prepareTestData(1000); - - Future<?> future = startTestPipeline(testData, options); - KinesisUploader.uploadAll(testData, options); - future.get(); - } - - private List<String> prepareTestData(int count) { - List<String> data = newArrayList(); - for (int i = 0; i < count; ++i) { - data.add(RandomStringUtils.randomAlphabetic(32)); + private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10); + private ExecutorService singleThreadExecutor = newSingleThreadExecutor(); + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Ignore + @Test + public void readsDataFromRealKinesisStream() + throws IOException, InterruptedException, ExecutionException { + KinesisTestOptions options = readKinesisOptions(); + List<String> testData = prepareTestData(1000); + + Future<?> future = startTestPipeline(testData, options); + KinesisUploader.uploadAll(testData, options); + future.get(); } - return data; - } - private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options) - throws InterruptedException { - - PCollection<String> result = p. - apply(KinesisIO.read() - .from(options.getAwsKinesisStream(), Instant.now()) - .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), - Regions.fromName(options.getAwsKinesisRegion())) - .withMaxReadTime(Duration.standardMinutes(3)) - ). - apply(ParDo.of(new RecordDataToString())); - PAssert.that(result).containsInAnyOrder(testData); - - Future<?> future = singleThreadExecutor.submit(new Callable<Void>() { - - @Override - public Void call() throws Exception { - PipelineResult result = p.run(); - PipelineResult.State state = result.getState(); - while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) { - Thread.sleep(1000); - state = result.getState(); + private List<String> prepareTestData(int count) { + List<String> data = newArrayList(); + for (int i = 0; i < count; ++i) { + data.add(RandomStringUtils.randomAlphabetic(32)); } - assertThat(state).isEqualTo(PipelineResult.State.DONE); - return null; - } - }); - Thread.sleep(PIPELINE_STARTUP_TIME); - return future; - } + return data; + } - private KinesisTestOptions readKinesisOptions() { - PipelineOptionsFactory.register(KinesisTestOptions.class); - return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); - } + private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options) + throws InterruptedException { + + PCollection<String> result = p. + apply(KinesisIO.read() + .from(options.getAwsKinesisStream(), Instant.now()) + .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), + Regions.fromName(options.getAwsKinesisRegion())) + .withMaxReadTime(Duration.standardMinutes(3)) + ). + apply(ParDo.of(new RecordDataToString())); + PAssert.that(result).containsInAnyOrder(testData); + + Future<?> future = singleThreadExecutor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + PipelineResult result = p.run(); + PipelineResult.State state = result.getState(); + while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) { + Thread.sleep(1000); + state = result.getState(); + } + assertThat(state).isEqualTo(PipelineResult.State.DONE); + return null; + } + }); + Thread.sleep(PIPELINE_STARTUP_TIME); + return future; + } - private static class RecordDataToString extends DoFn<KinesisRecord, String> { + private KinesisTestOptions readKinesisOptions() { + PipelineOptionsFactory.register(KinesisTestOptions.class); + return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); + } - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - checkNotNull(c.element(), "Null record given"); - c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8)); + private static class RecordDataToString extends DoFn<KinesisRecord, String> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + checkNotNull(c.element(), "Null record given"); + c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8)); + } } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java index a26501a..3111029 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.NoSuchElementException; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,88 +34,87 @@ import org.mockito.runners.MockitoJUnitRunner; */ @RunWith(MockitoJUnitRunner.class) public class KinesisReaderTest { - - @Mock - private SimplifiedKinesisClient kinesis; - @Mock - private CheckpointGenerator generator; - @Mock - private ShardCheckpoint firstCheckpoint, secondCheckpoint; - @Mock - private ShardRecordsIterator firstIterator, secondIterator; - @Mock - private KinesisRecord a, b, c, d; - - private KinesisReader reader; - - @Before - public void setUp() throws IOException, TransientKinesisException { - when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint( - asList(firstCheckpoint, secondCheckpoint) - )); - when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator); - when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator); - when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent()); - when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent()); - - reader = new KinesisReader(kinesis, generator, null); - } - - @Test - public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException { - assertThat(reader.start()).isFalse(); - } - - @Test(expected = NoSuchElementException.class) - public void throwsNoSuchElementExceptionIfNoData() throws IOException { - reader.start(); - reader.getCurrent(); - } - - @Test - public void startReturnsTrueIfSomeDataAvailable() throws IOException, - TransientKinesisException { - when(firstIterator.next()). - thenReturn(CustomOptional.of(a)). - thenReturn(CustomOptional.<KinesisRecord>absent()); - - assertThat(reader.start()).isTrue(); - } - - @Test - public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis() - throws IOException, TransientKinesisException { - reader.start(); - - when(firstIterator.next()).thenThrow(TransientKinesisException.class); - - assertThat(reader.advance()).isFalse(); - } - - @Test - public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException { - when(firstIterator.next()). - thenReturn(CustomOptional.<KinesisRecord>absent()). - thenReturn(CustomOptional.of(a)). - thenReturn(CustomOptional.<KinesisRecord>absent()). - thenReturn(CustomOptional.of(b)). - thenReturn(CustomOptional.<KinesisRecord>absent()); - - when(secondIterator.next()). - thenReturn(CustomOptional.of(c)). - thenReturn(CustomOptional.<KinesisRecord>absent()). - thenReturn(CustomOptional.of(d)). - thenReturn(CustomOptional.<KinesisRecord>absent()); - - assertThat(reader.start()).isTrue(); - assertThat(reader.getCurrent()).isEqualTo(c); - assertThat(reader.advance()).isTrue(); - assertThat(reader.getCurrent()).isEqualTo(a); - assertThat(reader.advance()).isTrue(); - assertThat(reader.getCurrent()).isEqualTo(d); - assertThat(reader.advance()).isTrue(); - assertThat(reader.getCurrent()).isEqualTo(b); - assertThat(reader.advance()).isFalse(); - } + @Mock + private SimplifiedKinesisClient kinesis; + @Mock + private CheckpointGenerator generator; + @Mock + private ShardCheckpoint firstCheckpoint, secondCheckpoint; + @Mock + private ShardRecordsIterator firstIterator, secondIterator; + @Mock + private KinesisRecord a, b, c, d; + + private KinesisReader reader; + + @Before + public void setUp() throws IOException, TransientKinesisException { + when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint( + asList(firstCheckpoint, secondCheckpoint) + )); + when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator); + when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator); + when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent()); + when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent()); + + reader = new KinesisReader(kinesis, generator, null); + } + + @Test + public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException { + assertThat(reader.start()).isFalse(); + } + + @Test(expected = NoSuchElementException.class) + public void throwsNoSuchElementExceptionIfNoData() throws IOException { + reader.start(); + reader.getCurrent(); + } + + @Test + public void startReturnsTrueIfSomeDataAvailable() throws IOException, + TransientKinesisException { + when(firstIterator.next()). + thenReturn(CustomOptional.of(a)). + thenReturn(CustomOptional.<KinesisRecord>absent()); + + assertThat(reader.start()).isTrue(); + } + + @Test + public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis() + throws IOException, TransientKinesisException { + reader.start(); + + when(firstIterator.next()).thenThrow(TransientKinesisException.class); + + assertThat(reader.advance()).isFalse(); + } + + @Test + public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException { + when(firstIterator.next()). + thenReturn(CustomOptional.<KinesisRecord>absent()). + thenReturn(CustomOptional.of(a)). + thenReturn(CustomOptional.<KinesisRecord>absent()). + thenReturn(CustomOptional.of(b)). + thenReturn(CustomOptional.<KinesisRecord>absent()); + + when(secondIterator.next()). + thenReturn(CustomOptional.of(c)). + thenReturn(CustomOptional.<KinesisRecord>absent()). + thenReturn(CustomOptional.of(d)). + thenReturn(CustomOptional.<KinesisRecord>absent()); + + assertThat(reader.start()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(c); + assertThat(reader.advance()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(a); + assertThat(reader.advance()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(d); + assertThat(reader.advance()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(b); + assertThat(reader.advance()).isFalse(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java index c9f01bb..8771c86 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.kinesis; import java.nio.ByteBuffer; - import org.apache.beam.sdk.testing.CoderProperties; import org.joda.time.Instant; import org.junit.Test; @@ -27,21 +26,20 @@ import org.junit.Test; * Tests {@link KinesisRecordCoder}. */ public class KinesisRecordCoderTest { - - @Test - public void encodingAndDecodingWorks() throws Exception { - KinesisRecord record = new KinesisRecord( - ByteBuffer.wrap("data".getBytes()), - "sequence", - 128L, - "partition", - Instant.now(), - Instant.now(), - "stream", - "shard" - ); - CoderProperties.coderDecodeEncodeEqual( - new KinesisRecordCoder(), record - ); - } + @Test + public void encodingAndDecodingWorks() throws Exception { + KinesisRecord record = new KinesisRecord( + ByteBuffer.wrap("data".getBytes()), + "sequence", + 128L, + "partition", + Instant.now(), + Instant.now(), + "stream", + "shard" + ); + CoderProperties.coderDecodeEncodeEqual( + new KinesisRecordCoder(), record + ); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java index 76bcb27..324de46 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java @@ -25,28 +25,23 @@ import org.apache.beam.sdk.testing.TestPipelineOptions; * Options for Kinesis integration tests. */ public interface KinesisTestOptions extends TestPipelineOptions { - - @Description("AWS region where Kinesis stream resided") - @Default.String("aws-kinesis-region") - String getAwsKinesisRegion(); - - void setAwsKinesisRegion(String value); - - @Description("Kinesis stream name") - @Default.String("aws-kinesis-stream") - String getAwsKinesisStream(); - - void setAwsKinesisStream(String value); - - @Description("AWS secret key") - @Default.String("aws-secret-key") - String getAwsSecretKey(); - - void setAwsSecretKey(String value); - - @Description("AWS access key") - @Default.String("aws-access-key") - String getAwsAccessKey(); - - void setAwsAccessKey(String value); + @Description("AWS region where Kinesis stream resided") + @Default.String("aws-kinesis-region") + String getAwsKinesisRegion(); + void setAwsKinesisRegion(String value); + + @Description("Kinesis stream name") + @Default.String("aws-kinesis-stream") + String getAwsKinesisStream(); + void setAwsKinesisStream(String value); + + @Description("AWS secret key") + @Default.String("aws-secret-key") + String getAwsSecretKey(); + void setAwsSecretKey(String value); + + @Description("AWS access key") + @Default.String("aws-access-key") + String getAwsAccessKey(); + void setAwsAccessKey(String value); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java index 7a7cb02..7518ff7 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java @@ -29,7 +29,6 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult; import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; import com.google.common.base.Charsets; import com.google.common.collect.Lists; - import java.nio.ByteBuffer; import java.util.List; @@ -38,46 +37,47 @@ import java.util.List; */ public class KinesisUploader { - public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499; + public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499; - public static void uploadAll(List<String> data, KinesisTestOptions options) { - AmazonKinesisClient client = new AmazonKinesisClient( - new StaticCredentialsProvider( - new BasicAWSCredentials( - options.getAwsAccessKey(), options.getAwsSecretKey())) - ).withRegion(Regions.fromName(options.getAwsKinesisRegion())); + public static void uploadAll(List<String> data, KinesisTestOptions options) { + AmazonKinesisClient client = new AmazonKinesisClient( + new StaticCredentialsProvider( + new BasicAWSCredentials( + options.getAwsAccessKey(), options.getAwsSecretKey())) + ).withRegion(Regions.fromName(options.getAwsKinesisRegion())); - List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH); + List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH); - for (List<String> partition : partitions) { - List<PutRecordsRequestEntry> allRecords = newArrayList(); - for (String row : partition) { - allRecords.add(new PutRecordsRequestEntry(). - withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))). - withPartitionKey(Integer.toString(row.hashCode())) - ); - } + for (List<String> partition : partitions) { + List<PutRecordsRequestEntry> allRecords = newArrayList(); + for (String row : partition) { + allRecords.add(new PutRecordsRequestEntry(). + withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))). + withPartitionKey(Integer.toString(row.hashCode())) - PutRecordsResult result; - do { - result = client.putRecords( - new PutRecordsRequest(). - withStreamName(options.getAwsKinesisStream()). - withRecords(allRecords)); - List<PutRecordsRequestEntry> failedRecords = newArrayList(); - int i = 0; - for (PutRecordsResultEntry row : result.getRecords()) { - if (row.getErrorCode() != null) { - failedRecords.add(allRecords.get(i)); - } - ++i; - } - allRecords = failedRecords; - } + ); + } - while (result.getFailedRecordCount() > 0); + PutRecordsResult result; + do { + result = client.putRecords( + new PutRecordsRequest(). + withStreamName(options.getAwsKinesisStream()). + withRecords(allRecords)); + List<PutRecordsRequestEntry> failedRecords = newArrayList(); + int i = 0; + for (PutRecordsResultEntry row : result.getRecords()) { + if (row.getErrorCode() != null) { + failedRecords.add(allRecords.get(i)); + } + ++i; + } + allRecords = failedRecords; + } + + while (result.getFailedRecordCount() > 0); + } } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java index cb32562..f979c01 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java @@ -20,49 +20,47 @@ package org.apache.beam.sdk.io.kinesis; import static org.mockito.BDDMockito.given; import com.google.common.collect.Lists; - import java.util.Collections; import java.util.List; - import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; + /*** */ @RunWith(MockitoJUnitRunner.class) public class RecordFilterTest { + @Mock + private ShardCheckpoint checkpoint; + @Mock + private KinesisRecord record1, record2, record3, record4, record5; - @Mock - private ShardCheckpoint checkpoint; - @Mock - private KinesisRecord record1, record2, record3, record4, record5; - - @Test - public void shouldFilterOutRecordsBeforeOrAtCheckpoint() { - given(checkpoint.isBeforeOrAt(record1)).willReturn(false); - given(checkpoint.isBeforeOrAt(record2)).willReturn(true); - given(checkpoint.isBeforeOrAt(record3)).willReturn(true); - given(checkpoint.isBeforeOrAt(record4)).willReturn(false); - given(checkpoint.isBeforeOrAt(record5)).willReturn(true); - List<KinesisRecord> records = Lists.newArrayList(record1, record2, - record3, record4, record5); - RecordFilter underTest = new RecordFilter(); + @Test + public void shouldFilterOutRecordsBeforeOrAtCheckpoint() { + given(checkpoint.isBeforeOrAt(record1)).willReturn(false); + given(checkpoint.isBeforeOrAt(record2)).willReturn(true); + given(checkpoint.isBeforeOrAt(record3)).willReturn(true); + given(checkpoint.isBeforeOrAt(record4)).willReturn(false); + given(checkpoint.isBeforeOrAt(record5)).willReturn(true); + List<KinesisRecord> records = Lists.newArrayList(record1, record2, + record3, record4, record5); + RecordFilter underTest = new RecordFilter(); - List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint); + List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint); - Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5); - } + Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5); + } - @Test - public void shouldNotFailOnEmptyList() { - List<KinesisRecord> records = Collections.emptyList(); - RecordFilter underTest = new RecordFilter(); + @Test + public void shouldNotFailOnEmptyList() { + List<KinesisRecord> records = Collections.emptyList(); + RecordFilter underTest = new RecordFilter(); - List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint); + List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint); - Assertions.assertThat(retainedRecords).isEmpty(); - } + Assertions.assertThat(retainedRecords).isEmpty(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java index e4abce4..f032eea 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java @@ -22,38 +22,36 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Collections; import java.util.List; - import org.junit.Test; /** * Tests {@link RoundRobin}. */ public class RoundRobinTest { + @Test(expected = IllegalArgumentException.class) + public void doesNotAllowCreationWithEmptyCollection() { + new RoundRobin<>(Collections.emptyList()); + } - @Test(expected = IllegalArgumentException.class) - public void doesNotAllowCreationWithEmptyCollection() { - new RoundRobin<>(Collections.emptyList()); - } - - @Test - public void goesThroughElementsInCycle() { - List<String> input = newArrayList("a", "b", "c"); + @Test + public void goesThroughElementsInCycle() { + List<String> input = newArrayList("a", "b", "c"); - RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input)); + RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input)); - input.addAll(input); // duplicate the input - for (String element : input) { - assertThat(roundRobin.getCurrent()).isEqualTo(element); - assertThat(roundRobin.getCurrent()).isEqualTo(element); - roundRobin.moveForward(); + input.addAll(input); // duplicate the input + for (String element : input) { + assertThat(roundRobin.getCurrent()).isEqualTo(element); + assertThat(roundRobin.getCurrent()).isEqualTo(element); + roundRobin.moveForward(); + } } - } - @Test - public void usualIteratorGoesThroughElementsOnce() { - List<String> input = newArrayList("a", "b", "c"); + @Test + public void usualIteratorGoesThroughElementsOnce() { + List<String> input = newArrayList("a", "b", "c"); - RoundRobin<String> roundRobin = new RoundRobin<>(input); - assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0])); - } + RoundRobin<String> roundRobin = new RoundRobin<>(input); + assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0])); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java index d4784c4..39ab36f 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java @@ -32,9 +32,7 @@ import static org.mockito.Mockito.when; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.model.ShardIteratorType; - import java.io.IOException; - import org.joda.time.DateTime; import org.joda.time.Instant; import org.junit.Before; @@ -48,105 +46,104 @@ import org.mockito.runners.MockitoJUnitRunner; */ @RunWith(MockitoJUnitRunner.class) public class ShardCheckpointTest { - - private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT"; - private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT"; - private static final String STREAM_NAME = "STREAM"; - private static final String SHARD_ID = "SHARD_ID"; - @Mock - private SimplifiedKinesisClient client; - - @Before - public void setUp() throws IOException, TransientKinesisException { - when(client.getShardIterator( - eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER), - anyString(), isNull(Instant.class))). - thenReturn(AT_SEQUENCE_SHARD_IT); - when(client.getShardIterator( - eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER), - anyString(), isNull(Instant.class))). - thenReturn(AFTER_SEQUENCE_SHARD_IT); - } - - @Test - public void testProvidingShardIterator() throws IOException, TransientKinesisException { - assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client)) - .isEqualTo(AT_SEQUENCE_SHARD_IT); - assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client)) - .isEqualTo(AFTER_SEQUENCE_SHARD_IT); - assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo - (AT_SEQUENCE_SHARD_IT); - assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)) - .isEqualTo(AT_SEQUENCE_SHARD_IT); - } - - @Test - public void testComparisonWithExtendedSequenceNumber() { - assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("100", 0L)) - )).isTrue(); - - assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("100", 0L)) - )).isTrue(); - - assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("100", 0L)) - )).isTrue(); - - assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("100", 0L)) - )).isTrue(); - - assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("100", 0L)) - )).isFalse(); - - assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("100", 0L)) - )).isFalse(); - - assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( - recordWith(new ExtendedSequenceNumber("99", 1L)) - )).isFalse(); - } - - @Test - public void testComparisonWithTimestamp() { - DateTime referenceTimestamp = DateTime.now(); - - assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) - .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant())) - ).isFalse(); - - assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) - .isBeforeOrAt(recordWith(referenceTimestamp.toInstant())) - ).isTrue(); - - assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) - .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant())) - ).isTrue(); - } - - private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) { - KinesisRecord record = mock(KinesisRecord.class); - given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber); - return record; - } - - private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber, - Long subSequenceNumber) { - return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, - subSequenceNumber); - } - - private KinesisRecord recordWith(Instant approximateArrivalTimestamp) { - KinesisRecord record = mock(KinesisRecord.class); - given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp); - return record; - } - - private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) { - return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp); - } + private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT"; + private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT"; + private static final String STREAM_NAME = "STREAM"; + private static final String SHARD_ID = "SHARD_ID"; + @Mock + private SimplifiedKinesisClient client; + + @Before + public void setUp() throws IOException, TransientKinesisException { + when(client.getShardIterator( + eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER), + anyString(), isNull(Instant.class))). + thenReturn(AT_SEQUENCE_SHARD_IT); + when(client.getShardIterator( + eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER), + anyString(), isNull(Instant.class))). + thenReturn(AFTER_SEQUENCE_SHARD_IT); + } + + @Test + public void testProvidingShardIterator() throws IOException, TransientKinesisException { + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client)) + .isEqualTo(AT_SEQUENCE_SHARD_IT); + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client)) + .isEqualTo(AFTER_SEQUENCE_SHARD_IT); + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo + (AT_SEQUENCE_SHARD_IT); + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)) + .isEqualTo(AT_SEQUENCE_SHARD_IT); + } + + @Test + public void testComparisonWithExtendedSequenceNumber() { + assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isFalse(); + + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isFalse(); + + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("99", 1L)) + )).isFalse(); + } + + @Test + public void testComparisonWithTimestamp() { + DateTime referenceTimestamp = DateTime.now(); + + assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) + .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant())) + ).isFalse(); + + assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) + .isBeforeOrAt(recordWith(referenceTimestamp.toInstant())) + ).isTrue(); + + assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) + .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant())) + ).isTrue(); + } + + private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) { + KinesisRecord record = mock(KinesisRecord.class); + given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber); + return record; + } + + private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber, + Long subSequenceNumber) { + return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, + subSequenceNumber); + } + + private KinesisRecord recordWith(Instant approximateArrivalTimestamp) { + KinesisRecord record = mock(KinesisRecord.class); + given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp); + return record; + } + + private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) { + return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp); + } }
