[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582775#comment-15582775 ]
ASF GitHub Bot commented on FLINK-4844: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83663696 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -305,39 +306,42 @@ public void close() throws Exception { super.close(); } } - + // ------------------------------------------------------------------------ // Checkpoint and restore // ------------------------------------------------------------------------ - @Override - public void initializeState(OperatorStateStore stateStore) throws Exception { - this.stateStore = stateStore; + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { - ListState<Serializable> offsets = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); + OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + offsetsStateForCheckpoint = stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - restoreToOffset = new HashMap<>(); + if (context.isRestored()) { + restoreToOffset = new HashMap<>(); + for (Serializable serializable : offsetsStateForCheckpoint.get()) { + @SuppressWarnings("unchecked") + Tuple2<KafkaTopicPartition, Long> kafkaOffset = (Tuple2<KafkaTopicPartition, Long>) serializable; + restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + } - for (Serializable serializable : offsets.get()) { - @SuppressWarnings("unchecked") - Tuple2<KafkaTopicPartition, Long> kafkaOffset = (Tuple2<KafkaTopicPartition, Long>) serializable; - restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + LOG.info("Setting restore state in the FlinkKafkaConsumer."); + if (LOG.isDebugEnabled()) { + LOG.debug("Using the following offsets: {}", restoreToOffset); + } + } else { + LOG.info("No restore state for FlinkKafkaConsumer."); } - - 
LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoreToOffset); } @Override - public void prepareSnapshot(long checkpointId, long timestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("storeOperatorState() called on closed source"); } else { - ListState<Serializable> listState = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - listState.clear(); + offsetsStateForCheckpoint.clear(); final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; if (fetcher == null) { --- End diff -- This is a workaround for the fact that we initialise the fetcher in `run()` and not in `open()`. Might be worthwhile to change that in a follow-up, if at all possible. > Partitionable Raw Keyed/Operator State > -------------------------------------- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature > Reporter: Stefan Richter > Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)