[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504729#comment-16504729 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], off the top of my head, here are my answers:

- >> 1. Just to confirm, does RocksDB need to be set up on every TM machine? 
Any other options?

If you are using the RocksDB backend, a RocksDB instance is created for every 
sub-task that uses keyed state. RocksDB is embedded in the TM process, so 
there is nothing to install separately on the TM machines.

- >> 2. What is the recommended configuration for the RocksDB state backend? 
We are currently using tmpfs for checkpoints, with savepoints persisted to HDFS.

For the first part: I think the default configuration of the RocksDB backend 
is quite good for most jobs.
For the second part: I'm not sure I understood you correctly. Savepoints are 
triggered manually, while checkpoints are triggered automatically; do you mean 
that you trigger savepoints manually on a periodic schedule?
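For reference, here is that distinction in code. This is a minimal sketch; the 
interval and paths are illustrative, not taken from your setup:

{code}
// Checkpoints are triggered automatically by Flink once enabled:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // e.g. checkpoint every 60 seconds

// Savepoints are triggered manually, e.g. from the CLI:
//   bin/flink savepoint <jobId> hdfs:///flink/savepoints
{code}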

- >> 3. Looking at the source code, RocksDB options such as parallelism and 
certain predefined options can be configured; is there a corresponding 
parameter in flink-conf.yaml?

AFAIK, RocksDB's options need to be set in source code if you want to 
customize them (see the sketch below). The default parallelism of an operator 
can be configured in flink-conf.yaml.
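A minimal sketch against the Flink 1.4 RocksDB backend API; the checkpoint URI 
and option values are illustrative:

{code}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// The backend (and thus RocksDB's options) is configured per job in code.
// Note: this constructor declares IOException.
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

// Either pick one of the predefined option profiles...
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

// ...or tune individual RocksDB options via an OptionsFactory.
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions.setIncreaseParallelism(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions.setWriteBufferSize(64 * 1024 * 1024);
    }
});

env.setStateBackend(backend);
{code}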

- >> 4. Related to your RocksDB config.

I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the 
checkpoint directory; I'm not sure whether that path is accessible to all TMs. 
If it is, I think that is ok (a shared location such as HDFS, as in the sketch 
above, avoids the question entirely). Also, I didn't see your checkpoint 
interval...

BTW, you said you are using {{r.getUNIQUE_KEY()}} as the key; I'm a bit 
curious about its typical length. If it's very long and you don't need an 
exact result, you could key by {{r.getUNIQUE_KEY().hashCode()}} instead (see 
the sketch below), which may also help performance. And in fact, I agree with 
[~kkrugler] that this type of question is best asked on the user mailing list, 
where more people can take part and you might get more ideas from them. ;)
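For illustration, a minimal sketch of keying by the hash. The {{Record}} type 
and getter are taken from your snippet, and the {{records}} stream is 
hypothetical; note that different keys can collide on the same hash, which is 
why the result is no longer exact:

{code}
// Assuming a DataStream<Record> named records (hypothetical).
// Keying by the int hash instead of the full String key: hash collisions
// can merge records with different keys, so results become approximate.
KeyedStream<Record, Integer> keyed = records
    .keyBy(new KeySelector<Record, Integer>() {
        @Override
        public Integer getKey(Record r) {
            return r.getUNIQUE_KEY().hashCode();
        }
    });
{code}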



> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled, and the filesystem (HDFS) state backend is used.
> It can easily be reproduced with a simple app, without checkpointing, that 
> just keeps storing records into the state with a simple reduce function (in 
> fact, an empty function shows the same result). Any idea would be 
> appreciated. What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy(r -> r.getUNIQUE_KEY())
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
