swy created FLINK-9506:
--------------------------

             Summary: Flink ReducingState.add causing more than 100% 
performance drop
                 Key: FLINK-9506
                 URL: https://issues.apache.org/jira/browse/FLINK-9506
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.4.2
            Reporter: swy


Hi, we found that application performance drops by more than 100% when 
ReducingState.add is used in the source code. In the test, checkpointing is 
disabled, and the filesystem (HDFS) state backend is used.

It can be easily reproduced with a simple app that, without checkpointing, 
simply keeps storing records, using a simple reduce function (in fact, an 
empty function gives the same result). Any ideas would be appreciated. What an 
unbelievably obvious issue.

Basically, the app just keeps storing records into the state, and we measure 
how many records per second pass through "JsonTranslator", which is shown in 
the graph. The only difference between the two runs is one line: commenting or 
uncommenting "recStore.add(r)".

DataStream<String> stream = env.addSource(new GeneratorSource(loop));
DataStream<JSONObject> convert = stream.map(new JsonTranslator())
                                       .keyBy()
                                       .process(new ProcessAggregation())
                                       .map(new PassthruFunction());
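
The records-per-second figure mentioned above could be collected with a small counter along these lines. This is only a sketch in plain Java: the class name ThroughputMeter and its methods are hypothetical and do not come from the application in this report, and timestamps are passed in explicitly (for example from System.nanoTime()) to keep the arithmetic easy to check.

```java
/** Hypothetical helper: counts records and reports an average rate. */
class ThroughputMeter {
    private final long startNanos; // timestamp at which measuring began
    private long count;            // records seen so far

    ThroughputMeter(long startNanos) {
        this.startNanos = startNanos;
    }

    /** Call once per record, e.g. from inside a map function. */
    void mark() {
        count++;
    }

    long count() {
        return count;
    }

    /** Average records per second between the start timestamp and nowNanos. */
    double recordsPerSecond(long nowNanos) {
        long elapsed = nowNanos - startNanos;
        if (elapsed <= 0) {
            return 0.0; // no time has passed yet, so no meaningful rate
        }
        return count * 1_000_000_000.0 / elapsed;
    }
}
```

Comparing the rate reported with and without the state update in place would give the two numbers being contrasted here.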


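public class ProcessAggregation extends ProcessFunction<Record, Record> {
    // Keyed state handle; its initialization is not shown in this report.
    private ReducingState<Record> recStore;

    @Override
    public void processElement(Record r, Context ctx, Collector<Record> out)
            throws Exception {
        recStore.add(r); // this line makes the difference
    }
}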

Record is a POJO class containing 50 private String members.
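
For readers less familiar with the call in question: ReducingState keeps one aggregated value per key, and each add combines the incoming value with the stored one through the reduce function. The plain-Java model below is only meant to illustrate that read, reduce, write cycle. ReducingStateModel is a hypothetical name, the key is passed explicitly (Flink takes it from the keyed context), and this is not Flink's implementation, so it says nothing about where the time goes in the real state backend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Simplified stand-in for keyed reducing state; not Flink's implementation. */
class ReducingStateModel<K, T> {
    private final Map<K, T> values = new HashMap<>();
    private final BinaryOperator<T> reduce;

    ReducingStateModel(BinaryOperator<T> reduce) {
        this.reduce = reduce;
    }

    /** Stores value if the key is new, otherwise stores reduce(old, value). */
    void add(K key, T value) {
        values.merge(key, value, reduce);
    }

    /** Returns the aggregated value for the key, or null if nothing was added. */
    T get(K key) {
        return values.get(key);
    }
}
```

Even with a trivial reduce function, every add still performs a lookup and a write for the current key, which is the per-record work that the one-line change switches on and off.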



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
