GitHub user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2228#discussion_r70413349
--- Diff:
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
---
@@ -17,89 +17,89 @@
package org.apache.flink.streaming.connectors.kinesis.config;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+
/**
- * Keys and default values used to configure the Kinesis consumer.
+ * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
*/
-public class KinesisConfigConstants {
+public class ConsumerConfigConstants extends AWSConfigConstants {
- // ------------------------------------------------------------------------
- // Configuration Keys
- // ------------------------------------------------------------------------
+ /**
+ * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
+ * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
+ */
+ public enum InitialPosition {
+
+ /** Start reading from the earliest possible record in the stream (excluding expired data records) */
+ TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
+
+ /** Start reading from the latest incoming record */
+ LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+
+ private SentinelSequenceNumber sentinelSequenceNumber;
+
+ InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
+ this.sentinelSequenceNumber = sentinelSequenceNumber;
+ }
+
+ public SentinelSequenceNumber toSentinelSequenceNumber() {
+ return this.sentinelSequenceNumber;
+ }
+ }
+
+ /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+ public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
--- End diff --
Should we rename "position" to "sequence number", since that is the term used in
the Kinesis docs? https://aws.amazon.com/kinesis/streams/faqs/
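
To make it easier to see what a rename would touch, here is a minimal, standalone sketch of how the key and enum from this diff might be consumed. The nested `SentinelSequenceNumber` enum is a simplified stand-in for the connector class, and `resolveInitialPosition` is a hypothetical helper, not something this PR contains. Only the `LATEST` default comes from the Javadoc above.

```java
import java.util.Properties;

public class InitialPositionSketch {

    /** Simplified stand-in (assumption) for the connector's SentinelSequenceNumber. */
    public enum SentinelSequenceNumber {
        SENTINEL_EARLIEST_SEQUENCE_NUM,
        SENTINEL_LATEST_SEQUENCE_NUM
    }

    /** Mirrors the enum introduced in the diff. */
    public enum InitialPosition {
        TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
        LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);

        private final SentinelSequenceNumber sentinelSequenceNumber;

        InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
            this.sentinelSequenceNumber = sentinelSequenceNumber;
        }

        public SentinelSequenceNumber toSentinelSequenceNumber() {
            return this.sentinelSequenceNumber;
        }
    }

    /** Same key as in the diff. */
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    /**
     * Hypothetical helper: reads the configured initial position and falls
     * back to LATEST when the key is not set, as the Javadoc in the diff states.
     */
    public static InitialPosition resolveInitialPosition(Properties config) {
        String value = config.getProperty(
            STREAM_INITIAL_POSITION, InitialPosition.LATEST.name());
        // valueOf throws IllegalArgumentException for unknown names.
        return InitialPosition.valueOf(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        System.out.println(resolveInitialPosition(config).toSentinelSequenceNumber());
    }
}
```

In this sketch the user-facing value names a position (`TRIM_HORIZON`, `LATEST`) and only the enum's mapping refers to a sequence number, so a rename would mostly affect the key name and the enum name rather than the accepted values.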