http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 482f5a2..b69bc83 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -83,7 +83,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -365,35 +364,6 @@ public class KafkaIOTest { } @Test - public void testUnreachableKafkaBrokers() { - // Expect an exception when the Kafka brokers are not reachable on the workers. - // We specify partitions explicitly so that splitting does not involve server interaction. - // Set request timeout to 10ms so that test does not take long. - - thrown.expect(Exception.class); - thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'"); - - int numElements = 1000; - PCollection<Long> input = p - .apply(KafkaIO.<Integer, Long>read() - .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip. - .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0))) - .withKeyDeserializer(IntegerDeserializer.class) - .withValueDeserializer(LongDeserializer.class) - .updateConsumerProperties(ImmutableMap.<String, Object>of( - ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10, - ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5, - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8, - ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8)) - .withMaxNumRecords(10) - .withoutMetadata()) - .apply(Values.<Long>create()); - - addCountingAsserts(input, numElements); - p.run(); - } - - @Test public void testUnboundedSourceWithSingleTopic() { // same as testUnboundedSource, but with single topic
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml index 46d5e26..cb7064b 100644 --- a/sdks/java/io/kinesis/pom.xml +++ b/sdks/java/io/kinesis/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java index 2629c57..919d85a 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kinesis; + import java.io.Serializable; /** @@ -24,7 +25,6 @@ import java.io.Serializable; * How exactly the checkpoint is generated is up to implementing class. */ interface CheckpointGenerator extends Serializable { - - KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) - throws TransientKinesisException; + KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) + throws TransientKinesisException; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java index 5a28214..4bed0e3 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java @@ -24,79 +24,76 @@ import java.util.Objects; * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element. */ abstract class CustomOptional<T> { + @SuppressWarnings("unchecked") + public static <T> CustomOptional<T> absent() { + return (Absent<T>) Absent.INSTANCE; + } - @SuppressWarnings("unchecked") - public static <T> CustomOptional<T> absent() { - return (Absent<T>) Absent.INSTANCE; - } + public static <T> CustomOptional<T> of(T v) { + return new Present<>(v); + } - public static <T> CustomOptional<T> of(T v) { - return new Present<>(v); - } + public abstract boolean isPresent(); - public abstract boolean isPresent(); + public abstract T get(); - public abstract T get(); + private static class Present<T> extends CustomOptional<T> { + private final T value; - private static class Present<T> extends CustomOptional<T> { + private Present(T value) { + this.value = value; + } - private final T value; + @Override + public boolean isPresent() { + return true; + } - private Present(T value) { - this.value = value; - } + @Override + public T get() { + return value; + } - @Override - public boolean isPresent() { - return true; - } + @Override + public boolean equals(Object o) { + if (!(o instanceof Present)) { + return false; + } - @Override - public T get() { - return value; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Present)) { - return false; - } + Present<?> present = (Present<?>) o; + return Objects.equals(value, present.value); + } - Present<?> present = (Present<?>) o; - return Objects.equals(value, present.value); + @Override + public int hashCode() { + return Objects.hash(value); + } } - @Override - public int hashCode() { - return Objects.hash(value); - } - } + private static class Absent<T> extends CustomOptional<T> { + private static final Absent<Object> INSTANCE = new Absent<>(); - private static class Absent<T> extends CustomOptional<T> { + private Absent() { + } - private static final Absent<Object> INSTANCE = new Absent<>(); + @Override + public boolean isPresent() { + return false; + } - private Absent() { - } - - @Override - public boolean isPresent() { - return false; - } + @Override + public T get() { + throw new NoSuchElementException(); + } - @Override - public T get() { - throw new NoSuchElementException(); - } - - @Override - public boolean equals(Object o) { - return o instanceof Absent; - } + @Override + public boolean equals(Object o) { + return o instanceof Absent; + } - @Override - public int hashCode() { - return 0; + @Override + public int hashCode() { + return 0; + } } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java index 9933019..2ec293c 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java @@ -28,31 +28,29 @@ import com.google.common.base.Function; * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}. */ class DynamicCheckpointGenerator implements CheckpointGenerator { - - private final String streamName; - private final StartingPoint startingPoint; - - public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) { - this.streamName = checkNotNull(streamName, "streamName"); - this.startingPoint = checkNotNull(startingPoint, "startingPoint"); - } - - @Override - public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis) - throws TransientKinesisException { - return new KinesisReaderCheckpoint( - transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() { - - @Override - public ShardCheckpoint apply(Shard shard) { - return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint); - } - }) - ); - } - - @Override - public String toString() { - return String.format("Checkpoint generator for %s: %s", streamName, startingPoint); - } + private final String streamName; + private final StartingPoint startingPoint; + + public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) { + this.streamName = checkNotNull(streamName, "streamName"); + this.startingPoint = checkNotNull(startingPoint, "startingPoint"); + } + + @Override + public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis) + throws TransientKinesisException { + return new KinesisReaderCheckpoint( + transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() { + @Override + public ShardCheckpoint apply(Shard shard) { + return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint); + } + }) + ); + } + + @Override + public String toString() { + return String.format("Checkpoint generator for %s: %s", streamName, startingPoint); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java index f605f55..5a34d7d 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java @@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.transform; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.google.common.base.Function; - import java.util.List; import javax.annotation.Nullable; @@ -29,29 +28,27 @@ import javax.annotation.Nullable; * Represents the output of 'get' operation on Kinesis stream. */ class GetKinesisRecordsResult { - - private final List<KinesisRecord> records; - private final String nextShardIterator; - - public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator, - final String streamName, final String shardId) { - this.records = transform(records, new Function<UserRecord, KinesisRecord>() { - - @Nullable - @Override - public KinesisRecord apply(@Nullable UserRecord input) { - assert input != null; // to make FindBugs happy - return new KinesisRecord(input, streamName, shardId); - } - }); - this.nextShardIterator = nextShardIterator; - } - - public List<KinesisRecord> getRecords() { - return records; - } - - public String getNextShardIterator() { - return nextShardIterator; - } + private final List<KinesisRecord> records; + private final String nextShardIterator; + + public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator, + final String streamName, final String shardId) { + this.records = transform(records, new Function<UserRecord, KinesisRecord>() { + @Nullable + @Override + public KinesisRecord apply(@Nullable UserRecord input) { + assert input != null; // to make FindBugs happy + return new KinesisRecord(input, streamName, shardId); + } + }); + this.nextShardIterator = nextShardIterator; + } + + public List<KinesisRecord> getRecords() { + return records; + } + + public String getNextShardIterator() { + return nextShardIterator; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java index b5b721e..c7fd7f6 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.kinesis; import com.amazonaws.services.kinesis.AmazonKinesis; - import java.io.Serializable; /** @@ -28,6 +27,5 @@ import java.io.Serializable; * {@link Serializable} to ensure it can be sent to worker machines. */ interface KinesisClientProvider extends Serializable { - - AmazonKinesis get(); + AmazonKinesis get(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index bc8ada1..c97316d 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kinesis; + import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -28,9 +29,7 @@ import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.google.auto.value.AutoValue; - import javax.annotation.Nullable; - import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.transforms.PTransform; @@ -101,150 +100,144 @@ import org.joda.time.Instant; * }</pre> * */ -@Experimental(Experimental.Kind.SOURCE_SINK) +@Experimental public final class KinesisIO { - - /** Returns a new {@link Read} transform for reading from Kinesis. */ - public static Read read() { - return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build(); - } - - /** Implementation of {@link #read}. */ - @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> { - - @Nullable - abstract String getStreamName(); - - @Nullable - abstract StartingPoint getInitialPosition(); - - @Nullable - abstract KinesisClientProvider getClientProvider(); - - abstract int getMaxNumRecords(); - - @Nullable - abstract Duration getMaxReadTime(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setStreamName(String streamName); - - abstract Builder setInitialPosition(StartingPoint startingPoint); - - abstract Builder setClientProvider(KinesisClientProvider clientProvider); - - abstract Builder setMaxNumRecords(int maxNumRecords); - - abstract Builder setMaxReadTime(Duration maxReadTime); - - abstract Read build(); + /** Returns a new {@link Read} transform for reading from Kinesis. */ + public static Read read() { + return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build(); } - /** - * Specify reading from streamName at some initial position. - */ - public Read from(String streamName, InitialPositionInStream initialPosition) { - return toBuilder() - .setStreamName(streamName) - .setInitialPosition( - new StartingPoint(checkNotNull(initialPosition, "initialPosition"))) - .build(); - } - - /** - * Specify reading from streamName beginning at given {@link Instant}. - * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}. - */ - public Read from(String streamName, Instant initialTimestamp) { - return toBuilder() - .setStreamName(streamName) - .setInitialPosition( - new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp"))) - .build(); - } - - /** - * Allows to specify custom {@link KinesisClientProvider}. - * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later - * used for communication with Kinesis. - * You should use this method if {@link Read#withClientProvider(String, String, Regions)} - * does not suit your needs. - */ - public Read withClientProvider(KinesisClientProvider kinesisClientProvider) { - return toBuilder().setClientProvider(kinesisClientProvider).build(); - } - - /** - * Specify credential details and region to be used to read from Kinesis. - * If you need more sophisticated credential protocol, then you should look at - * {@link Read#withClientProvider(KinesisClientProvider)}. - */ - public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) { - return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region)); - } - - /** Specifies to read at most a given number of records. */ - public Read withMaxNumRecords(int maxNumRecords) { - checkArgument( - maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords); - return toBuilder().setMaxNumRecords(maxNumRecords).build(); - } - - /** Specifies to read at most a given number of records. */ - public Read withMaxReadTime(Duration maxReadTime) { - checkNotNull(maxReadTime, "maxReadTime"); - return toBuilder().setMaxReadTime(maxReadTime).build(); - } - - @Override - public PCollection<KinesisRecord> expand(PBegin input) { - org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read = - org.apache.beam.sdk.io.Read.from( - new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition())); - if (getMaxNumRecords() > 0) { - BoundedReadFromUnboundedSource<KinesisRecord> bounded = - read.withMaxNumRecords(getMaxNumRecords()); - return getMaxReadTime() == null - ? input.apply(bounded) - : input.apply(bounded.withMaxReadTime(getMaxReadTime())); - } else { - return getMaxReadTime() == null - ? input.apply(read) - : input.apply(read.withMaxReadTime(getMaxReadTime())); - } - } - - private static final class BasicKinesisProvider implements KinesisClientProvider { - - private final String accessKey; - private final String secretKey; - private final Regions region; - - private BasicKinesisProvider(String accessKey, String secretKey, Regions region) { - this.accessKey = checkNotNull(accessKey, "accessKey"); - this.secretKey = checkNotNull(secretKey, "secretKey"); - this.region = checkNotNull(region, "region"); - } - - private AWSCredentialsProvider getCredentialsProvider() { - return new StaticCredentialsProvider(new BasicAWSCredentials( - accessKey, - secretKey - )); - - } - - @Override - public AmazonKinesis get() { - AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider()); - client.withRegion(region); - return client; - } + /** Implementation of {@link #read}. */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> { + @Nullable + abstract String getStreamName(); + + @Nullable + abstract StartingPoint getInitialPosition(); + + @Nullable + abstract KinesisClientProvider getClientProvider(); + + abstract int getMaxNumRecords(); + + @Nullable + abstract Duration getMaxReadTime(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setStreamName(String streamName); + abstract Builder setInitialPosition(StartingPoint startingPoint); + abstract Builder setClientProvider(KinesisClientProvider clientProvider); + abstract Builder setMaxNumRecords(int maxNumRecords); + abstract Builder setMaxReadTime(Duration maxReadTime); + + abstract Read build(); + } + + /** + * Specify reading from streamName at some initial position. + */ + public Read from(String streamName, InitialPositionInStream initialPosition) { + return toBuilder() + .setStreamName(streamName) + .setInitialPosition( + new StartingPoint(checkNotNull(initialPosition, "initialPosition"))) + .build(); + } + + /** + * Specify reading from streamName beginning at given {@link Instant}. + * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}. + */ + public Read from(String streamName, Instant initialTimestamp) { + return toBuilder() + .setStreamName(streamName) + .setInitialPosition( + new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp"))) + .build(); + } + + /** + * Allows to specify custom {@link KinesisClientProvider}. + * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later + * used for communication with Kinesis. + * You should use this method if {@link Read#withClientProvider(String, String, Regions)} + * does not suit your needs. + */ + public Read withClientProvider(KinesisClientProvider kinesisClientProvider) { + return toBuilder().setClientProvider(kinesisClientProvider).build(); + } + + /** + * Specify credential details and region to be used to read from Kinesis. + * If you need more sophisticated credential protocol, then you should look at + * {@link Read#withClientProvider(KinesisClientProvider)}. + */ + public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) { + return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region)); + } + + /** Specifies to read at most a given number of records. */ + public Read withMaxNumRecords(int maxNumRecords) { + checkArgument( + maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords); + return toBuilder().setMaxNumRecords(maxNumRecords).build(); + } + + /** Specifies to read at most a given number of records. */ + public Read withMaxReadTime(Duration maxReadTime) { + checkNotNull(maxReadTime, "maxReadTime"); + return toBuilder().setMaxReadTime(maxReadTime).build(); + } + + @Override + public PCollection<KinesisRecord> expand(PBegin input) { + org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read = + org.apache.beam.sdk.io.Read.from( + new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition())); + if (getMaxNumRecords() > 0) { + BoundedReadFromUnboundedSource<KinesisRecord> bounded = + read.withMaxNumRecords(getMaxNumRecords()); + return getMaxReadTime() == null + ? input.apply(bounded) + : input.apply(bounded.withMaxReadTime(getMaxReadTime())); + } else { + return getMaxReadTime() == null + ? input.apply(read) + : input.apply(read.withMaxReadTime(getMaxReadTime())); + } + } + + private static final class BasicKinesisProvider implements KinesisClientProvider { + + private final String accessKey; + private final String secretKey; + private final Regions region; + + private BasicKinesisProvider(String accessKey, String secretKey, Regions region) { + this.accessKey = checkNotNull(accessKey, "accessKey"); + this.secretKey = checkNotNull(secretKey, "secretKey"); + this.region = checkNotNull(region, "region"); + } + + + private AWSCredentialsProvider getCredentialsProvider() { + return new StaticCredentialsProvider(new BasicAWSCredentials( + accessKey, + secretKey + )); + + } + + @Override + public AmazonKinesis get() { + AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider()); + client.withRegion(region); + return client; + } + } } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java index e5c32d2..2138094 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java @@ -17,129 +17,129 @@ */ package org.apache.beam.sdk.io.kinesis; + import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Lists.newArrayList; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; - import org.apache.beam.sdk.io.UnboundedSource; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Reads data from multiple kinesis shards in a single thread. * It uses simple round robin algorithm when fetching data from shards. */ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> { + private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class); + + private final SimplifiedKinesisClient kinesis; + private final UnboundedSource<KinesisRecord, ?> source; + private final CheckpointGenerator initialCheckpointGenerator; + private RoundRobin<ShardRecordsIterator> shardIterators; + private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent(); + + public KinesisReader(SimplifiedKinesisClient kinesis, + CheckpointGenerator initialCheckpointGenerator, + UnboundedSource<KinesisRecord, ?> source) { + this.kinesis = checkNotNull(kinesis, "kinesis"); + this.initialCheckpointGenerator = + checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator"); + this.source = source; + } + + /** + * Generates initial checkpoint and instantiates iterators for shards. + */ + @Override + public boolean start() throws IOException { + LOG.info("Starting reader using {}", initialCheckpointGenerator); + + try { + KinesisReaderCheckpoint initialCheckpoint = + initialCheckpointGenerator.generate(kinesis); + List<ShardRecordsIterator> iterators = newArrayList(); + for (ShardCheckpoint checkpoint : initialCheckpoint) { + iterators.add(checkpoint.getShardRecordsIterator(kinesis)); + } + shardIterators = new RoundRobin<>(iterators); + } catch (TransientKinesisException e) { + throw new IOException(e); + } - private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class); - - private final SimplifiedKinesisClient kinesis; - private final UnboundedSource<KinesisRecord, ?> source; - private final CheckpointGenerator initialCheckpointGenerator; - private RoundRobin<ShardRecordsIterator> shardIterators; - private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent(); - - public KinesisReader(SimplifiedKinesisClient kinesis, - CheckpointGenerator initialCheckpointGenerator, - UnboundedSource<KinesisRecord, ?> source) { - this.kinesis = checkNotNull(kinesis, "kinesis"); - this.initialCheckpointGenerator = - checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator"); - this.source = source; - } - - /** - * Generates initial checkpoint and instantiates iterators for shards. - */ - @Override - public boolean start() throws IOException { - LOG.info("Starting reader using {}", initialCheckpointGenerator); - - try { - KinesisReaderCheckpoint initialCheckpoint = - initialCheckpointGenerator.generate(kinesis); - List<ShardRecordsIterator> iterators = newArrayList(); - for (ShardCheckpoint checkpoint : initialCheckpoint) { - iterators.add(checkpoint.getShardRecordsIterator(kinesis)); - } - shardIterators = new RoundRobin<>(iterators); - } catch (TransientKinesisException e) { - throw new IOException(e); + return advance(); } - return advance(); - } - - /** - * Moves to the next record in one of the shards. - * If current shard iterator can be move forward (i.e. there's a record present) then we do it. - * If not, we iterate over shards in a round-robin manner. - */ - @Override - public boolean advance() throws IOException { - try { - for (int i = 0; i < shardIterators.size(); ++i) { - currentRecord = shardIterators.getCurrent().next(); - if (currentRecord.isPresent()) { - return true; - } else { - shardIterators.moveForward(); + /** + * Moves to the next record in one of the shards. + * If current shard iterator can be move forward (i.e. there's a record present) then we do it. + * If not, we iterate over shards in a round-robin manner. + */ + @Override + public boolean advance() throws IOException { + try { + for (int i = 0; i < shardIterators.size(); ++i) { + currentRecord = shardIterators.getCurrent().next(); + if (currentRecord.isPresent()) { + return true; + } else { + shardIterators.moveForward(); + } + } + } catch (TransientKinesisException e) { + LOG.warn("Transient exception occurred", e); } - } - } catch (TransientKinesisException e) { - LOG.warn("Transient exception occurred", e); + return false; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return currentRecord.get().getUniqueId(); + } + + @Override + public KinesisRecord getCurrent() throws NoSuchElementException { + return currentRecord.get(); + } + + /** + * When {@link KinesisReader} was advanced to the current record. + * We cannot use approximate arrival timestamp given for each record by Kinesis as it + * is not guaranteed to be accurate - this could lead to mark some records as "late" + * even if they were not. + */ + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return currentRecord.get().getReadTime(); + } + + @Override + public void close() throws IOException { + } + + /** + * Current time. + * We cannot give better approximation of the watermark with current semantics of + * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next + * {@link KinesisReader#advance()} will be called. + */ + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public UnboundedSource.CheckpointMark getCheckpointMark() { + return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators); + } + + @Override + public UnboundedSource<KinesisRecord, ?> getCurrentSource() { + return source; } - return false; - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - return currentRecord.get().getUniqueId(); - } - - @Override - public KinesisRecord getCurrent() throws NoSuchElementException { - return currentRecord.get(); - } - - /** - * When {@link KinesisReader} was advanced to the current record. - * We cannot use approximate arrival timestamp given for each record by Kinesis as it - * is not guaranteed to be accurate - this could lead to mark some records as "late" - * even if they were not. - */ - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return currentRecord.get().getReadTime(); - } - - @Override - public void close() throws IOException { - } - - /** - * Current time. - * We cannot give better approximation of the watermark with current semantics of - * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next - * {@link KinesisReader#advance()} will be called. - */ - @Override - public Instant getWatermark() { - return Instant.now(); - } - - @Override - public UnboundedSource.CheckpointMark getCheckpointMark() { - return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators); - } - - @Override - public UnboundedSource<KinesisRecord, ?> getCurrentSource() { - return source; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java index d995e75..f0fa45d 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java @@ -23,13 +23,11 @@ import static com.google.common.collect.Lists.partition; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; - import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; - import org.apache.beam.sdk.io.UnboundedSource; /** @@ -39,61 +37,60 @@ import org.apache.beam.sdk.io.UnboundedSource; * This class is immutable. */ class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource - .CheckpointMark, Serializable { - - private final List<ShardCheckpoint> shardCheckpoints; + .CheckpointMark, Serializable { + private final List<ShardCheckpoint> shardCheckpoints; - public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) { - this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints); - } - - public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator> - iterators) { - return new KinesisReaderCheckpoint(transform(iterators, - new Function<ShardRecordsIterator, ShardCheckpoint>() { - - @Nullable - @Override - public ShardCheckpoint apply(@Nullable - ShardRecordsIterator shardRecordsIterator) { - assert shardRecordsIterator != null; - return shardRecordsIterator.getCheckpoint(); - } - })); - } + public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) { + this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints); + } - /** - * Splits given multi-shard checkpoint into partitions of approximately equal size. - * - * @param desiredNumSplits - upper limit for number of partitions to generate. - * @return list of checkpoints covering consecutive partitions of current checkpoint. - */ - public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) { - int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits); + public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator> + iterators) { + return new KinesisReaderCheckpoint(transform(iterators, + new Function<ShardRecordsIterator, ShardCheckpoint>() { + + @Nullable + @Override + public ShardCheckpoint apply(@Nullable + ShardRecordsIterator shardRecordsIterator) { + assert shardRecordsIterator != null; + return shardRecordsIterator.getCheckpoint(); + } + })); + } - List<KinesisReaderCheckpoint> checkpoints = newArrayList(); - for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) { - checkpoints.add(new KinesisReaderCheckpoint(shardPartition)); + /** + * Splits given multi-shard checkpoint into partitions of approximately equal size. + * + * @param desiredNumSplits - upper limit for number of partitions to generate. + * @return list of checkpoints covering consecutive partitions of current checkpoint. + */ + public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) { + int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits); + + List<KinesisReaderCheckpoint> checkpoints = newArrayList(); + for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) { + checkpoints.add(new KinesisReaderCheckpoint(shardPartition)); + } + return checkpoints; } - return checkpoints; - } - private int divideAndRoundUp(int nominator, int denominator) { - return (nominator + denominator - 1) / denominator; - } + private int divideAndRoundUp(int nominator, int denominator) { + return (nominator + denominator - 1) / denominator; + } - @Override - public void finalizeCheckpoint() throws IOException { + @Override + public void finalizeCheckpoint() throws IOException { - } + } - @Override - public String toString() { - return shardCheckpoints.toString(); - } + @Override + public String toString() { + return shardCheckpoints.toString(); + } - @Override - public Iterator<ShardCheckpoint> iterator() { - return shardCheckpoints.iterator(); - } + @Override + public Iterator<ShardCheckpoint> iterator() { + return shardCheckpoints.iterator(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java index 057b7bb..02b5370 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java @@ -22,9 +22,7 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.google.common.base.Charsets; - import java.nio.ByteBuffer; - import org.apache.commons.lang.builder.EqualsBuilder; import org.joda.time.Instant; @@ -32,92 +30,91 @@ import org.joda.time.Instant; * {@link UserRecord} enhanced with utility methods. */ public class KinesisRecord { - - private Instant readTime; - private String streamName; - private String shardId; - private long subSequenceNumber; - private String sequenceNumber; - private Instant approximateArrivalTimestamp; - private ByteBuffer data; - private String partitionKey; - - public KinesisRecord(UserRecord record, String streamName, String shardId) { - this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(), - record.getPartitionKey(), - new Instant(record.getApproximateArrivalTimestamp()), - Instant.now(), - streamName, shardId); - } - - public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber, - String partitionKey, Instant approximateArrivalTimestamp, - Instant readTime, - String streamName, String shardId) { - this.data = data; - this.sequenceNumber = sequenceNumber; - this.subSequenceNumber = subSequenceNumber; - this.partitionKey = partitionKey; - this.approximateArrivalTimestamp = approximateArrivalTimestamp; - this.readTime = readTime; - this.streamName = streamName; - this.shardId = shardId; - } - - public ExtendedSequenceNumber getExtendedSequenceNumber() { - return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber()); - } - - /*** - * @return unique id of the record based on its position in the stream - */ - public byte[] getUniqueId() { - return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8); - } - - public Instant getReadTime() { - return readTime; - } - - public String getStreamName() { - return streamName; - } - - public String getShardId() { - return shardId; - } - - public byte[] getDataAsBytes() { - return getData().array(); - } - - @Override - public boolean equals(Object obj) { - return EqualsBuilder.reflectionEquals(this, obj); - } - - @Override - public int hashCode() { - return reflectionHashCode(this); - } - - public long getSubSequenceNumber() { - return subSequenceNumber; - } - - public String getSequenceNumber() { - return sequenceNumber; - } - - public Instant getApproximateArrivalTimestamp() { - return approximateArrivalTimestamp; - } - - public ByteBuffer getData() { - return data; - } - - public String getPartitionKey() { - return partitionKey; - } + private Instant readTime; + private String streamName; + private String shardId; + private long subSequenceNumber; + private String sequenceNumber; + private Instant approximateArrivalTimestamp; + private ByteBuffer data; + private String partitionKey; + + public KinesisRecord(UserRecord record, String streamName, String shardId) { + this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(), + record.getPartitionKey(), + new Instant(record.getApproximateArrivalTimestamp()), + Instant.now(), + streamName, shardId); + } + + public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber, + String partitionKey, Instant approximateArrivalTimestamp, + Instant readTime, + String streamName, String shardId) { + this.data = data; + this.sequenceNumber = sequenceNumber; + this.subSequenceNumber = subSequenceNumber; + this.partitionKey = partitionKey; + this.approximateArrivalTimestamp = approximateArrivalTimestamp; + this.readTime = readTime; + this.streamName = streamName; + this.shardId = shardId; + } + + public ExtendedSequenceNumber getExtendedSequenceNumber() { + return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber()); + } + + /*** + * @return unique id of the record based on its position in the stream + */ + public byte[] getUniqueId() { + return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8); + } + + public Instant getReadTime() { + return readTime; + } + + public String getStreamName() { + return streamName; + } + + public String getShardId() { + return shardId; + } + + public byte[] getDataAsBytes() { + return getData().array(); + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + @Override + public int hashCode() { + return reflectionHashCode(this); + } + + public long getSubSequenceNumber() { + return subSequenceNumber; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public Instant getApproximateArrivalTimestamp() { + return approximateArrivalTimestamp; + } + + public ByteBuffer getData() { + return data; + } + + public String getPartitionKey() { + return partitionKey; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java index dcf564d..f233e27 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; - import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -34,41 +33,40 @@ import org.joda.time.Instant; * A {@link Coder} for {@link KinesisRecord}. */ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> { + private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of(); + private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of(); + private static final InstantCoder INSTANT_CODER = InstantCoder.of(); + private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of(); - private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of(); - private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of(); - private static final InstantCoder INSTANT_CODER = InstantCoder.of(); - private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of(); - - public static KinesisRecordCoder of() { - return new KinesisRecordCoder(); - } + public static KinesisRecordCoder of() { + return new KinesisRecordCoder(); + } - @Override - public void encode(KinesisRecord value, OutputStream outStream) throws - IOException { - BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); - STRING_CODER.encode(value.getSequenceNumber(), outStream); - STRING_CODER.encode(value.getPartitionKey(), outStream); - INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream); - VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream); - INSTANT_CODER.encode(value.getReadTime(), outStream); - STRING_CODER.encode(value.getStreamName(), outStream); - STRING_CODER.encode(value.getShardId(), outStream); - } + @Override + public void encode(KinesisRecord value, OutputStream outStream) throws + IOException { + BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); + STRING_CODER.encode(value.getSequenceNumber(), outStream); + STRING_CODER.encode(value.getPartitionKey(), outStream); + INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream); + VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream); + INSTANT_CODER.encode(value.getReadTime(), outStream); + STRING_CODER.encode(value.getStreamName(), outStream); + STRING_CODER.encode(value.getShardId(), outStream); + } - @Override - public KinesisRecord decode(InputStream inStream) throws IOException { - ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream)); - String sequenceNumber = STRING_CODER.decode(inStream); - String partitionKey = STRING_CODER.decode(inStream); - Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream); - long subSequenceNumber = VAR_LONG_CODER.decode(inStream); - Instant readTimestamp = INSTANT_CODER.decode(inStream); - String streamName = STRING_CODER.decode(inStream); - String shardId = STRING_CODER.decode(inStream); - return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey, - approximateArrivalTimestamp, readTimestamp, streamName, shardId - ); - } + @Override + public KinesisRecord decode(InputStream inStream) throws IOException { + ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream)); + String sequenceNumber = STRING_CODER.decode(inStream); + String partitionKey = STRING_CODER.decode(inStream); + Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream); + long subSequenceNumber = VAR_LONG_CODER.decode(inStream); + Instant readTimestamp = INSTANT_CODER.decode(inStream); + String streamName = STRING_CODER.decode(inStream); + String shardId = STRING_CODER.decode(inStream); + return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey, + approximateArrivalTimestamp, readTimestamp, streamName, shardId + ); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java index 362792b..7e67d07 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Lists.newArrayList; import java.util.List; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; @@ -29,85 +28,85 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Represents source for single stream in Kinesis. */ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> { + private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); + + private final KinesisClientProvider kinesis; + private CheckpointGenerator initialCheckpointGenerator; - private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); - - private final KinesisClientProvider kinesis; - private CheckpointGenerator initialCheckpointGenerator; - - public KinesisSource(KinesisClientProvider kinesis, String streamName, - StartingPoint startingPoint) { - this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint)); - } - - private KinesisSource(KinesisClientProvider kinesisClientProvider, - CheckpointGenerator initialCheckpoint) { - this.kinesis = kinesisClientProvider; - this.initialCheckpointGenerator = initialCheckpoint; - validate(); - } - - /** - * Generate splits for reading from the stream. - * Basically, it'll try to evenly split set of shards in the stream into - * {@code desiredNumSplits} partitions. Each partition is then a split. - */ - @Override - public List<KinesisSource> split(int desiredNumSplits, - PipelineOptions options) throws Exception { - KinesisReaderCheckpoint checkpoint = - initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis)); - - List<KinesisSource> sources = newArrayList(); - - for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) { - sources.add(new KinesisSource( - kinesis, - new StaticCheckpointGenerator(partition))); + public KinesisSource(KinesisClientProvider kinesis, String streamName, + StartingPoint startingPoint) { + this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint)); } - return sources; - } - - /** - * Creates reader based on given {@link KinesisReaderCheckpoint}. - * If {@link KinesisReaderCheckpoint} is not given, then we use - * {@code initialCheckpointGenerator} to generate new checkpoint. - */ - @Override - public UnboundedReader<KinesisRecord> createReader(PipelineOptions options, - KinesisReaderCheckpoint checkpointMark) { - - CheckpointGenerator checkpointGenerator = initialCheckpointGenerator; - - if (checkpointMark != null) { - checkpointGenerator = new StaticCheckpointGenerator(checkpointMark); + + private KinesisSource(KinesisClientProvider kinesisClientProvider, + CheckpointGenerator initialCheckpoint) { + this.kinesis = kinesisClientProvider; + this.initialCheckpointGenerator = initialCheckpoint; + validate(); } - LOG.info("Creating new reader using {}", checkpointGenerator); - - return new KinesisReader( - SimplifiedKinesisClient.from(kinesis), - checkpointGenerator, - this); - } - - @Override - public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() { - return SerializableCoder.of(KinesisReaderCheckpoint.class); - } - - @Override - public void validate() { - checkNotNull(kinesis); - checkNotNull(initialCheckpointGenerator); - } - - @Override - public Coder<KinesisRecord> getDefaultOutputCoder() { - return KinesisRecordCoder.of(); - } + /** + * Generate splits for reading from the stream. + * Basically, it'll try to evenly split set of shards in the stream into + * {@code desiredNumSplits} partitions. Each partition is then a split. + */ + @Override + public List<KinesisSource> split(int desiredNumSplits, + PipelineOptions options) throws Exception { + KinesisReaderCheckpoint checkpoint = + initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis)); + + List<KinesisSource> sources = newArrayList(); + + for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) { + sources.add(new KinesisSource( + kinesis, + new StaticCheckpointGenerator(partition))); + } + return sources; + } + + /** + * Creates reader based on given {@link KinesisReaderCheckpoint}. + * If {@link KinesisReaderCheckpoint} is not given, then we use + * {@code initialCheckpointGenerator} to generate new checkpoint. + */ + @Override + public UnboundedReader<KinesisRecord> createReader(PipelineOptions options, + KinesisReaderCheckpoint checkpointMark) { + + CheckpointGenerator checkpointGenerator = initialCheckpointGenerator; + + if (checkpointMark != null) { + checkpointGenerator = new StaticCheckpointGenerator(checkpointMark); + } + + LOG.info("Creating new reader using {}", checkpointGenerator); + + return new KinesisReader( + SimplifiedKinesisClient.from(kinesis), + checkpointGenerator, + this); + } + + @Override + public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() { + return SerializableCoder.of(KinesisReaderCheckpoint.class); + } + + @Override + public void validate() { + checkNotNull(kinesis); + checkNotNull(initialCheckpointGenerator); + } + + @Override + public Coder<KinesisRecord> getDefaultOutputCoder() { + return KinesisRecordCoder.of(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java index eca725c..40e65fc 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java @@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.newArrayList; import java.util.List; + /** * Filters out records, which were already processed and checkpointed. * @@ -28,14 +29,13 @@ import java.util.List; * accuracy, not with "subSequenceNumber" accuracy. */ class RecordFilter { - - public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) { - List<KinesisRecord> filteredRecords = newArrayList(); - for (KinesisRecord record : records) { - if (checkpoint.isBeforeOrAt(record)) { - filteredRecords.add(record); - } + public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) { + List<KinesisRecord> filteredRecords = newArrayList(); + for (KinesisRecord record : records) { + if (checkpoint.isBeforeOrAt(record)) { + filteredRecords.add(record); + } + } + return filteredRecords; } - return filteredRecords; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java index 806d982..e4ff541 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java @@ -27,28 +27,27 @@ import java.util.Iterator; * Very simple implementation of round robin algorithm. */ class RoundRobin<T> implements Iterable<T> { + private final Deque<T> deque; - private final Deque<T> deque; + public RoundRobin(Iterable<T> collection) { + this.deque = newArrayDeque(collection); + checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection"); + } - public RoundRobin(Iterable<T> collection) { - this.deque = newArrayDeque(collection); - checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection"); - } + public T getCurrent() { + return deque.getFirst(); + } - public T getCurrent() { - return deque.getFirst(); - } + public void moveForward() { + deque.addLast(deque.removeFirst()); + } - public void moveForward() { - deque.addLast(deque.removeFirst()); - } + public int size() { + return deque.size(); + } - public int size() { - return deque.size(); - } - - @Override - public Iterator<T> iterator() { - return deque.iterator(); - } + @Override + public Iterator<T> iterator() { + return deque.iterator(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java index 95f97b8..6aa3504 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kinesis; + import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; @@ -26,11 +27,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ShardIteratorType; - import java.io.Serializable; - import org.joda.time.Instant; + /** * Checkpoint mark for single shard in the stream. * Current position in the shard is determined by either: @@ -45,132 +45,131 @@ import org.joda.time.Instant; * This class is immutable. */ class ShardCheckpoint implements Serializable { + private final String streamName; + private final String shardId; + private final String sequenceNumber; + private final ShardIteratorType shardIteratorType; + private final Long subSequenceNumber; + private final Instant timestamp; + + public ShardCheckpoint(String streamName, String shardId, StartingPoint + startingPoint) { + this(streamName, shardId, + ShardIteratorType.fromValue(startingPoint.getPositionName()), + startingPoint.getTimestamp()); + } + + public ShardCheckpoint(String streamName, String shardId, ShardIteratorType + shardIteratorType, Instant timestamp) { + this(streamName, shardId, shardIteratorType, null, null, timestamp); + } + + public ShardCheckpoint(String streamName, String shardId, ShardIteratorType + shardIteratorType, String sequenceNumber, Long subSequenceNumber) { + this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null); + } + + private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, + String sequenceNumber, Long subSequenceNumber, Instant timestamp) { + this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType"); + this.streamName = checkNotNull(streamName, "streamName"); + this.shardId = checkNotNull(shardId, "shardId"); + if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) { + checkNotNull(sequenceNumber, + "You must provide sequence number for AT_SEQUENCE_NUMBER" + + " or AFTER_SEQUENCE_NUMBER"); + } else { + checkArgument(sequenceNumber == null, + "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP"); + } + if (shardIteratorType == AT_TIMESTAMP) { + checkNotNull(timestamp, + "You must provide timestamp for AT_SEQUENCE_NUMBER" + + " or AFTER_SEQUENCE_NUMBER"); + } else { + checkArgument(timestamp == null, + "Timestamp must be null for an iterator type other than AT_TIMESTAMP"); + } + + this.subSequenceNumber = subSequenceNumber; + this.sequenceNumber = sequenceNumber; + this.timestamp = timestamp; + } + + /** + * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending + * on the the underlying shardIteratorType, it will either compare the timestamp or the + * {@link ExtendedSequenceNumber}. + * + * @param other + * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber} + */ + public boolean isBeforeOrAt(KinesisRecord other) { + if (shardIteratorType == AT_TIMESTAMP) { + return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0; + } + int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber()); + if (result == 0) { + return shardIteratorType == AT_SEQUENCE_NUMBER; + } + return result < 0; + } + + private ExtendedSequenceNumber extendedSequenceNumber() { + String fullSequenceNumber = sequenceNumber; + if (fullSequenceNumber == null) { + fullSequenceNumber = shardIteratorType.toString(); + } + return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber); + } - private final String streamName; - private final String shardId; - private final String sequenceNumber; - private final ShardIteratorType shardIteratorType; - private final Long subSequenceNumber; - private final Instant timestamp; - - public ShardCheckpoint(String streamName, String shardId, StartingPoint - startingPoint) { - this(streamName, shardId, - ShardIteratorType.fromValue(startingPoint.getPositionName()), - startingPoint.getTimestamp()); - } - - public ShardCheckpoint(String streamName, String shardId, ShardIteratorType - shardIteratorType, Instant timestamp) { - this(streamName, shardId, shardIteratorType, null, null, timestamp); - } - - public ShardCheckpoint(String streamName, String shardId, ShardIteratorType - shardIteratorType, String sequenceNumber, Long subSequenceNumber) { - this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null); - } - - private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, - String sequenceNumber, Long subSequenceNumber, Instant timestamp) { - this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType"); - this.streamName = checkNotNull(streamName, "streamName"); - this.shardId = checkNotNull(shardId, "shardId"); - if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) { - checkNotNull(sequenceNumber, - "You must provide sequence number for AT_SEQUENCE_NUMBER" - + " or AFTER_SEQUENCE_NUMBER"); - } else { - checkArgument(sequenceNumber == null, - "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP"); + @Override + public String toString() { + return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType, + streamName, shardId, + sequenceNumber); } - if (shardIteratorType == AT_TIMESTAMP) { - checkNotNull(timestamp, - "You must provide timestamp for AT_SEQUENCE_NUMBER" - + " or AFTER_SEQUENCE_NUMBER"); - } else { - checkArgument(timestamp == null, - "Timestamp must be null for an iterator type other than AT_TIMESTAMP"); + + public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis) + throws TransientKinesisException { + return new ShardRecordsIterator(this, kinesis); } - this.subSequenceNumber = subSequenceNumber; - this.sequenceNumber = sequenceNumber; - this.timestamp = timestamp; - } - - /** - * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending - * on the the underlying shardIteratorType, it will either compare the timestamp or the - * {@link ExtendedSequenceNumber}. - * - * @param other - * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber} - */ - public boolean isBeforeOrAt(KinesisRecord other) { - if (shardIteratorType == AT_TIMESTAMP) { - return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0; + public String getShardIterator(SimplifiedKinesisClient kinesisClient) + throws TransientKinesisException { + if (checkpointIsInTheMiddleOfAUserRecord()) { + return kinesisClient.getShardIterator(streamName, + shardId, AT_SEQUENCE_NUMBER, + sequenceNumber, null); + } + return kinesisClient.getShardIterator(streamName, + shardId, shardIteratorType, + sequenceNumber, timestamp); } - int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber()); - if (result == 0) { - return shardIteratorType == AT_SEQUENCE_NUMBER; + + private boolean checkpointIsInTheMiddleOfAUserRecord() { + return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null; } - return result < 0; - } - private ExtendedSequenceNumber extendedSequenceNumber() { - String fullSequenceNumber = sequenceNumber; - if (fullSequenceNumber == null) { - fullSequenceNumber = shardIteratorType.toString(); + /** + * Used to advance checkpoint mark to position after given {@link Record}. + * + * @param record + * @return new checkpoint object pointing directly after given {@link Record} + */ + public ShardCheckpoint moveAfter(KinesisRecord record) { + return new ShardCheckpoint( + streamName, shardId, + AFTER_SEQUENCE_NUMBER, + record.getSequenceNumber(), + record.getSubSequenceNumber()); } - return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber); - } - - @Override - public String toString() { - return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType, - streamName, shardId, - sequenceNumber); - } - - public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis) - throws TransientKinesisException { - return new ShardRecordsIterator(this, kinesis); - } - - public String getShardIterator(SimplifiedKinesisClient kinesisClient) - throws TransientKinesisException { - if (checkpointIsInTheMiddleOfAUserRecord()) { - return kinesisClient.getShardIterator(streamName, - shardId, AT_SEQUENCE_NUMBER, - sequenceNumber, null); + + public String getStreamName() { + return streamName; + } + + public String getShardId() { + return shardId; } - return kinesisClient.getShardIterator(streamName, - shardId, shardIteratorType, - sequenceNumber, timestamp); - } - - private boolean checkpointIsInTheMiddleOfAUserRecord() { - return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null; - } - - /** - * Used to advance checkpoint mark to position after given {@link Record}. - * - * @param record - * @return new checkpoint object pointing directly after given {@link Record} - */ - public ShardCheckpoint moveAfter(KinesisRecord record) { - return new ShardCheckpoint( - streamName, shardId, - AFTER_SEQUENCE_NUMBER, - record.getSequenceNumber(), - record.getSubSequenceNumber()); - } - - public String getStreamName() { - return streamName; - } - - public String getShardId() { - return shardId; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java index a69c6c1..872f604 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java @@ -21,9 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Queues.newArrayDeque; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; - import java.util.Deque; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,68 +31,68 @@ import org.slf4j.LoggerFactory; * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one. */ class ShardRecordsIterator { + private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class); - private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class); + private final SimplifiedKinesisClient kinesis; + private final RecordFilter filter; + private ShardCheckpoint checkpoint; + private String shardIterator; + private Deque<KinesisRecord> data = newArrayDeque(); - private final SimplifiedKinesisClient kinesis; - private final RecordFilter filter; - private ShardCheckpoint checkpoint; - private String shardIterator; - private Deque<KinesisRecord> data = newArrayDeque(); + public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, + SimplifiedKinesisClient simplifiedKinesisClient) throws + TransientKinesisException { + this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter()); + } - public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, - SimplifiedKinesisClient simplifiedKinesisClient) throws - TransientKinesisException { - this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter()); - } + public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, + SimplifiedKinesisClient simplifiedKinesisClient, + RecordFilter filter) throws + TransientKinesisException { - public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint, - SimplifiedKinesisClient simplifiedKinesisClient, - RecordFilter filter) throws - TransientKinesisException { + this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint"); + this.filter = checkNotNull(filter, "filter"); + this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient"); + shardIterator = checkpoint.getShardIterator(kinesis); + } - this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint"); - this.filter = checkNotNull(filter, "filter"); - this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient"); - shardIterator = checkpoint.getShardIterator(kinesis); - } + /** + * Returns record if there's any present. + * Returns absent() if there are no new records at this time in the shard. + */ + public CustomOptional<KinesisRecord> next() throws TransientKinesisException { + readMoreIfNecessary(); - /** - * Returns record if there's any present. - * Returns absent() if there are no new records at this time in the shard. - */ - public CustomOptional<KinesisRecord> next() throws TransientKinesisException { - readMoreIfNecessary(); + if (data.isEmpty()) { + return CustomOptional.absent(); + } else { + KinesisRecord record = data.removeFirst(); + checkpoint = checkpoint.moveAfter(record); + return CustomOptional.of(record); + } + } - if (data.isEmpty()) { - return CustomOptional.absent(); - } else { - KinesisRecord record = data.removeFirst(); - checkpoint = checkpoint.moveAfter(record); - return CustomOptional.of(record); + private void readMoreIfNecessary() throws TransientKinesisException { + if (data.isEmpty()) { + GetKinesisRecordsResult response; + try { + response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), + checkpoint.getShardId()); + } catch (ExpiredIteratorException e) { + LOG.info("Refreshing expired iterator", e); + shardIterator = checkpoint.getShardIterator(kinesis); + response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), + checkpoint.getShardId()); + } + LOG.debug("Fetched {} new records", response.getRecords().size()); + shardIterator = response.getNextShardIterator(); + data.addAll(filter.apply(response.getRecords(), checkpoint)); + } } - } - private void readMoreIfNecessary() throws TransientKinesisException { - if (data.isEmpty()) { - GetKinesisRecordsResult response; - try { - response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), - checkpoint.getShardId()); - } catch (ExpiredIteratorException e) { - LOG.info("Refreshing expired iterator", e); - shardIterator = checkpoint.getShardIterator(kinesis); - response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(), - checkpoint.getShardId()); - } - LOG.debug("Fetched {} new records", response.getRecords().size()); - shardIterator = response.getNextShardIterator(); - data.addAll(filter.apply(response.getRecords(), checkpoint)); + public ShardCheckpoint getCheckpoint() { + return checkpoint; } - } - public ShardCheckpoint getCheckpoint() { - return checkpoint; - } }
