[
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503505#comment-16503505
]
Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM:
-----------------------------------------------------------
[~yow] Maybe there is one more optimization that could have a try, I see you
are using the ReduceState in your code just to accumulate the
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For
the ReduceState it works as follows:
- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to
RocksDB.
that means for input record in processElement(), it needs to do a `get` and a
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to
use the ListState instead. With using ListState, what you need to do are:
- Performing {{ListState.add(record)}} in {{processElement()}}, since the
`ListState.add()` is cheap as it only put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}
In this way, for every key every second, you only need to do one read operation
of RocksDB.
was (Author: sihuazhou):
[~yow] Maybe there is one more optimization that could have a try, I see you
are using the ReduceState in your code just to accumulate the
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For
the ReduceState it works as follows:
- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to
RocksDB.
that means for input record in processElement(), it needs to do a `get` and a
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to
use the ListState instead. With using ListState, what you need to do are:
- Performing {{ListState.add(record)}} in {{processElement()}}, since the
`ListState.add()` is cheap as it not put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}
In this way, for every key very seconds, you only need to do one read operation
of RocksDB.
> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.4.2
> Reporter: swy
> Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when
> ReducingState.add is used in the source code. In the test checkpoint is
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just
> simply keep storing record, also with simple reduction function(in fact with
> empty function would see the same result). Any idea would be appreciated.
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how
> many record per second in "JsonTranslator", which is shown in the graph. The
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop);
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
> .keyBy()
> .process(new ProcessAggregation())
> .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState<Record> recStore;
> public void processElement(Recordr, Context ctx, Collector<Record> out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)