[ 
https://issues.apache.org/jira/browse/FLINK-18503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161938#comment-17161938
 ] 

fanrui commented on FLINK-18503:
--------------------------------

Thanks for [~yunta]'s reply. I have read this issue.

> bug occurs when `HeapReducingState#add` method handles null
> -----------------------------------------------------------
>
>                 Key: FLINK-18503
>                 URL: https://issues.apache.org/jira/browse/FLINK-18503
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.12.0
>            Reporter: fanrui
>            Priority: Major
>             Fix For: 1.12.0
>
>         Attachments: image-2020-07-07-02-20-03-420.png, 
> image-2020-07-07-02-20-57-299.png
>
>
> In our production environment, we run advertising billing jobs that are keyed 
> by advertiserId: each job computes the cost of each advertiser in a specified 
> window, i.e. it uses a ReduceFunction to sum the prices after keyBy. 
> However, the results computed with FsStateBackend and RocksDBStateBackend 
> differ: the FsStateBackend result is wrong, while the RocksDBStateBackend 
> result is correct.
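> For context, here is a minimal sketch of such a job. This is an illustration 
> only: the class name, the source, and the (advertiserId, price) tuple layout 
> are made up for this example, not taken from our production code.
>  
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> 
> public class BillingJobSketch {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>       // (advertiserId, price) events; the real job reads from a production source
>       env.fromElements(Tuple2.of(1L, 17L), Tuple2.of(1L, 11L))
>          .keyBy(t -> t.f0)                                      // keyBy advertiserId
>          .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>          .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))        // sum price per advertiser
>          .print();
> 
>       env.execute("billing-job-sketch");
>    }
> }
> {code}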
> After reading the source code, HeapReducingState#add looks like this: [code 
> link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java#L93]
>  
> {code:java}
> public void add(V value) throws IOException {
>    if (value == null) {
>       clear();
>       return;
>    }
>    try {
>       stateTable.transform(currentNamespace, value, reduceTransformation);
>    } catch (Exception e) {
>       throw new IOException("Exception while applying ReduceFunction in reducing state", e);
>    }
> }
> {code}
> If value == null, the clear method deletes the current <key, namespace> 
> entry from the StateTable; the user-defined ReduceFunction is executed only 
> when value != null.
> h2. Why is there a bug?
> For a job that calculates cost, if price != null, the price should be added 
> to the result; if price == null, the result should stay unchanged. 
> Our ReduceFunction handles the price == null case as follows:
>  
> {code:java}
> ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
>    @Override
>    public Long reduce(Long previousState, Long newValue) throws Exception {
>       // if newValue == null,
>       // consider newValue to be 0 and return previousState directly
>       if (newValue == null) {
>          return previousState;
>       }
>       return previousState + newValue;
>    }
> };
> {code}
>  
> However, when HeapReducingState#add sees that the input value == null, it 
> executes the clear method directly and never calls the user-defined 
> ReduceFunction.
> For example, if the input prices are 17, null, and 11: after 17 the state 
> holds 17, the null input clears the state, and after 11 the state holds 11, 
> so the final result is 11 instead of the expected 28.
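> To make the divergence concrete, here is a small standalone simulation (plain 
> Java, not Flink internals; the sum logic matches the ReduceFunction shown 
> above) of how the two add() implementations treat the inputs 17, null, 11:
>  
> {code:java}
> import org.apache.flink.api.common.functions.ReduceFunction;
> 
> public class NullAddSimulation {
>    public static void main(String[] args) throws Exception {
>       ReduceFunction<Long> sum = (prev, val) -> val == null ? prev : prev + val;
>       Long[] inputs = {17L, null, 11L};
> 
>       // Heap-style add(): null clears the state, the ReduceFunction is skipped
>       Long heapState = null;
>       for (Long v : inputs) {
>          heapState = (v == null) ? null
>                : (heapState == null ? v : sum.reduce(heapState, v));
>       }
> 
>       // RocksDB-style add(): every value, including null, reaches the ReduceFunction
>       Long rocksState = null;
>       for (Long v : inputs) {
>          rocksState = (rocksState == null) ? v : sum.reduce(rocksState, v);
>       }
> 
>       System.out.println("Heap-style result:    " + heapState);  // 11 (wrong)
>       System.out.println("RocksDB-style result: " + rocksState); // 28 (correct)
>    }
> }
> {code}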
> Fortunately, the RocksDBStateBackend result is correct: the 
> RocksDBReducingState#add method does not treat null specially. Its code is as 
> follows: [code 
> link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java#L92]
>  
> {code:java}
> public void add(V value) throws Exception {
>    byte[] key = getKeyBytes();
>    V oldValue = getInternal(key);
>    V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
>    updateInternal(key, newValue);
> }
> {code}
> h2. A Flink UT can reproduce this bug
> StateBackendTestBase#testReducingStateAddAndGet reproduces this bug after the 
> following two modifications:
>  # udf
> {code:java}
> ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
>    @Override
>    public Long reduce(Long previousState, Long newValue) throws Exception {
>       // if newValue == null,
>       // consider newValue to be 0 and return previousState directly
>       if (newValue == null) {
>          return previousState;
>       }
>       return previousState + newValue;
>    }
> };
> final ReducingStateDescriptor<Long> stateDescr =
>    new ReducingStateDescriptor<>("my-state", sumFunction, Long.class);{code}
>  # add element
> {code:java}
> keyedBackend.setCurrentKey("def");
> assertNull(state.get());
> state.add(17L);
> state.add(null); // new code
> state.add(11L);
> assertEquals(28L, state.get().longValue());{code}
> My code repository commit: 
> [link|https://github.com/1996fanrui/flink/commit/645118dd2f95de88580d07e00d88e8783a0f9680]
> The UT execution output of RocksDBStateBackendTest is as follows:
> !image-2020-07-07-02-20-03-420.png!
> The UT execution output of FileStateBackendTest & MemoryStateBackendTest is 
> as follows:
> !image-2020-07-07-02-20-57-299.png!
> {code:java}
> java.lang.AssertionError: 
> Expected :28
> Actual   :11{code}
> The above shows that the HeapReducingState#add method has a bug. Regardless 
> of which state backend is chosen, the semantics provided by the Flink engine 
> should be consistent and should not produce different calculation results.
> h2. My solution
> Remove the value == null handling from HeapReducingState#add. 
> Result: all UTs in FileStateBackendTest pass.
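> For clarity, a minimal sketch of the proposed change, i.e. the 
> HeapReducingState#add method quoted above with the null branch dropped (the 
> rest of the method is unchanged):
>  
> {code:java}
> public void add(V value) throws IOException {
>    try {
>       // pass value (possibly null) straight to the user-defined ReduceFunction
>       stateTable.transform(currentNamespace, value, reduceTransformation);
>    } catch (Exception e) {
>       throw new IOException("Exception while applying ReduceFunction in reducing state", e);
>    }
> }
> {code}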
> h2. Similar bug
> HeapFoldingState#add & HeapAggregatingState#add
> h2. Question
> Why was HeapReducingState#add designed to handle the null case specially? I 
> think the null case should be handled by the user-defined ReduceFunction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
