[ https://issues.apache.org/jira/browse/FLINK-18503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161938#comment-17161938 ]
fanrui commented on FLINK-18503:
--------------------------------

Thanks for [~yunta]'s reply. I have read this issue.

> bug occurs when `HeapReducingState#add` method handles null
> -----------------------------------------------------------
>
>                 Key: FLINK-18503
>                 URL: https://issues.apache.org/jira/browse/FLINK-18503
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.12.0
>            Reporter: fanrui
>            Priority: Major
>             Fix For: 1.12.0
>
>         Attachments: image-2020-07-07-02-20-03-420.png, image-2020-07-07-02-20-57-299.png
>
>
> In our production environment, there are advertising billing jobs that keyBy advertiserId and calculate the cost of each advertiser in a specified window; that is, after keyBy by advertiser, a ReduceFunction sums the prices.
> However, the results calculated with FsStateBackend and RocksDBStateBackend differ: the FsStateBackend result is wrong, and the RocksDBStateBackend result is correct.
> After reading the source code, here is HeapReducingState#add ([code link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java#L93]):
>
> {code:java}
> public void add(V value) throws IOException {
>     if (value == null) {
>         clear();
>         return;
>     }
>     try {
>         stateTable.transform(currentNamespace, value, reduceTransformation);
>     } catch (Exception e) {
>         throw new IOException("Exception while applying ReduceFunction in reducing state", e);
>     }
> }
> {code}
>
> If value == null, the clear method deletes the data of the current <key, namespace> from the StateTable. The ReduceFunction is executed only if value != null.
> h2. Why is there a bug?
> For a job that calculates cost, if price != null, the price is added to the result; if price == null, the result is unchanged.
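To make the clear-on-null path concrete, here is a minimal sketch that models the quoted HeapReducingState#add outside Flink. It is not Flink code: the class name HeapAddModel, the HashMap standing in for the StateTable, and java.util.function.BinaryOperator in place of Flink's ReduceFunction are assumptions for illustration, and namespaces are ignored. It also assumes the reduce transformation stores the incoming value directly when there is no previous state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Hypothetical, simplified model of the heap add path quoted above (not Flink's API).
 * A HashMap stands in for the StateTable; namespaces are ignored.
 */
public class HeapAddModel {
    private final Map<String, Long> table = new HashMap<>();
    private final BinaryOperator<Long> reduceFunction;

    public HeapAddModel(BinaryOperator<Long> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** Mirrors the quoted logic: a null value clears the state instead of being reduced. */
    public void add(String key, Long value) {
        if (value == null) {
            table.remove(key); // corresponds to clear()
            return;
        }
        // Assumed transform semantics: store the value if there is no previous state, else reduce
        Long previous = table.get(key);
        table.put(key, previous == null ? value : reduceFunction.apply(previous, value));
    }

    public Long get(String key) {
        return table.get(key);
    }

    /** Feeds 17, null, 11 through the model with a null-tolerant sum like the UDF in this issue. */
    public static Long runExample() {
        BinaryOperator<Long> sum = (prev, next) -> next == null ? prev : prev + next;
        HeapAddModel state = new HeapAddModel(sum);
        state.add("advertiser-1", 17L);
        state.add("advertiser-1", null); // the accumulated 17 is removed here
        state.add("advertiser-1", 11L);  // no previous state, so 11 is stored as-is
        return state.get("advertiser-1");
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // prints 11, not 28
    }
}
```

The null input wipes the accumulated 17 before the user function ever sees it, so the model ends at 11.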
> The ReduceFunction handles the case of price == null. Our ReduceFunction is as follows:
>
> {code:java}
> ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
>     @Override
>     public Long reduce(Long previousState, Long newValue) throws Exception {
>         // if newValue == null,
>         // consider newValue to be 0 and return previousState directly
>         if (newValue == null) {
>             return previousState;
>         }
>         return previousState + newValue;
>     }
> };
> {code}
>
> However, when HeapReducingState#add finds that the input value == null, it directly executes the clear method and does not execute the user-defined ReduceFunction at all.
> For example, if the input prices are 17, null, and 11: after 17 the state holds 17, the null input clears the state, and after 11 the state holds 11. The final result is 11 instead of the expected 28, so the result is wrong.
> Fortunately, the RocksDBStateBackend result is correct, because RocksDBReducingState#add does not treat null specially. Here is RocksDBReducingState#add ([code link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java#L92]):
>
> {code:java}
> public void add(V value) throws Exception {
>     byte[] key = getKeyBytes();
>     V oldValue = getInternal(key);
>     V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
>     updateInternal(key, newValue);
> }
> {code}
>
> h2. Flink UT can reproduce this bug
> StateBackendTestBase#testReducingStateAddAndGet can reproduce this bug.
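For comparison, a similarly hypothetical model of the quoted RocksDBReducingState#add shows that passing null through to the user function preserves the accumulated value. The class name RocksDbAddModel, the HashMap standing in for RocksDB (no serialization, no namespaces), and BinaryOperator in place of ReduceFunction are illustrative assumptions, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Hypothetical model of the RocksDB add path quoted above (not Flink's API):
 * newValue = oldValue == null ? value : reduce(oldValue, value).
 * A HashMap stands in for RocksDB; serialization and namespaces are omitted.
 */
public class RocksDbAddModel {
    private final Map<String, Long> db = new HashMap<>();
    private final BinaryOperator<Long> reduceFunction;

    public RocksDbAddModel(BinaryOperator<Long> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    public void add(String key, Long value) {
        Long oldValue = db.get(key);
        // A null input is handed to the user function instead of clearing the state
        Long newValue = oldValue == null ? value : reduceFunction.apply(oldValue, value);
        db.put(key, newValue);
    }

    public Long get(String key) {
        return db.get(key);
    }

    public static Long runExample() {
        // Same null-tolerant sum as the ReduceFunction in this issue
        BinaryOperator<Long> sum = (prev, next) -> next == null ? prev : prev + next;
        RocksDbAddModel state = new RocksDbAddModel(sum);
        state.add("advertiser-1", 17L);  // no old value, stores 17
        state.add("advertiser-1", null); // reduce(17, null) returns 17
        state.add("advertiser-1", 11L);  // reduce(17, 11) returns 28
        return state.get("advertiser-1");
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // prints 28
    }
}
```

The model ends at 28, matching the value the modified unit test expects.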
> It needs to be modified as follows:
> # UDF
> {code:java}
> ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
>     @Override
>     public Long reduce(Long previousState, Long newValue) throws Exception {
>         // if newValue == null,
>         // consider newValue to be 0 and return previousState directly
>         if (newValue == null) {
>             return previousState;
>         }
>         return previousState + newValue;
>     }
> };
> final ReducingStateDescriptor<Long> stateDescr =
>         new ReducingStateDescriptor<>("my-state", sumFunction, Long.class);
> {code}
> # add element
> {code:java}
> keyedBackend.setCurrentKey("def");
> assertNull(state.get());
> state.add(17L);
> state.add(null); // new code
> state.add(11L);
> assertEquals(28L, state.get().longValue());
> {code}
> My code repository commit: [link|https://github.com/1996fanrui/flink/commit/645118dd2f95de88580d07e00d88e8783a0f9680]
> The UT execution output of RocksDBStateBackendTest is as follows:
> !image-2020-07-07-02-20-03-420.png!
>
> The UT execution output of FileStateBackendTest & MemoryStateBackendTest is as follows:
> !image-2020-07-07-02-20-57-299.png!
> {code:java}
> java.lang.AssertionError:
> Expected :28
> Actual :11
> {code}
> This shows that the HeapReducingState#add method has a bug. Regardless of which state backend is chosen, the semantics provided by the Flink engine should be consistent, and different backends should not produce different results.
> h2. My solution
> Remove the value == null handling from HeapReducingState#add.
> Result: all UTs of FileStateBackendTest pass.
> h2. Similar bug
> HeapFoldingState#add & HeapAggregatingState#add
> h2. Question
> When HeapReducingState#add was designed, why was the null case handled specially? I think the null case should be handled by the user-defined ReduceFunction.
>
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)