[ 
https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-28562.
----------------------------------
    Resolution: Invalid

[~yue.shang]  This doesn't seem like a complete bug ticket. There can be 
multiple reasons why the performance has been subpar in your case. It's better 
to reach out to the Flink community via the User mailing list, Stack Overflow or 
Slack. One thing that is immediately visible from your screenshot is that 
you're using Kryo serialization, which is slower than other serializers. See 
[https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#kryo]
 for example. 
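As a side note (this sketch is mine, not part of the ticket): Flink silently falls back to Kryo for any type its POJO analyzer cannot handle, such as generic `Map`/`List` accumulator values. One way to surface that fallback during development is `ExecutionConfig#disableGenericTypes()`, which turns the silent slowdown into a hard error at job-graph construction time. The class name below is illustrative:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast whenever a type would be serialized with Kryo instead of
        // a dedicated serializer: any GenericTypeInfo usage now throws an
        // UnsupportedOperationException naming the offending type.
        env.getConfig().disableGenericTypes();
        // ... build the pipeline as usual; fix the reported types (e.g. make
        // them proper POJOs or register explicit TypeInformation) until the
        // job graph builds cleanly, then remove or keep the check as desired.
    }
}
{code}

This is a diagnostic aid only; whether avoiding Kryo alone recovers the missing throughput here would still need profiling.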

> Rocksdb state backend is too slow when using AggregateFunction
> --------------------------------------------------------------
>
>                 Key: FLINK-28562
>                 URL: https://issues.apache.org/jira/browse/FLINK-28562
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.2, 1.14.3
>         Environment: {code:java}
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data
> DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
>     @Override
>     public void run(SourceContext<UserTag> ctx) throws Exception {
>         Random rd = new Random();
>         while (true){
>             UserTag userTag = new UserTag();
>             userTag.setUserId(rd.nextLong());
>             userTag.setMetricName(UUID.randomUUID().toString());
>             userTag.setTagDimension(UUID.randomUUID().toString());
>             userTag.setTagValue(rd.nextDouble());
>             userTag.setTagTime((int) System.currentTimeMillis());
>             userTag.setMonitorTime((int) System.currentTimeMillis());
>             userTag.setAggregationPeriod("5s");
>             userTag.setAggregationType("sum");
>             userTag.setTimePeriod("hour");
>             userTag.setDataType("number");
>             userTag.setBaseTime(1657803600);
>             userTag.setTopic(UUID.randomUUID().toString());
>             for(int i = 0;i<100;i++){
>                 userTag.setUserTagName(UUID.randomUUID() + "-"+i);
>                 ctx.collect(userTag);
>             }
>             Thread.sleep(1);
>         }
>     }
>     @Override
>     public void cancel() {
>     }
> });
> source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>         .aggregate(new AggregateFunction<UserTag, Map<String,UserTag>, 
> List<UserTag>>(){
>             @Override
>             public Map<String, UserTag> createAccumulator() {
>                 return new HashMap<>();
>             }
>             @Override
>             public Map<String, UserTag> add(UserTag userTag, Map<String, 
> UserTag> stringUserTagMap) {
>                 stringUserTagMap.put(userTag.getUserTagName(),userTag);
>                 return stringUserTagMap;
>             }
>             @Override
>             public List<UserTag> getResult(Map<String, UserTag> 
> stringUserTagMap) {
>                 return new ArrayList<>(stringUserTagMap.values());
>             }
>             @Override
>             public Map<String, UserTag> merge(Map<String, UserTag> acc1, 
> Map<String, UserTag> acc2) {
>                 acc1.putAll(acc2);
>                 return acc1;
>             }
>         })
>         .setParallelism(1)
>         .name("NewUserTagAggregation_5s")
>         .print().setParallelism(2);
> // execute program
> env.execute(); {code}
> !image-2022-07-15-14-19-21-200.png!
> !image-2022-07-15-14-19-41-678.png!
>            Reporter: Yue Shang
>            Priority: Major
>         Attachments: image-2022-07-15-14-19-21-200.png, 
> image-2022-07-15-14-19-41-678.png
>
>
> The RocksDB state backend is too slow when using an AggregateFunction:
> with a Map<String, UserTag> accumulator it only sustains about 300 records per second.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
