[ 
https://issues.apache.org/jira/browse/FLINK-23132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368698#comment-17368698
 ] 

Martijn Visser commented on FLINK-23132:
----------------------------------------

Hi, thanks for opening a ticket. It would be great if you could first reach out 
to the [User ML|https://flink.apache.org/community.html] and see if it could 
get resolved there. I'm closing this as "Abandoned". We can always re-open the 
ticket if it turns out that's needed. 

> flink upgrade issue(1.11.3->1.13.0)
> -----------------------------------
>
>                 Key: FLINK-23132
>                 URL: https://issues.apache.org/jira/browse/FLINK-23132
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Jeff Hu
>            Priority: Major
>
>  
> In order to improve the performance of data process, we store events to a map 
> and do not process them untill event count reaches 100. in the meantime, 
> start a timer in open method, so data is processed every 60 seconds
> this works when flink version is *1.11.3*,
> after upgrading flink version to *1.13.0*
> I found sometimes events were consumed from Kafka continuously, but were not 
> processed in RichFlatMapFunction, it means data was missing. after restarting 
> service, it works well, but several hours later the same thing happened again.
> any known issue for this flink version? any suggestions are appreciated.
>  {{public class MyJob \{
>     public static void main(String[] args) throws Exception {
>         ...
>         DataStream<String> rawEventSource = env.addSource(flinkKafkaConsumer);
>         ...
>     }}} {{public class MyMapFunction extends RichFlatMapFunction<String, 
> String> implements Serializable \{
>     @Override
>     public void open(Configuration parameters) {
>         ...
>         long periodTimeout = 60;
>         pool.scheduleAtFixedRate(() -> {
>             // processing data
>         }, periodTimeout, periodTimeout, TimeUnit.SECONDS);
>     }
>     
>     @Override
>     public void flatMap(String message, Collector<String> out) \{
>         // store event to map 
>         // count event, 
>         // when count = 100, start data processing
>     }
> }}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to