[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372875#comment-15372875 ]
ASF GitHub Bot commented on FLINK-4170:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2228#discussion_r70437664
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -17,89 +17,89 @@
package org.apache.flink.streaming.connectors.kinesis.config;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+
/**
- * Keys and default values used to configure the Kinesis consumer.
+ * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
*/
-public class KinesisConfigConstants {
+public class ConsumerConfigConstants extends AWSConfigConstants {
- // ------------------------------------------------------------------------
- // Configuration Keys
- // ------------------------------------------------------------------------
+ /**
+ * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
+ * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
+ */
+ public enum InitialPosition {
+
+ /** Start reading from the earliest possible record in the stream (excluding expired data records) */
+ TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
+
+ /** Start reading from the latest incoming record */
+ LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+
+ private SentinelSequenceNumber sentinelSequenceNumber;
+
+ InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
+ this.sentinelSequenceNumber = sentinelSequenceNumber;
+ }
+
+ public SentinelSequenceNumber toSentinelSequenceNumber() {
+ return this.sentinelSequenceNumber;
+ }
+ }
+
+ /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+ public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
--- End diff --
I think it is better to use "position".
Kinesis users are more familiar with "position" as the term for where
reading of a stream starts. Amazon's Kinesis Client Library API uses
"position" too, as does the web UI of the AWS Kinesis service.
"Sequence number" refers more specifically to a record's offset in a
shard, and since we aren't offering to start reading streams from a specific
offset, I don't think the name change is suitable.
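
For context, here is a minimal sketch of how the key under discussion might be
read from a consumer configuration. The key string "flink.stream.initpos", the
two enum values, and the LATEST default come from the diff above. Everything
else is an assumption made for illustration: the class name
InitialPositionExample and the helper resolveInitialPosition are hypothetical
and not part of the pull request, and the enum is a simplified stand-in that
omits the SentinelSequenceNumber mapping.

```java
import java.util.Properties;

public class InitialPositionExample {

    // Simplified stand-in for the InitialPosition enum in the diff
    // (hypothetical: the real enum also carries a SentinelSequenceNumber).
    public enum InitialPosition { TRIM_HORIZON, LATEST }

    // Key string as it appears in the diff.
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // Hypothetical helper: resolves the configured position and falls back
    // to LATEST when the key is absent, as the Javadoc in the diff states.
    public static InitialPosition resolveInitialPosition(Properties config) {
        String value = config.getProperty(STREAM_INITIAL_POSITION);
        if (value == null) {
            return InitialPosition.LATEST;
        }
        // Enum.valueOf throws IllegalArgumentException for unknown names.
        return InitialPosition.valueOf(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        System.out.println(resolveInitialPosition(config));           // TRIM_HORIZON
        System.out.println(resolveInitialPosition(new Properties())); // LATEST
    }
}
```

The sketch only ever names a position, never a sequence number, which is the
point of the comment above: the user chooses where to start, not a specific
offset within a shard.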
> Remove `CONFIG_` prefix from KinesisConfigConstants variables
> -------------------------------------------------------------
>
> Key: FLINK-4170
> URL: https://issues.apache.org/jira/browse/FLINK-4170
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Reporter: Ufuk Celebi
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> I find the static variable names verbose. I think it's clear from context
> that they refer to the Kinesis configuration since they are all gathered in
> that class.
> Therefore, I would like to remove the {{CONFIG_}} prefix before the release, so
> that we have
> {code}
> conf.put(KinesisConfigConstants.AWS_REGION, "")
> {code}
> instead of
> {code}
> conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "")
> {code}
> Otherwise, the expression gets even longer for longer variable names.
> ---
> Some basic variable names that might be accessed frequently are also very
> long:
> {code}
> CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY
> CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID
> {code}
> It might suffice to just have:
> {code}
> AWS_SECRET_KEY
> AWS_ACCESS_KEY
> {code}