[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974156#comment-15974156 ]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3001#discussion_r112121870
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
---
@@ -267,38 +293,84 @@ public void close() throws Exception {
// ------------------------------------------------------------------------
@Override
- public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (lastStateSnapshot == null) {
LOG.debug("snapshotState() requested on not yet opened source; returning null.");
- return null;
- }
-
- if (fetcher == null) {
+ } else if (fetcher == null) {
LOG.debug("snapshotState() requested on not yet running source; returning null.");
- return null;
- }
-
- if (!running) {
+ } else if (!running) {
LOG.debug("snapshotState() called on closed source; returning null.");
- return null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotting state ...");
+ }
+
+ offsetsStateForCheckpoint.clear();
+ lastStateSnapshot = fetcher.snapshotState();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+ lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+ }
+
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+ offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ }
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Snapshotting state ...");
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+ TypeInformation.of(KinesisStreamShard.class),
+ TypeInformation.of(SequenceNumber.class)
+ );
+
+ offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
--- End diff --
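Since the state is now registered explicitly through a `ListStateDescriptor`, we can probably give it a more meaningful name than `DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME`.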
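
For readers unfamiliar with union list state, here is a rough plain-Java model of the round trip the diff sets up: `snapshotState()` flattens each subtask's shard-to-sequence-number map into list entries, and on restore every subtask receives the union of all entries and keeps only the shards it owns. This is only a sketch under stated assumptions. It uses no Flink classes; `UnionStateSketch`, its method names, the `String` stand-ins for `KinesisStreamShard` and `SequenceNumber`, and the hash-modulo ownership rule are all hypothetical and are not taken from the pull request.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a union-list-state round trip; no Flink classes involved. */
final class UnionStateSketch {

    /** Flattens one subtask's map into list entries, like the loop in snapshotState(). */
    static List<Map.Entry<String, String>> snapshot(Map<String, String> lastStateSnapshot) {
        List<Map.Entry<String, String>> entries = new ArrayList<>();
        for (Map.Entry<String, String> entry : lastStateSnapshot.entrySet()) {
            entries.add(new SimpleEntry<>(entry.getKey(), entry.getValue()));
        }
        return entries;
    }

    /** Union redistribution: each restored subtask sees all snapshotted entries. */
    static List<Map.Entry<String, String>> union(List<List<Map.Entry<String, String>>> perSubtask) {
        List<Map.Entry<String, String>> all = new ArrayList<>();
        for (List<Map.Entry<String, String>> part : perSubtask) {
            all.addAll(part);
        }
        return all;
    }

    /** Hypothetical ownership rule: keep the shards whose hash maps to this subtask. */
    static Map<String, String> restore(
            List<Map.Entry<String, String>> unionState, int subtaskIndex, int parallelism) {
        Map<String, String> mine = new HashMap<>();
        for (Map.Entry<String, String> entry : unionState) {
            if (Math.floorMod(entry.getKey().hashCode(), parallelism) == subtaskIndex) {
                mine.put(entry.getKey(), entry.getValue());
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Two subtasks take a snapshot, then the job is rescaled to three subtasks.
        Map<String, String> subtask0 = new HashMap<>();
        subtask0.put("shard-0", "seq-10");
        subtask0.put("shard-1", "seq-11");
        Map<String, String> subtask1 = new HashMap<>();
        subtask1.put("shard-2", "seq-12");

        List<List<Map.Entry<String, String>>> snapshots = new ArrayList<>();
        snapshots.add(snapshot(subtask0));
        snapshots.add(snapshot(subtask1));
        List<Map.Entry<String, String>> all = union(snapshots);

        for (int i = 0; i < 3; i++) {
            System.out.println("subtask " + i + " restores " + restore(all, i, 3));
        }
    }
}
```

Because every subtask sees the full list after a restore, the parallelism can change between snapshot and restore without losing any shard's sequence number, provided the ownership rule assigns each shard to exactly one subtask.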
> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress for this.