[ 
https://issues.apache.org/jira/browse/FLINK-19283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198091#comment-17198091
 ] 

Yun Tang commented on FLINK-19283:
----------------------------------

[~adriank], I am not familiar with the background of kafka source connector to 
mark  {{initializeState}} method as final. However, you could still work around 
this to add your additional logic by using {{DataStreamSource}}:


{code:java}
        private static class KafkaSource extends StreamSource<Long, 
FlinkKafkaConsumer<Long>> {

                public KafkaSource(FlinkKafkaConsumer sourceFunction) {
                        super(sourceFunction);
                }

                @Override
                public void initializeState(StateInitializationContext context) 
throws Exception {
                        super.initializeState(context);
                       // some additional initialization which executed before 
CheckpointedFunction#initializeState
                }

                @Override
                public void snapshotState(StateSnapshotContext context) throws 
Exception {
                        super.snapshotState(context);
                        // set some additional state which executed before 
CheckpointedFunction#snapshotState
                }
        }

......

    KafkaSource kafkaSource = new KafkaSource(new FlinkKafkaConsumer<>(topic, 
new LimitedLongDeserializer(), standardProps));
    DataStreamSource<Long> streamSource = new DataStreamSource<>(env, 
LONG_TYPE_INFO, kafkaSource1, true, "source");

{code}


> Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19283
>                 URL: https://issues.apache.org/jira/browse/FLINK-19283
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.11.0, 1.11.1
>            Reporter: Adrian Kreuziger
>            Priority: Minor
>
> I'm working on a class that extends the FlinkKafkaConsumer to add some 
> additional functionality the first time the consumer runs. I'd like to be 
> able to store some additional state, but am unable to do so as the 
> initializeState() and snapshotState() are marked as final. Ideally I'd like 
> to be able to do something like
> {code:java}
> @Override     
> public void initializeState(FunctionInitializationContext context) throws 
> Exception {
>     super.initializeState(context);
>     // some additional initialization here
> }
> @Override     
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     super.snapshotState(context);
>     // set some additional state here
> }{code}
> I'm guessing it was marked final for a reason, is there a reason this would 
> be problematic? The restoredState and unionOffsetStates properties are still 
> private which would prevent subclasses from modifying the offset state.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to