[
https://issues.apache.org/jira/browse/FLINK-19283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198091#comment-17198091
]
Yun Tang commented on FLINK-19283:
----------------------------------
[~adriank], I am not familiar with the background of kafka source connector to
mark {{initializeState}} method as final. However, you could still work around
this to add your additional logic by using {{DataStreamSource}}:
{code:java}
private static class KafkaSource extends StreamSource<Long,
FlinkKafkaConsumer<Long>> {
public KafkaSource(FlinkKafkaConsumer sourceFunction) {
super(sourceFunction);
}
@Override
public void initializeState(StateInitializationContext context)
throws Exception {
super.initializeState(context);
// some additional initialization which executed before
CheckpointedFunction#initializeState
}
@Override
public void snapshotState(StateSnapshotContext context) throws
Exception {
super.snapshotState(context);
// set some additional state which executed before
CheckpointedFunction#snapshotState
}
}
......
KafkaSource kafkaSource = new KafkaSource(new FlinkKafkaConsumer<>(topic,
new LimitedLongDeserializer(), standardProps));
DataStreamSource<Long> streamSource = new DataStreamSource<>(env,
LONG_TYPE_INFO, kafkaSource1, true, "source");
{code}
> Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods
> -----------------------------------------------------------------------------
>
> Key: FLINK-19283
> URL: https://issues.apache.org/jira/browse/FLINK-19283
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.11.0, 1.11.1
> Reporter: Adrian Kreuziger
> Priority: Minor
>
> I'm working on a class that extends the FlinkKafkaConsumer to add some
> additional functionality the first time the consumer runs. I'd like to be
> able to store some additional state, but am unable to do so as the
> initializeState() and snapshotState() are marked as final. Ideally I'd like
> to be able to do something like
> {code:java}
> @Override
> public void initializeState(FunctionInitializationContext context) throws
> Exception {
> super.initializeState(context);
> // some additional initialization here
> }
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
> super.snapshotState(context);
> // set some additional state here
> }{code}
> I'm guessing it was marked final for a reason, is there a reason this would
> be problematic? The restoredState and unionOffsetStates properties are still
> private which would prevent subclasses from modifying the offset state.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)