[
https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974158#comment-15974158
]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3001#discussion_r112122730
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
---
@@ -66,31 +80,48 @@
// Consumer properties
//
------------------------------------------------------------------------
- /** The names of the Kinesis streams that we will be consuming from */
+ /**
+ * The names of the Kinesis streams that we will be consuming from
+ */
private final List<String> streams;
- /** Properties to parametrize settings such as AWS service region,
initial position in stream,
- * shard list retrieval behaviours, etc */
+ /**
+ * Properties to parametrize settings such as AWS service region,
initial position in stream,
+ * shard list retrieval behaviours, etc
+ */
private final Properties configProps;
- /** User supplied deseriliazation schema to convert Kinesis byte
messages to Flink objects */
+ /**
+ * User supplied deseriliazation schema to convert Kinesis byte
messages to Flink objects
+ */
private final KinesisDeserializationSchema<T> deserializer;
//
------------------------------------------------------------------------
// Runtime state
//
------------------------------------------------------------------------
- /** Per-task fetcher for Kinesis data records, where each fetcher pulls
data from one or more Kinesis shards */
+ /**
+ * Per-task fetcher for Kinesis data records, where each fetcher pulls
data from one or more Kinesis shards
+ */
private transient KinesisDataFetcher<T> fetcher;
- /** The sequence numbers in the last state snapshot of this subtask */
+ /**
+ * The sequence numbers in the last state snapshot of this subtask
+ */
private transient HashMap<KinesisStreamShard, SequenceNumber>
lastStateSnapshot;
- /** The sequence numbers to restore to upon restore from failure */
+ /**
+ * The sequence numbers to restore to upon restore from failure
+ */
private transient HashMap<KinesisStreamShard, SequenceNumber>
sequenceNumsToRestore;
private volatile boolean running = true;
+ //
------------------------------------------------------------------------
+ // State for Checkpoint
+ //
------------------------------------------------------------------------
+
+ private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>>
offsetsStateForCheckpoint;
--- End diff --
"offset" is the Kafka term.
I would try to rename this to use "sequence number" instead (or a likewise
abbreviation).
> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress for this.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)