Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70437664 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // ------------------------------------------------------------------------ - // Configuration Keys - // ------------------------------------------------------------------------ + /** + * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used + * when the consumer tasks retrieve the first shard iterator for each Kinesis shard. + */ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- I think it is better to use "position". For Kinesis users, the term "position" is more familiar to be used to refer to where to start reading a stream. Amazon's Kinesis Client Library API uses "position" too, as well as the Kinesis AWS service Web UI. "sequence numbers" refers more specifically to a record's offset in a shard, and since we aren't offering to start reading streams from a specific offset, I think the name change isn't that suitable.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---