[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504729#comment-16504729 ]
Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow]

Off the top of my head, here are my answers:

- >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any other option?

RocksDB needs to be set up in every sub-task that uses KeyedState, if you are using the RocksDB backend.

- >> 2. What is the recommendation for RocksDB's statebackend? We are using tmpfs with checkpoint now with savepoint persists to hdfs.

Q1. I think the default configuration of the RocksDB backend is quite good for most jobs.
Q2. I'm not sure whether I understood you correctly: savepoints are triggered manually and checkpoints are triggered automatically. Do you mean that you trigger savepoints manually on a periodic basis?

- >> 3. By source code, rocksdb options like parallelism and certain predefined option could be configured, any corresponding parameter in flink_config.yaml?

AFAIK, RocksDB's options need to be set in source code if you need to specify them. The default parallelism of the operators can be configured in flink-conf.yaml.

- >> 4. related to your RocksDB config.

I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the checkpoint directory; I'm not sure whether it is accessible to all TMs. If yes, I think that is OK. Also, I didn't see your checkpoint interval...

BTW, you said you are using {{r.getUNIQUE_KEY();}} as the key, and I'm a bit curious about its length in general. If it's too long and you don't need an exact result, you could use {{r.getUNIQUE_KEY().hashCode();}} instead, which may also help to improve the performance.

In fact, I also agree with [~kkrugler] that this type of question is best asked on the user mailing list; that way more people can take part, and you might also get more ideas from them.
;)

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when
> ReducingState.add is used in the source code. In the test, checkpointing is
> disabled, and filesystem (hdfs) is used as the state backend.
> It can be easily reproduced with a simple app, without checkpointing, that
> simply keeps storing records, with a simple reduction function (in fact, an
> empty function shows the same result). Any idea would be appreciated.
> What an unbelievably obvious issue.
> Basically, the app just keeps storing records into the state, and we measure how
> many records per second pass through "JsonTranslator", which is shown in the graph. The
> only difference is one line: commenting/uncommenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState<Record> recStore;
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
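
A note on the hashCode() suggestion in the comment above: the following is a minimal plain-Java sketch (no Flink involved) of why keying by hashCode() only fits when an exact result is not required. The class name HashKeyDemo and the methods countByKey and countByHash are hypothetical and exist only for this illustration. Distinct strings can share a hash code, so records that would be kept apart under the full key may be merged under the hashed key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink or of the reporter's job.
class HashKeyDemo {

    // Group by the full String key: exact, but each key is kept at full length.
    static Map<String, Integer> countByKey(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    // Group by the 32-bit hashCode(): a compact key, but distinct strings
    // with the same hash code end up in the same group.
    static Map<Integer, Integer> countByHash(List<String> keys) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k.hashCode(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are different strings whose hashCode() is the same.
        List<String> keys = Arrays.asList("Aa", "BB", "Aa", "C");
        System.out.println("groups by full key: " + countByKey(keys).size());
        System.out.println("groups by hashCode: " + countByHash(keys).size());
    }
}
```

With the full key, "Aa" and "BB" stay in separate groups; with the hashed key they collapse into one, which is the inexactness the comment warns about. Whether the smaller key actually improves throughput in the reporter's job would have to be measured there.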