GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2228#discussion_r70437664
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -17,89 +17,89 @@
     
     package org.apache.flink.streaming.connectors.kinesis.config;
     
    +import com.amazonaws.services.kinesis.model.ShardIteratorType;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +
     /**
    - * Keys and default values used to configure the Kinesis consumer.
    + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
      */
    -public class KinesisConfigConstants {
    +public class ConsumerConfigConstants extends AWSConfigConstants {
     
    -   // ------------------------------------------------------------------------
    -   //  Configuration Keys
    -   // ------------------------------------------------------------------------
    +   /**
    +    * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
    +    * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
    +    */
    +   public enum InitialPosition {
    +
    +           /** Start reading from the earliest possible record in the stream (excluding expired data records) */
    +           TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
    +
    +           /** Start reading from the latest incoming record */
    +           LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
    +
    +           private SentinelSequenceNumber sentinelSequenceNumber;
    +
    +           InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
    +                   this.sentinelSequenceNumber = sentinelSequenceNumber;
    +           }
    +
    +           public SentinelSequenceNumber toSentinelSequenceNumber() {
    +                   return this.sentinelSequenceNumber;
    +           }
    +   }
    +
    +   /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
    +   public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    --- End diff --
    
    I think it is better to use "position".
    For Kinesis users, "position" is the more familiar term for where to start reading a stream. Amazon's Kinesis Client Library API uses "position" too, as does the Kinesis AWS service Web UI.
    "Sequence number" refers more specifically to a record's offset in a shard, and since we aren't offering to start reading streams from a specific offset, I don't think the name change is suitable.
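
    To illustrate what users would actually see, here is a minimal sketch of setting and resolving the key. The class name `InitialPositionExample` and the `resolve` helper are hypothetical, and the enum is a simplified stand-in for the one in this diff (it omits `SentinelSequenceNumber`), so the connector's real parsing code may well look different.

```java
import java.util.Properties;

public class InitialPositionExample {

    // Simplified stand-in for the InitialPosition enum in the diff (assumption: no sentinel mapping here).
    enum InitialPosition { TRIM_HORIZON, LATEST }

    // Same key string as STREAM_INITIAL_POSITION in the diff.
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // Hypothetical helper: reads the configured position, falling back to LATEST when the key is not set.
    static InitialPosition resolve(Properties config) {
        String value = config.getProperty(STREAM_INITIAL_POSITION, InitialPosition.LATEST.toString());
        return InitialPosition.valueOf(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(resolve(config)); // prints LATEST (key not set)

        config.setProperty(STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        System.out.println(resolve(config)); // prints TRIM_HORIZON
    }
}
```

    The user only ever chooses a named position here and never supplies a sequence number, which is why "position" seems the better fit for the key name.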

