Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2228#discussion_r70413349
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
    @@ -17,89 +17,89 @@
     
     package org.apache.flink.streaming.connectors.kinesis.config;
     
    +import com.amazonaws.services.kinesis.model.ShardIteratorType;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +
     /**
    - * Keys and default values used to configure the Kinesis consumer.
    + * Optional consumer specific configuration keys and default values for 
{@link FlinkKinesisConsumer}
      */
    -public class KinesisConfigConstants {
    +public class ConsumerConfigConstants extends AWSConfigConstants {
     
    -   // 
------------------------------------------------------------------------
    -   //  Configuration Keys
    -   // 
------------------------------------------------------------------------
    +   /**
    +    * The initial position to start reading shards from. This will affect 
the {@link ShardIteratorType} used
    +    * when the consumer tasks retrieve the first shard iterator for each 
Kinesis shard.
    +    */
    +   public enum InitialPosition {
    +
    +           /** Start reading from the earliest possible record in the 
stream (excluding expired data records) */
    +           
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
    +
    +           /** Start reading from the latest incoming record */
    +           LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
    +
    +           private SentinelSequenceNumber sentinelSequenceNumber;
    +
    +           InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
    +                   this.sentinelSequenceNumber = sentinelSequenceNumber;
    +           }
    +
    +           public SentinelSequenceNumber toSentinelSequenceNumber() {
    +                   return this.sentinelSequenceNumber;
    +           }
    +   }
    +
    +   /** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
    +   public static final String STREAM_INITIAL_POSITION = 
"flink.stream.initpos";
    --- End diff --
    
    Should we rename "position" to sequence number as this is the term used in 
the Kinesis docs? https://aws.amazon.com/kinesis/streams/faqs/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to