[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974152#comment-15974152 ]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3001#discussion_r112122128
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -267,38 +293,84 @@ public void close() throws Exception {
     // ------------------------------------------------------------------------
     @Override
-    public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
         if (lastStateSnapshot == null) {
             LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-            return null;
-        }
-
-        if (fetcher == null) {
+        } else if (fetcher == null) {
             LOG.debug("snapshotState() requested on not yet running source; returning null.");
-            return null;
-        }
-
-        if (!running) {
+        } else if (!running) {
             LOG.debug("snapshotState() called on closed source; returning null.");
-            return null;
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Snapshotting state ...");
+            }
+
+            offsetsStateForCheckpoint.clear();
+            lastStateSnapshot = fetcher.snapshotState();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+                    lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+            }
+
+            for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+                offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+            }
         }
+    }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Snapshotting state ...");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+            TypeInformation.of(KinesisStreamShard.class),
+            TypeInformation.of(SequenceNumber.class)
+        );
+
+        offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+            new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
+
+        if (context.isRestored()) {
+            if (sequenceNumsToRestore == null) {
+                sequenceNumsToRestore = new HashMap<>();
+                for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : offsetsStateForCheckpoint.get()) {
+                    sequenceNumsToRestore.put(kinesisOffset.f0, kinesisOffset.f1);
+                }
+
+                LOG.info("Setting restore state in the FlinkKinesisConsumer.");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Using the following offsets: {}", sequenceNumsToRestore);
+                }
+            } else if (sequenceNumsToRestore.isEmpty()) {
+                sequenceNumsToRestore = null;
+            }
+        } else {
+            LOG.info("No restore state for FlinkKinesisConsumer.");
         }
+    }
-        lastStateSnapshot = fetcher.snapshotState();
+    @Override
+    public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
+        LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
--- End diff --
I think the other log messages follow a different format than this one. Don't the others use something like `Subtask {} is restoring offset from an older version`?
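A minimal, self-contained sketch of the snapshot/restore round trip in the diff above: `snapshotState()` clears the list state and then adds one (shard, sequence number) entry per shard, and the restore branch of `initializeState()` rebuilds the shard-to-sequence-number map from those entries. Plain `String`s stand in for `KinesisStreamShard` and `SequenceNumber`, and an `ArrayList` stands in for Flink's union `ListState`; `OffsetStateRoundTrip`, `ShardOffset`, `snapshot` and `restore` are hypothetical names, not Flink or connector API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Strings stand in for KinesisStreamShard / SequenceNumber,
// and a plain List stands in for Flink's union ListState.
final class OffsetStateRoundTrip {

    // Stand-in for Tuple2<KinesisStreamShard, SequenceNumber>.
    static final class ShardOffset {
        final String shard;
        final String sequenceNumber;

        ShardOffset(String shard, String sequenceNumber) {
            this.shard = shard;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Like snapshotState(): drop the previous checkpoint's entries, then add one entry per shard.
    static void snapshot(Map<String, String> lastStateSnapshot, List<ShardOffset> listState) {
        listState.clear();
        for (Map.Entry<String, String> entry : lastStateSnapshot.entrySet()) {
            listState.add(new ShardOffset(entry.getKey(), entry.getValue()));
        }
    }

    // Like the restore branch of initializeState(): rebuild the shard -> sequence number map.
    static Map<String, String> restore(Iterable<ShardOffset> listState) {
        Map<String, String> sequenceNumsToRestore = new HashMap<>();
        for (ShardOffset offset : listState) {
            sequenceNumsToRestore.put(offset.shard, offset.sequenceNumber);
        }
        return sequenceNumsToRestore;
    }

    public static void main(String[] args) {
        Map<String, String> lastStateSnapshot = new HashMap<>();
        lastStateSnapshot.put("shard-0", "seq-100"); // illustrative values only
        lastStateSnapshot.put("shard-1", "seq-250");

        List<ShardOffset> listState = new ArrayList<>();
        listState.add(new ShardOffset("shard-0", "seq-042")); // stale entry from an earlier checkpoint
        snapshot(lastStateSnapshot, listState);

        Map<String, String> restored = restore(listState);
        // prints "2 entries, round trip equal: true"
        System.out.println(listState.size() + " entries, round trip equal: " + lastStateSnapshot.equals(restored));
    }
}
```

The `clear()` before adding matters: without it, entries from the previous checkpoint would be written again alongside the new ones.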
> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress on that work.
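With union list state (`getUnionListState` in the diff), every subtask receives the complete list of entries on restore, whatever the new parallelism is, so each subtask has to decide for itself which shards it owns. The diff's `initializeState()` puts every restored entry into `sequenceNumsToRestore`; the per-subtask filtering below is an assumption about what happens afterwards, using a hash-modulo assignment rule chosen for illustration. `UnionRestoreSketch`, `isAssignedTo` and `offsetsForSubtask` are hypothetical names, not code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: every subtask sees the full union of (shard -> sequence number)
// entries after rescaling and keeps only the shards assigned to it.
final class UnionRestoreSketch {

    // Assumed assignment rule for illustration: a shard belongs to subtask |hash % parallelism|.
    static boolean isAssignedTo(String shard, int parallelism, int subtaskIndex) {
        return Math.abs(shard.hashCode() % parallelism) == subtaskIndex;
    }

    // Filters the full union state down to the offsets this subtask should resume from.
    static Map<String, String> offsetsForSubtask(Map<String, String> unionState, int parallelism, int subtaskIndex) {
        Map<String, String> mine = new HashMap<>();
        for (Map.Entry<String, String> entry : unionState.entrySet()) {
            if (isAssignedTo(entry.getKey(), parallelism, subtaskIndex)) {
                mine.put(entry.getKey(), entry.getValue());
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        Map<String, String> unionState = new HashMap<>();
        for (int i = 0; i < 5; i++) {
            unionState.put("shard-" + i, "seq-" + i); // illustrative values only
        }

        int newParallelism = 3;
        int covered = 0;
        for (int idx = 0; idx < newParallelism; idx++) {
            Map<String, String> mine = offsetsForSubtask(unionState, newParallelism, idx);
            covered += mine.size();
            System.out.println("subtask " + idx + " -> " + mine.keySet());
        }
        System.out.println("shards covered: " + covered);
    }
}
```

Because every subtask sees every entry, any deterministic rule that maps each shard to exactly one subtask index avoids losing or double-reading offsets after rescaling.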
--
This message was sent by Atlassian JIRA (v6.3.15#6346)