Reformatting Kinesis IO to comply with official code style
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7925a668 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7925a668 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7925a668 Branch: refs/heads/DSL_SQL Commit: 7925a668b12e272c7b2631ff6b20376e92ad90be Parents: 4abd714 Author: Pawel Kaczmarczyk <[email protected]> Authored: Mon Jun 19 11:10:25 2017 +0200 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:02 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/kinesis/CheckpointGenerator.java | 6 +- .../beam/sdk/io/kinesis/CustomOptional.java | 111 ++-- .../io/kinesis/DynamicCheckpointGenerator.java | 52 +- .../sdk/io/kinesis/GetKinesisRecordsResult.java | 49 +- .../sdk/io/kinesis/KinesisClientProvider.java | 4 +- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 279 +++++----- .../beam/sdk/io/kinesis/KinesisReader.java | 206 +++---- .../sdk/io/kinesis/KinesisReaderCheckpoint.java | 97 ++-- .../beam/sdk/io/kinesis/KinesisRecord.java | 177 +++--- .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 68 +-- .../beam/sdk/io/kinesis/KinesisSource.java | 147 ++--- .../beam/sdk/io/kinesis/RecordFilter.java | 18 +- .../apache/beam/sdk/io/kinesis/RoundRobin.java | 37 +- .../beam/sdk/io/kinesis/ShardCheckpoint.java | 241 ++++----- .../sdk/io/kinesis/ShardRecordsIterator.java | 106 ++-- .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++---- .../beam/sdk/io/kinesis/StartingPoint.java | 84 +-- .../io/kinesis/StaticCheckpointGenerator.java | 27 +- .../io/kinesis/TransientKinesisException.java | 7 +- .../beam/sdk/io/kinesis/AmazonKinesisMock.java | 539 ++++++++++--------- .../beam/sdk/io/kinesis/CustomOptionalTest.java | 27 +- .../kinesis/DynamicCheckpointGeneratorTest.java | 33 +- .../sdk/io/kinesis/KinesisMockReadTest.java | 97 ++-- .../io/kinesis/KinesisReaderCheckpointTest.java | 52 +- .../beam/sdk/io/kinesis/KinesisReaderIT.java | 127 ++--- .../beam/sdk/io/kinesis/KinesisReaderTest.java | 166 +++--- .../sdk/io/kinesis/KinesisRecordCoderTest.java | 34 +- .../beam/sdk/io/kinesis/KinesisTestOptions.java | 43 +- .../beam/sdk/io/kinesis/KinesisUploader.java | 70 +-- .../beam/sdk/io/kinesis/RecordFilterTest.java | 52 +- .../beam/sdk/io/kinesis/RoundRobinTest.java | 42 +- .../sdk/io/kinesis/ShardCheckpointTest.java | 203 +++---- .../io/kinesis/ShardRecordsIteratorTest.java | 216 ++++---- .../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------ 34 files changed, 2031 insertions(+), 1952 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java index 919d85a..2629c57 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.kinesis; - import java.io.Serializable; /** @@ -25,6 +24,7 @@ import java.io.Serializable; * How exactly the checkpoint is generated is up to implementing class. */ interface CheckpointGenerator extends Serializable { - KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) - throws TransientKinesisException; + + KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) + throws TransientKinesisException; } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java index 4bed0e3..5a28214 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java @@ -24,76 +24,79 @@ import java.util.Objects; * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element. */ abstract class CustomOptional<T> { - @SuppressWarnings("unchecked") - public static <T> CustomOptional<T> absent() { - return (Absent<T>) Absent.INSTANCE; - } - public static <T> CustomOptional<T> of(T v) { - return new Present<>(v); - } + @SuppressWarnings("unchecked") + public static <T> CustomOptional<T> absent() { + return (Absent<T>) Absent.INSTANCE; + } - public abstract boolean isPresent(); + public static <T> CustomOptional<T> of(T v) { + return new Present<>(v); + } - public abstract T get(); + public abstract boolean isPresent(); - private static class Present<T> extends CustomOptional<T> { - private final T value; + public abstract T get(); - private Present(T value) { - this.value = value; - } + private static class Present<T> extends CustomOptional<T> { - @Override - public boolean isPresent() { - return true; - } + private final T value; - @Override - public T get() { - return value; - } + private Present(T value) { + this.value = value; + } - @Override - public boolean equals(Object o) { - if (!(o instanceof Present)) { - return false; - } + @Override + public boolean isPresent() { + return true; + } - Present<?> present = (Present<?>) o; - return Objects.equals(value, present.value); - } + @Override + public T get() { + return value; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Present)) { + return false; + } - @Override - public int hashCode() { - return Objects.hash(value); - } + Present<?> present = (Present<?>) o; + return Objects.equals(value, present.value); } - private static class Absent<T> extends CustomOptional<T> { - private static final Absent<Object> INSTANCE = new Absent<>(); + @Override + public int hashCode() { + return Objects.hash(value); + } + } - private Absent() { - } + private static class Absent<T> extends CustomOptional<T> { - @Override - public boolean isPresent() { - return false; - } + private static final Absent<Object> INSTANCE = new Absent<>(); - @Override - public T get() { - throw new NoSuchElementException(); - } + private Absent() { + } + + @Override + public boolean isPresent() { + return false; + } - @Override - public boolean equals(Object o) { - return o instanceof Absent; - } + @Override + public T get() { + throw new NoSuchElementException(); + } + + @Override + public boolean equals(Object o) { + return o instanceof Absent; + } - @Override - public int hashCode() { - return 0; - } + @Override + public int hashCode() { + return 0; } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java index 2ec293c..9933019 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java @@ -28,29 +28,31 @@ import com.google.common.base.Function; * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}. */ class DynamicCheckpointGenerator implements CheckpointGenerator { - private final String streamName; - private final StartingPoint startingPoint; - - public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) { - this.streamName = checkNotNull(streamName, "streamName"); - this.startingPoint = checkNotNull(startingPoint, "startingPoint"); - } - - @Override - public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis) - throws TransientKinesisException { - return new KinesisReaderCheckpoint( - transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() { - @Override - public ShardCheckpoint apply(Shard shard) { - return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint); - } - }) - ); - } - - @Override - public String toString() { - return String.format("Checkpoint generator for %s: %s", streamName, startingPoint); - } + + private final String streamName; + private final StartingPoint startingPoint; + + public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) { + this.streamName = checkNotNull(streamName, "streamName"); + this.startingPoint = checkNotNull(startingPoint, "startingPoint"); + } + + @Override + public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis) + throws TransientKinesisException { + return new KinesisReaderCheckpoint( + transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() { + + @Override + public ShardCheckpoint apply(Shard shard) { + return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint); + } + }) + ); + } + + @Override + public String toString() { + return String.format("Checkpoint generator for %s: %s", streamName, startingPoint); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java index 5a34d7d..f605f55 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java @@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.transform; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.google.common.base.Function; + import java.util.List; import javax.annotation.Nullable; @@ -28,27 +29,29 @@ import javax.annotation.Nullable; * Represents the output of 'get' operation on Kinesis stream. */ class GetKinesisRecordsResult { - private final List<KinesisRecord> records; - private final String nextShardIterator; - - public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator, - final String streamName, final String shardId) { - this.records = transform(records, new Function<UserRecord, KinesisRecord>() { - @Nullable - @Override - public KinesisRecord apply(@Nullable UserRecord input) { - assert input != null; // to make FindBugs happy - return new KinesisRecord(input, streamName, shardId); - } - }); - this.nextShardIterator = nextShardIterator; - } - - public List<KinesisRecord> getRecords() { - return records; - } - - public String getNextShardIterator() { - return nextShardIterator; - } + + private final List<KinesisRecord> records; + private final String nextShardIterator; + + public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator, + final String streamName, final String shardId) { + this.records = transform(records, new Function<UserRecord, KinesisRecord>() { + + @Nullable + @Override + public KinesisRecord apply(@Nullable UserRecord input) { + assert input != null; // to make FindBugs happy + return new KinesisRecord(input, streamName, shardId); + } + }); + this.nextShardIterator = nextShardIterator; + } + + public List<KinesisRecord> getRecords() { + return records; + } + + public String getNextShardIterator() { + return nextShardIterator; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java index c7fd7f6..b5b721e 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kinesis; import com.amazonaws.services.kinesis.AmazonKinesis; + import java.io.Serializable; /** @@ -27,5 +28,6 @@ import java.io.Serializable; * {@link Serializable} to ensure it can be sent to worker machines. */ interface KinesisClientProvider extends Serializable { - AmazonKinesis get(); + + AmazonKinesis get(); } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index b85eb63..bc8ada1 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.kinesis; - import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -29,7 +28,9 @@ import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.google.auto.value.AutoValue; + import javax.annotation.Nullable; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.transforms.PTransform; @@ -102,142 +103,148 @@ import org.joda.time.Instant; */ @Experimental(Experimental.Kind.SOURCE_SINK) public final class KinesisIO { - /** Returns a new {@link Read} transform for reading from Kinesis. */ - public static Read read() { - return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build(); + + /** Returns a new {@link Read} transform for reading from Kinesis. */ + public static Read read() { + return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build(); + } + + /** Implementation of {@link #read}. */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> { + + @Nullable + abstract String getStreamName(); + + @Nullable + abstract StartingPoint getInitialPosition(); + + @Nullable + abstract KinesisClientProvider getClientProvider(); + + abstract int getMaxNumRecords(); + + @Nullable + abstract Duration getMaxReadTime(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setStreamName(String streamName); + + abstract Builder setInitialPosition(StartingPoint startingPoint); + + abstract Builder setClientProvider(KinesisClientProvider clientProvider); + + abstract Builder setMaxNumRecords(int maxNumRecords); + + abstract Builder setMaxReadTime(Duration maxReadTime); + + abstract Read build(); } - /** Implementation of {@link #read}. */ - @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> { - @Nullable - abstract String getStreamName(); - - @Nullable - abstract StartingPoint getInitialPosition(); - - @Nullable - abstract KinesisClientProvider getClientProvider(); - - abstract int getMaxNumRecords(); - - @Nullable - abstract Duration getMaxReadTime(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setStreamName(String streamName); - abstract Builder setInitialPosition(StartingPoint startingPoint); - abstract Builder setClientProvider(KinesisClientProvider clientProvider); - abstract Builder setMaxNumRecords(int maxNumRecords); - abstract Builder setMaxReadTime(Duration maxReadTime); - - abstract Read build(); - } - - /** - * Specify reading from streamName at some initial position. - */ - public Read from(String streamName, InitialPositionInStream initialPosition) { - return toBuilder() - .setStreamName(streamName) - .setInitialPosition( - new StartingPoint(checkNotNull(initialPosition, "initialPosition"))) - .build(); - } - - /** - * Specify reading from streamName beginning at given {@link Instant}. - * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}. - */ - public Read from(String streamName, Instant initialTimestamp) { - return toBuilder() - .setStreamName(streamName) - .setInitialPosition( - new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp"))) - .build(); - } - - /** - * Allows to specify custom {@link KinesisClientProvider}. - * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later - * used for communication with Kinesis. - * You should use this method if {@link Read#withClientProvider(String, String, Regions)} - * does not suit your needs. - */ - public Read withClientProvider(KinesisClientProvider kinesisClientProvider) { - return toBuilder().setClientProvider(kinesisClientProvider).build(); - } - - /** - * Specify credential details and region to be used to read from Kinesis. - * If you need more sophisticated credential protocol, then you should look at - * {@link Read#withClientProvider(KinesisClientProvider)}. - */ - public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) { - return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region)); - } - - /** Specifies to read at most a given number of records. */ - public Read withMaxNumRecords(int maxNumRecords) { - checkArgument( - maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords); - return toBuilder().setMaxNumRecords(maxNumRecords).build(); - } - - /** Specifies to read at most a given number of records. */ - public Read withMaxReadTime(Duration maxReadTime) { - checkNotNull(maxReadTime, "maxReadTime"); - return toBuilder().setMaxReadTime(maxReadTime).build(); - } - - @Override - public PCollection<KinesisRecord> expand(PBegin input) { - org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read = - org.apache.beam.sdk.io.Read.from( - new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition())); - if (getMaxNumRecords() > 0) { - BoundedReadFromUnboundedSource<KinesisRecord> bounded = - read.withMaxNumRecords(getMaxNumRecords()); - return getMaxReadTime() == null - ? input.apply(bounded) - : input.apply(bounded.withMaxReadTime(getMaxReadTime())); - } else { - return getMaxReadTime() == null - ? input.apply(read) - : input.apply(read.withMaxReadTime(getMaxReadTime())); - } - } - - private static final class BasicKinesisProvider implements KinesisClientProvider { - - private final String accessKey; - private final String secretKey; - private final Regions region; - - private BasicKinesisProvider(String accessKey, String secretKey, Regions region) { - this.accessKey = checkNotNull(accessKey, "accessKey"); - this.secretKey = checkNotNull(secretKey, "secretKey"); - this.region = checkNotNull(region, "region"); - } - - - private AWSCredentialsProvider getCredentialsProvider() { - return new StaticCredentialsProvider(new BasicAWSCredentials( - accessKey, - secretKey - )); - - } - - @Override - public AmazonKinesis get() { - AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider()); - client.withRegion(region); - return client; - } - } + /** + * Specify reading from streamName at some initial position. + */ + public Read from(String streamName, InitialPositionInStream initialPosition) { + return toBuilder() + .setStreamName(streamName) + .setInitialPosition( + new StartingPoint(checkNotNull(initialPosition, "initialPosition"))) + .build(); + } + + /** + * Specify reading from streamName beginning at given {@link Instant}. + * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}. + */ + public Read from(String streamName, Instant initialTimestamp) { + return toBuilder() + .setStreamName(streamName) + .setInitialPosition( + new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp"))) + .build(); + } + + /** + * Allows to specify custom {@link KinesisClientProvider}. + * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later + * used for communication with Kinesis. + * You should use this method if {@link Read#withClientProvider(String, String, Regions)} + * does not suit your needs. + */ + public Read withClientProvider(KinesisClientProvider kinesisClientProvider) { + return toBuilder().setClientProvider(kinesisClientProvider).build(); + } + + /** + * Specify credential details and region to be used to read from Kinesis. + * If you need more sophisticated credential protocol, then you should look at + * {@link Read#withClientProvider(KinesisClientProvider)}. + */ + public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) { + return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region)); + } + + /** Specifies to read at most a given number of records. */ + public Read withMaxNumRecords(int maxNumRecords) { + checkArgument( + maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords); + return toBuilder().setMaxNumRecords(maxNumRecords).build(); + } + + /** Specifies to read at most a given number of records. */ + public Read withMaxReadTime(Duration maxReadTime) { + checkNotNull(maxReadTime, "maxReadTime"); + return toBuilder().setMaxReadTime(maxReadTime).build(); + } + + @Override + public PCollection<KinesisRecord> expand(PBegin input) { + org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read = + org.apache.beam.sdk.io.Read.from( + new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition())); + if (getMaxNumRecords() > 0) { + BoundedReadFromUnboundedSource<KinesisRecord> bounded = + read.withMaxNumRecords(getMaxNumRecords()); + return getMaxReadTime() == null + ? input.apply(bounded) + : input.apply(bounded.withMaxReadTime(getMaxReadTime())); + } else { + return getMaxReadTime() == null + ? input.apply(read) + : input.apply(read.withMaxReadTime(getMaxReadTime())); + } + } + + private static final class BasicKinesisProvider implements KinesisClientProvider { + + private final String accessKey; + private final String secretKey; + private final Regions region; + + private BasicKinesisProvider(String accessKey, String secretKey, Regions region) { + this.accessKey = checkNotNull(accessKey, "accessKey"); + this.secretKey = checkNotNull(secretKey, "secretKey"); + this.region = checkNotNull(region, "region"); + } + + private AWSCredentialsProvider getCredentialsProvider() { + return new StaticCredentialsProvider(new BasicAWSCredentials( + accessKey, + secretKey + )); + + } + + @Override + public AmazonKinesis get() { + AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider()); + client.withRegion(region); + return client; + } } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java index 2138094..e5c32d2 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java @@ -17,129 +17,129 @@ */ package org.apache.beam.sdk.io.kinesis; - import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Lists.newArrayList; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; + import org.apache.beam.sdk.io.UnboundedSource; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Reads data from multiple kinesis shards in a single thread. * It uses simple round robin algorithm when fetching data from shards. */ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> { - private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class); - - private final SimplifiedKinesisClient kinesis; - private final UnboundedSource<KinesisRecord, ?> source; - private final CheckpointGenerator initialCheckpointGenerator; - private RoundRobin<ShardRecordsIterator> shardIterators; - private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent(); - - public KinesisReader(SimplifiedKinesisClient kinesis, - CheckpointGenerator initialCheckpointGenerator, - UnboundedSource<KinesisRecord, ?> source) { - this.kinesis = checkNotNull(kinesis, "kinesis"); - this.initialCheckpointGenerator = - checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator"); - this.source = source; - } - - /** - * Generates initial checkpoint and instantiates iterators for shards. - */ - @Override - public boolean start() throws IOException { - LOG.info("Starting reader using {}", initialCheckpointGenerator); - - try { - KinesisReaderCheckpoint initialCheckpoint = - initialCheckpointGenerator.generate(kinesis); - List<ShardRecordsIterator> iterators = newArrayList(); - for (ShardCheckpoint checkpoint : initialCheckpoint) { - iterators.add(checkpoint.getShardRecordsIterator(kinesis)); - } - shardIterators = new RoundRobin<>(iterators); - } catch (TransientKinesisException e) { - throw new IOException(e); - } - return advance(); + private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class); + + private final SimplifiedKinesisClient kinesis; + private final UnboundedSource<KinesisRecord, ?> source; + private final CheckpointGenerator initialCheckpointGenerator; + private RoundRobin<ShardRecordsIterator> shardIterators; + private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent(); + + public KinesisReader(SimplifiedKinesisClient kinesis, + CheckpointGenerator initialCheckpointGenerator, + UnboundedSource<KinesisRecord, ?> source) { + this.kinesis = checkNotNull(kinesis, "kinesis"); + this.initialCheckpointGenerator = + checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator"); + this.source = source; + } + + /** + * Generates initial checkpoint and instantiates iterators for shards. + */ + @Override + public boolean start() throws IOException { + LOG.info("Starting reader using {}", initialCheckpointGenerator); + + try { + KinesisReaderCheckpoint initialCheckpoint = + initialCheckpointGenerator.generate(kinesis); + List<ShardRecordsIterator> iterators = newArrayList(); + for (ShardCheckpoint checkpoint : initialCheckpoint) { + iterators.add(checkpoint.getShardRecordsIterator(kinesis)); + } + shardIterators = new RoundRobin<>(iterators); + } catch (TransientKinesisException e) { + throw new IOException(e); } - /** - * Moves to the next record in one of the shards. - * If current shard iterator can be move forward (i.e. there's a record present) then we do it. - * If not, we iterate over shards in a round-robin manner. - */ - @Override - public boolean advance() throws IOException { - try { - for (int i = 0; i < shardIterators.size(); ++i) { - currentRecord = shardIterators.getCurrent().next(); - if (currentRecord.isPresent()) { - return true; - } else { - shardIterators.moveForward(); - } - } - } catch (TransientKinesisException e) { - LOG.warn("Transient exception occurred", e); + return advance(); + } + + /** + * Moves to the next record in one of the shards. + * If current shard iterator can be move forward (i.e. there's a record present) then we do it. + * If not, we iterate over shards in a round-robin manner. + */ + @Override + public boolean advance() throws IOException { + try { + for (int i = 0; i < shardIterators.size(); ++i) { + currentRecord = shardIterators.getCurrent().next(); + if (currentRecord.isPresent()) { + return true; + } else { + shardIterators.moveForward(); } - return false; - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - return currentRecord.get().getUniqueId(); - } - - @Override - public KinesisRecord getCurrent() throws NoSuchElementException { - return currentRecord.get(); - } - - /** - * When {@link KinesisReader} was advanced to the current record. - * We cannot use approximate arrival timestamp given for each record by Kinesis as it - * is not guaranteed to be accurate - this could lead to mark some records as "late" - * even if they were not. - */ - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return currentRecord.get().getReadTime(); - } - - @Override - public void close() throws IOException { - } - - /** - * Current time. - * We cannot give better approximation of the watermark with current semantics of - * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next - * {@link KinesisReader#advance()} will be called. - */ - @Override - public Instant getWatermark() { - return Instant.now(); - } - - @Override - public UnboundedSource.CheckpointMark getCheckpointMark() { - return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators); - } - - @Override - public UnboundedSource<KinesisRecord, ?> getCurrentSource() { - return source; + } + } catch (TransientKinesisException e) { + LOG.warn("Transient exception occurred", e); } + return false; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return currentRecord.get().getUniqueId(); + } + + @Override + public KinesisRecord getCurrent() throws NoSuchElementException { + return currentRecord.get(); + } + + /** + * When {@link KinesisReader} was advanced to the current record. + * We cannot use approximate arrival timestamp given for each record by Kinesis as it + * is not guaranteed to be accurate - this could lead to mark some records as "late" + * even if they were not. + */ + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return currentRecord.get().getReadTime(); + } + + @Override + public void close() throws IOException { + } + + /** + * Current time. + * We cannot give better approximation of the watermark with current semantics of + * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next + * {@link KinesisReader#advance()} will be called. + */ + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public UnboundedSource.CheckpointMark getCheckpointMark() { + return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators); + } + + @Override + public UnboundedSource<KinesisRecord, ?> getCurrentSource() { + return source; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java index f0fa45d..d995e75 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java @@ -23,11 +23,13 @@ import static com.google.common.collect.Lists.partition; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; + import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; + import org.apache.beam.sdk.io.UnboundedSource; /** @@ -37,60 +39,61 @@ import org.apache.beam.sdk.io.UnboundedSource; * This class is immutable. */ class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource - .CheckpointMark, Serializable { - private final List<ShardCheckpoint> shardCheckpoints; + .CheckpointMark, Serializable { - public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) { - this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints); - } + private final List<ShardCheckpoint> shardCheckpoints; - public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator> - iterators) { - return new KinesisReaderCheckpoint(transform(iterators, - new Function<ShardRecordsIterator, ShardCheckpoint>() { - - @Nullable - @Override - public ShardCheckpoint apply(@Nullable - ShardRecordsIterator shardRecordsIterator) { - assert shardRecordsIterator != null; - return shardRecordsIterator.getCheckpoint(); - } - })); - } + public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) { + this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints); + } - /** - * Splits given multi-shard checkpoint into partitions of approximately equal size. - * - * @param desiredNumSplits - upper limit for number of partitions to generate. - * @return list of checkpoints covering consecutive partitions of current checkpoint. - */ - public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) { - int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits); - - List<KinesisReaderCheckpoint> checkpoints = newArrayList(); - for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) { - checkpoints.add(new KinesisReaderCheckpoint(shardPartition)); - } - return checkpoints; - } + public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator> + iterators) { + return new KinesisReaderCheckpoint(transform(iterators, + new Function<ShardRecordsIterator, ShardCheckpoint>() { - private int divideAndRoundUp(int nominator, int denominator) { - return (nominator + denominator - 1) / denominator; - } + @Nullable + @Override + public ShardCheckpoint apply(@Nullable + ShardRecordsIterator shardRecordsIterator) { + assert shardRecordsIterator != null; + return shardRecordsIterator.getCheckpoint(); + } + })); + } - @Override - public void finalizeCheckpoint() throws IOException { + /** + * Splits given multi-shard checkpoint into partitions of approximately equal size. + * + * @param desiredNumSplits - upper limit for number of partitions to generate. + * @return list of checkpoints covering consecutive partitions of current checkpoint. + */ + public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) { + int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits); + List<KinesisReaderCheckpoint> checkpoints = newArrayList(); + for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) { + checkpoints.add(new KinesisReaderCheckpoint(shardPartition)); } + return checkpoints; + } - @Override - public String toString() { - return shardCheckpoints.toString(); - } + private int divideAndRoundUp(int nominator, int denominator) { + return (nominator + denominator - 1) / denominator; + } - @Override - public Iterator<ShardCheckpoint> iterator() { - return shardCheckpoints.iterator(); - } + @Override + public void finalizeCheckpoint() throws IOException { + + } + + @Override + public String toString() { + return shardCheckpoints.toString(); + } + + @Override + public Iterator<ShardCheckpoint> iterator() { + return shardCheckpoints.iterator(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java index 02b5370..057b7bb 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java @@ -22,7 +22,9 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.google.common.base.Charsets; + import java.nio.ByteBuffer; + import org.apache.commons.lang.builder.EqualsBuilder; import org.joda.time.Instant; @@ -30,91 +32,92 @@ import org.joda.time.Instant; * {@link UserRecord} enhanced with utility methods. */ public class KinesisRecord { - private Instant readTime; - private String streamName; - private String shardId; - private long subSequenceNumber; - private String sequenceNumber; - private Instant approximateArrivalTimestamp; - private ByteBuffer data; - private String partitionKey; - - public KinesisRecord(UserRecord record, String streamName, String shardId) { - this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(), - record.getPartitionKey(), - new Instant(record.getApproximateArrivalTimestamp()), - Instant.now(), - streamName, shardId); - } - - public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber, - String partitionKey, Instant approximateArrivalTimestamp, - Instant readTime, - String streamName, String shardId) { - this.data = data; - this.sequenceNumber = sequenceNumber; - this.subSequenceNumber = subSequenceNumber; - this.partitionKey = partitionKey; - this.approximateArrivalTimestamp = approximateArrivalTimestamp; - this.readTime = readTime; - this.streamName = streamName; - this.shardId = shardId; - } - - public ExtendedSequenceNumber getExtendedSequenceNumber() { - return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber()); - } - - /*** - * @return unique id of the record based on its position in the stream - */ - public byte[] getUniqueId() { - return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8); - } - - public Instant getReadTime() { - return readTime; - } - - public String getStreamName() { - return streamName; - } - - public String getShardId() { - return shardId; - } - - public byte[] getDataAsBytes() { - return getData().array(); - } - - @Override - public boolean equals(Object obj) { - return EqualsBuilder.reflectionEquals(this, obj); - } - - @Override - public int hashCode() { - return reflectionHashCode(this); - } - - public long getSubSequenceNumber() { - return subSequenceNumber; - } - - public String getSequenceNumber() { - return sequenceNumber; - } - - public Instant getApproximateArrivalTimestamp() { - return approximateArrivalTimestamp; - } - - public ByteBuffer getData() { - return data; - } - - public String getPartitionKey() { - return partitionKey; - } + + private Instant readTime; + private String streamName; + private String shardId; + private long subSequenceNumber; + private String sequenceNumber; + private Instant approximateArrivalTimestamp; + private ByteBuffer data; + private String partitionKey; + + public KinesisRecord(UserRecord record, String streamName, String shardId) { + this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(), + record.getPartitionKey(), + new Instant(record.getApproximateArrivalTimestamp()), + Instant.now(), + streamName, shardId); + } + + public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber, + String partitionKey, Instant approximateArrivalTimestamp, + Instant readTime, + String streamName, String shardId) { + this.data = data; + this.sequenceNumber = sequenceNumber; + this.subSequenceNumber = subSequenceNumber; + this.partitionKey = partitionKey; + this.approximateArrivalTimestamp = approximateArrivalTimestamp; + this.readTime = readTime; + this.streamName = streamName; + this.shardId = shardId; + } + + public ExtendedSequenceNumber getExtendedSequenceNumber() { + return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber()); + } + + /*** + * @return unique id of the record based on its position in the stream + */ + public byte[] getUniqueId() { + return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8); + } + + public Instant getReadTime() { + return readTime; + } + + public String getStreamName() { + return streamName; + } + + public String getShardId() { + return shardId; + } + + public byte[] getDataAsBytes() { + return getData().array(); + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + @Override + public int hashCode() { + return reflectionHashCode(this); + } + + public long getSubSequenceNumber() { + return subSequenceNumber; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public Instant getApproximateArrivalTimestamp() { + return approximateArrivalTimestamp; + } + + public ByteBuffer getData() { + return data; + } + + public String getPartitionKey() { + return partitionKey; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java index f233e27..dcf564d 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; + import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -33,40 +34,41 @@ import org.joda.time.Instant; * A {@link Coder} for {@link KinesisRecord}. */ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> { - private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of(); - private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of(); - private static final InstantCoder INSTANT_CODER = InstantCoder.of(); - private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of(); - public static KinesisRecordCoder of() { - return new KinesisRecordCoder(); - } + private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of(); + private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of(); + private static final InstantCoder INSTANT_CODER = InstantCoder.of(); + private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of(); + + public static KinesisRecordCoder of() { + return new KinesisRecordCoder(); + } - @Override - public void encode(KinesisRecord value, OutputStream outStream) throws - IOException { - BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); - STRING_CODER.encode(value.getSequenceNumber(), outStream); - STRING_CODER.encode(value.getPartitionKey(), outStream); - INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream); - VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream); - INSTANT_CODER.encode(value.getReadTime(), outStream); - STRING_CODER.encode(value.getStreamName(), outStream); - STRING_CODER.encode(value.getShardId(), outStream); - } + @Override + public void encode(KinesisRecord value, OutputStream outStream) throws + IOException { + BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); + STRING_CODER.encode(value.getSequenceNumber(), outStream); + STRING_CODER.encode(value.getPartitionKey(), outStream); + INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream); + VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream); + INSTANT_CODER.encode(value.getReadTime(), outStream); + STRING_CODER.encode(value.getStreamName(), outStream); + STRING_CODER.encode(value.getShardId(), outStream); + } - @Override - public KinesisRecord decode(InputStream inStream) throws IOException { - ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream)); - String sequenceNumber = STRING_CODER.decode(inStream); - String partitionKey = STRING_CODER.decode(inStream); - Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream); - long subSequenceNumber = VAR_LONG_CODER.decode(inStream); - Instant readTimestamp = INSTANT_CODER.decode(inStream); - String streamName = STRING_CODER.decode(inStream); - String shardId = STRING_CODER.decode(inStream); - return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey, - approximateArrivalTimestamp, readTimestamp, streamName, shardId - ); - } + @Override + public KinesisRecord decode(InputStream inStream) throws IOException { + ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream)); + String sequenceNumber = STRING_CODER.decode(inStream); + String partitionKey = STRING_CODER.decode(inStream); + Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream); + long subSequenceNumber = VAR_LONG_CODER.decode(inStream); + Instant readTimestamp = INSTANT_CODER.decode(inStream); + String streamName = STRING_CODER.decode(inStream); + String shardId = STRING_CODER.decode(inStream); + return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey, + approximateArrivalTimestamp, readTimestamp, streamName, shardId + ); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java index 7e67d07..362792b 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Lists.newArrayList; import java.util.List; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; @@ -28,85 +29,85 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Represents source for single stream in Kinesis. */ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> { - private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); - - private final KinesisClientProvider kinesis; - private CheckpointGenerator initialCheckpointGenerator; - public KinesisSource(KinesisClientProvider kinesis, String streamName, - StartingPoint startingPoint) { - this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint)); + private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); + + private final KinesisClientProvider kinesis; + private CheckpointGenerator initialCheckpointGenerator; + + public KinesisSource(KinesisClientProvider kinesis, String streamName, + StartingPoint startingPoint) { + this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint)); + } + + private KinesisSource(KinesisClientProvider kinesisClientProvider, + CheckpointGenerator initialCheckpoint) { + this.kinesis = kinesisClientProvider; + this.initialCheckpointGenerator = initialCheckpoint; + validate(); + } + + /** + * Generate splits for reading from the stream. + * Basically, it'll try to evenly split set of shards in the stream into + * {@code desiredNumSplits} partitions. Each partition is then a split. + */ + @Override + public List<KinesisSource> split(int desiredNumSplits, + PipelineOptions options) throws Exception { + KinesisReaderCheckpoint checkpoint = + initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis)); + + List<KinesisSource> sources = newArrayList(); + + for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) { + sources.add(new KinesisSource( + kinesis, + new StaticCheckpointGenerator(partition))); } - - private KinesisSource(KinesisClientProvider kinesisClientProvider, - CheckpointGenerator initialCheckpoint) { - this.kinesis = kinesisClientProvider; - this.initialCheckpointGenerator = initialCheckpoint; - validate(); + return sources; + } + + /** + * Creates reader based on given {@link KinesisReaderCheckpoint}. + * If {@link KinesisReaderCheckpoint} is not given, then we use + * {@code initialCheckpointGenerator} to generate new checkpoint. + */ + @Override + public UnboundedReader<KinesisRecord> createReader(PipelineOptions options, + KinesisReaderCheckpoint checkpointMark) { + + CheckpointGenerator checkpointGenerator = initialCheckpointGenerator; + + if (checkpointMark != null) { + checkpointGenerator = new StaticCheckpointGenerator(checkpointMark); } - /** - * Generate splits for reading from the stream. - * Basically, it'll try to evenly split set of shards in the stream into - * {@code desiredNumSplits} partitions. Each partition is then a split. - */ - @Override - public List<KinesisSource> split(int desiredNumSplits, - PipelineOptions options) throws Exception { - KinesisReaderCheckpoint checkpoint = - initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis)); - - List<KinesisSource> sources = newArrayList(); - - for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) { - sources.add(new KinesisSource( - kinesis, - new StaticCheckpointGenerator(partition))); - } - return sources; - } - - /** - * Creates reader based on given {@link KinesisReaderCheckpoint}. - * If {@link KinesisReaderCheckpoint} is not given, then we use - * {@code initialCheckpointGenerator} to generate new checkpoint. - */ - @Override - public UnboundedReader<KinesisRecord> createReader(PipelineOptions options, - KinesisReaderCheckpoint checkpointMark) { - - CheckpointGenerator checkpointGenerator = initialCheckpointGenerator; - - if (checkpointMark != null) { - checkpointGenerator = new StaticCheckpointGenerator(checkpointMark); - } - - LOG.info("Creating new reader using {}", checkpointGenerator); - - return new KinesisReader( - SimplifiedKinesisClient.from(kinesis), - checkpointGenerator, - this); - } - - @Override - public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() { - return SerializableCoder.of(KinesisReaderCheckpoint.class); - } - - @Override - public void validate() { - checkNotNull(kinesis); - checkNotNull(initialCheckpointGenerator); - } - - @Override - public Coder<KinesisRecord> getDefaultOutputCoder() { - return KinesisRecordCoder.of(); - } + LOG.info("Creating new reader using {}", checkpointGenerator); + + return new KinesisReader( + SimplifiedKinesisClient.from(kinesis), + checkpointGenerator, + this); + } + + @Override + public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() { + return SerializableCoder.of(KinesisReaderCheckpoint.class); + } + + @Override + public void validate() { + checkNotNull(kinesis); + checkNotNull(initialCheckpointGenerator); + } + + @Override + public Coder<KinesisRecord> getDefaultOutputCoder() { + return KinesisRecordCoder.of(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java index 40e65fc..eca725c 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java @@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.newArrayList; import java.util.List; - /** * Filters out records, which were already processed and checkpointed. * @@ -29,13 +28,14 @@ import java.util.List; * accuracy, not with "subSequenceNumber" accuracy. */ class RecordFilter { - public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) { - List<KinesisRecord> filteredRecords = newArrayList(); - for (KinesisRecord record : records) { - if (checkpoint.isBeforeOrAt(record)) { - filteredRecords.add(record); - } - } - return filteredRecords; + + public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) { + List<KinesisRecord> filteredRecords = newArrayList(); + for (KinesisRecord record : records) { + if (checkpoint.isBeforeOrAt(record)) { + filteredRecords.add(record); + } } + return filteredRecords; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java index e4ff541..806d982 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java @@ -27,27 +27,28 @@ import java.util.Iterator; * Very simple implementation of round robin algorithm. */ class RoundRobin<T> implements Iterable<T> { - private final Deque<T> deque; - public RoundRobin(Iterable<T> collection) { - this.deque = newArrayDeque(collection); - checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection"); - } + private final Deque<T> deque; - public T getCurrent() { - return deque.getFirst(); - } + public RoundRobin(Iterable<T> collection) { + this.deque = newArrayDeque(collection); + checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection"); + } - public void moveForward() { - deque.addLast(deque.removeFirst()); - } + public T getCurrent() { + return deque.getFirst(); + } - public int size() { - return deque.size(); - } + public void moveForward() { + deque.addLast(deque.removeFirst()); + } - @Override - public Iterator<T> iterator() { - return deque.iterator(); - } + public int size() { + return deque.size(); + } + + @Override + public Iterator<T> iterator() { + return deque.iterator(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java index 6aa3504..95f97b8 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.kinesis; - import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; @@ -27,9 +26,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ShardIteratorType; + import java.io.Serializable; -import org.joda.time.Instant; +import org.joda.time.Instant; /** * Checkpoint mark for single shard in the stream. @@ -45,131 +45,132 @@ import org.joda.time.Instant; * This class is immutable. */ class ShardCheckpoint implements Serializable { - private final String streamName; - private final String shardId; - private final String sequenceNumber; - private final ShardIteratorType shardIteratorType; - private final Long subSequenceNumber; - private final Instant timestamp; - - public ShardCheckpoint(String streamName, String shardId, StartingPoint - startingPoint) { - this(streamName, shardId, - ShardIteratorType.fromValue(startingPoint.getPositionName()), - startingPoint.getTimestamp()); - } - - public ShardCheckpoint(String streamName, String shardId, ShardIteratorType - shardIteratorType, Instant timestamp) { - this(streamName, shardId, shardIteratorType, null, null, timestamp); - } - - public ShardCheckpoint(String streamName, String shardId, ShardIteratorType - shardIteratorType, String sequenceNumber, Long subSequenceNumber) { - this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null); - } - - private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, - String sequenceNumber, Long subSequenceNumber, Instant timestamp) { - this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType"); - this.streamName = checkNotNull(streamName, "streamName"); - this.shardId = checkNotNull(shardId, "shardId"); - if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) { - checkNotNull(sequenceNumber, - "You must provide sequence number for AT_SEQUENCE_NUMBER" - + " or AFTER_SEQUENCE_NUMBER"); - } else { - checkArgument(sequenceNumber == null, - "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP"); - } - if (shardIteratorType == AT_TIMESTAMP) { - checkNotNull(timestamp, - "You must provide timestamp for AT_SEQUENCE_NUMBER" - + " or AFTER_SEQUENCE_NUMBER"); - } else { - checkArgument(timestamp == null, - "Timestamp must be null for an iterator type other than AT_TIMESTAMP"); - } - - this.subSequenceNumber = subSequenceNumber; - this.sequenceNumber = sequenceNumber; - this.timestamp = timestamp; - } - - /** - * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending - * on the the underlying shardIteratorType, it will either compare the timestamp or the - * {@link ExtendedSequenceNumber}. - * - * @param other - * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber} - */ - public boolean isBeforeOrAt(KinesisRecord other) { - if (shardIteratorType == AT_TIMESTAMP) { - return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0; - } - int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber()); - if (result == 0) { - return shardIteratorType == AT_SEQUENCE_NUMBER; - } - return result < 0; - } - - private ExtendedSequenceNumber extendedSequenceNumber() { - String fullSequenceNumber = sequenceNumber; - if (fullSequenceNumber == null) { - fullSequenceNumber = shardIteratorType.toString(); - } - return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber); - } - @Override - public String toString() { - return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType, - streamName, shardId, - sequenceNumber); + private final String streamName; + private final String shardId; + private final String sequenceNumber; + private final ShardIteratorType shardIteratorType; + private final Long subSequenceNumber; + private final Instant timestamp; + + public ShardCheckpoint(String streamName, String shardId, StartingPoint + startingPoint) { + this(streamName, shardId, + ShardIteratorType.fromValue(startingPoint.getPositionName()), + startingPoint.getTimestamp()); + } + + public ShardCheckpoint(String streamName, String shardId, ShardIteratorType + shardIteratorType, Instant timestamp) { + this(streamName, shardId, shardIteratorType, null, null, timestamp); + } + + public ShardCheckpoint(String streamName, String shardId, ShardIteratorType + shardIteratorType, String sequenceNumber, Long subSequenceNumber) { + this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null); + } + + private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, + String sequenceNumber, Long subSequenceNumber, Instant timestamp) { + this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType"); + this.streamName = checkNotNull(streamName, "streamName"); + this.shardId = checkNotNull(shardId, "shardId"); + if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) { + checkNotNull(sequenceNumber, + "You must provide sequence number for AT_SEQUENCE_NUMBER" + + " or AFTER_SEQUENCE_NUMBER"); + } else { + checkArgument(sequenceNumber == null, + "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP"); } - - public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis) - throws TransientKinesisException { - return new ShardRecordsIterator(this, kinesis); + if (shardIteratorType == AT_TIMESTAMP) { + checkNotNull(timestamp, + "You must provide timestamp for AT_SEQUENCE_NUMBER" + + " or AFTER_SEQUENCE_NUMBER"); + } else { + checkArgument(timestamp == null, + "Timestamp must be null for an iterator type other than AT_TIMESTAMP"); } - public String getShardIterator(SimplifiedKinesisClient kinesisClient) - throws TransientKinesisException { - if (checkpointIsInTheMiddleOfAUserRecord()) { - return kinesisClient.getShardIterator(streamName, - shardId, AT_SEQUENCE_NUMBER, - sequenceNumber, null); - } - return kinesisClient.getShardIterator(streamName, - shardId, shardIteratorType, - sequenceNumber, timestamp); + this.subSequenceNumber = subSequenceNumber; + this.sequenceNumber = sequenceNumber; + this.timestamp = timestamp; + } + + /** + * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending + * on the the underlying shardIteratorType, it will either compare the timestamp or the + * {@link ExtendedSequenceNumber}. + * + * @param other + * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber} + */ + public boolean isBeforeOrAt(KinesisRecord other) { + if (shardIteratorType == AT_TIMESTAMP) { + return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0; } - - private boolean checkpointIsInTheMiddleOfAUserRecord() { - return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null; + int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber()); + if (result == 0) { + return shardIteratorType == AT_SEQUENCE_NUMBER; } + return result < 0; + } - /** - * Used to advance checkpoint mark to position after given {@link Record}. - * - * @param record - * @return new checkpoint object pointing directly after given {@link Record} - */ - public ShardCheckpoint moveAfter(KinesisRecord record) { - return new ShardCheckpoint( - streamName, shardId, - AFTER_SEQUENCE_NUMBER, - record.getSequenceNumber(), - record.getSubSequenceNumber()); + private ExtendedSequenceNumber extendedSequenceNumber() { + String fullSequenceNumber = sequenceNumber; + if (fullSequenceNumber == null) { + fullSequenceNumber = shardIteratorType.toString(); } - - public String getStreamName() { - return streamName; - } - - public String getShardId() { - return shardId; + return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber); + } + + @Override + public String toString() { + return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType, + streamName, shardId, + sequenceNumber); + } + + public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis) + throws TransientKinesisException { + return new ShardRecordsIterator(this, kinesis); + } + + public String getShardIterator(SimplifiedKinesisClient kinesisClient) + throws TransientKinesisException { + if (checkpointIsInTheMiddleOfAUserRecord()) { + return kinesisClient.getShardIterator(streamName, + shardId, AT_SEQUENCE_NUMBER, + sequenceNumber, null); } + return kinesisClient.getShardIterator(streamName, + shardId, shardIteratorType, + sequenceNumber, timestamp); + } + + private boolean checkpointIsInTheMiddleOfAUserRecord() { + return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null; + } + + /** + * Used to advance checkpoint mark to position after given {@link Record}. + * + * @param record + * @return new checkpoint object pointing directly after given {@link Record} + */ + public ShardCheckpoint moveAfter(KinesisRecord record) { + return new ShardCheckpoint( + streamName, shardId, + AFTER_SEQUENCE_NUMBER, + record.getSequenceNumber(), + record.getSubSequenceNumber()); + } + + public String getStreamName() { + return streamName; + } + + public String getShardId() { + return shardId; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java index 872f604..a69c6c1 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java @@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Queues.newArrayDeque; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; + import java.util.Deque; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,68 +33,68 @@ import org.slf4j.LoggerFactory; * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one. */ class ShardRecordsIterator { - private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class); - private final SimplifiedKinesisClient kinesis; - private final RecordFilter filter; - private ShardCheckpoint checkpoint; - private String shardIterator; - private Deque<KinesisRecord> data = newArrayDeque(); + private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class); - public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, - SimplifiedKinesisClient simplifiedKinesisClient) throws - TransientKinesisException { - this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter()); - } + private final SimplifiedKinesisClient kinesis; + private final RecordFilter filter; + private ShardCheckpoint checkpoint; + private String shardIterator; + private Deque<KinesisRecord> data = newArrayDeque(); - public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, - SimplifiedKinesisClient simplifiedKinesisClient, - RecordFilter filter) throws - TransientKinesisException { + public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, + SimplifiedKinesisClient simplifiedKinesisClient) throws + TransientKinesisException { + this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter()); + } - this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint"); - this.filter = checkNotNull(filter, "filter"); - this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient"); - shardIterator = checkpoint.getShardIterator(kinesis); - } + public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, + SimplifiedKinesisClient simplifiedKinesisClient, + RecordFilter filter) throws + TransientKinesisException { - /** - * Returns record if there's any present. - * Returns absent() if there are no new records at this time in the shard. - */ - public CustomOptional<KinesisRecord> next() throws TransientKinesisException { - readMoreIfNecessary(); + this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint"); + this.filter = checkNotNull(filter, "filter"); + this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient"); + shardIterator = checkpoint.getShardIterator(kinesis); + } - if (data.isEmpty()) { - return CustomOptional.absent(); - } else { - KinesisRecord record = data.removeFirst(); - checkpoint = checkpoint.moveAfter(record); - return CustomOptional.of(record); - } - } + /** + * Returns record if there's any present. + * Returns absent() if there are no new records at this time in the shard. + */ + public CustomOptional<KinesisRecord> next() throws TransientKinesisException { + readMoreIfNecessary(); - private void readMoreIfNecessary() throws TransientKinesisException { - if (data.isEmpty()) { - GetKinesisRecordsResult response; - try { - response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), - checkpoint.getShardId()); - } catch (ExpiredIteratorException e) { - LOG.info("Refreshing expired iterator", e); - shardIterator = checkpoint.getShardIterator(kinesis); - response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), - checkpoint.getShardId()); - } - LOG.debug("Fetched {} new records", response.getRecords().size()); - shardIterator = response.getNextShardIterator(); - data.addAll(filter.apply(response.getRecords(), checkpoint)); - } + if (data.isEmpty()) { + return CustomOptional.absent(); + } else { + KinesisRecord record = data.removeFirst(); + checkpoint = checkpoint.moveAfter(record); + return CustomOptional.of(record); } + } - public ShardCheckpoint getCheckpoint() { - return checkpoint; + private void readMoreIfNecessary() throws TransientKinesisException { + if (data.isEmpty()) { + GetKinesisRecordsResult response; + try { + response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), + checkpoint.getShardId()); + } catch (ExpiredIteratorException e) { + LOG.info("Refreshing expired iterator", e); + shardIterator = checkpoint.getShardIterator(kinesis); + response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), + checkpoint.getShardId()); + } + LOG.debug("Fetched {} new records", response.getRecords().size()); + shardIterator = response.getNextShardIterator(); + data.addAll(filter.apply(response.getRecords(), checkpoint)); } + } + public ShardCheckpoint getCheckpoint() { + return checkpoint; + } }
