[ https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hongshun Wang updated FLINK-34129:
----------------------------------
Description:
Take sum as an example: after the state for a key has expired, the source issues an update operation for that key. MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input, but emits +I[1, -20] and -D[1, -20]. As a result, the sink deletes the row from the external database.

Here is why this happens:
* When the state has expired and -U[1, 20] arrives, MiniBatchGlobalGroupAggFunction creates a new sum accumulator and sets firstRow to true.
{code:java}
if (stateAcc == null) {
    stateAcc = globalAgg.createAccumulators();
    firstRow = true;
}
{code}
* The sum accumulator then retracts 20, so the sum becomes -20.
* Because this is the first row, MiniBatchGlobalGroupAggFunction turns the -U into +I and emits it downstream.
{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        // ignore
    } else {
        // update acc to state
        accState.update(acc);

        // this is the first, output new result
        // prepare INSERT message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
        out.collect(resultRow);
    }
}
{code}
* When the next +U[1, 20] arrives, the accumulator adds the 20 back, so the accumulated values (including the record count that RetractionRecordCounter inspects) return to 0 and RetractionRecordCounter#recordCountIsZero returns true. Because firstRow is now false, the +U is turned into -D and emitted downstream.
{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // ignore
} else {
    // we retracted the last record for this key
    // if this is not first row sent out a DELETE message
    if (!firstRow) {
        // prepare DELETE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
}
{code}

So after a single update operation at the source, the sink receives +I followed by -D, and the row is deleted.
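To make the sequence described above easier to follow, here is a minimal, self-contained Java sketch that replays it. It is a hypothetical simplification, not Flink's MiniBatchGlobalGroupAggFunction: the class ExpiredStateRepro, the Acc accumulator (a sum plus a record count standing in for RetractionRecordCounter), the finishBundle(key, bufferAcc) signature and the string-encoded output rows are all assumptions made for illustration. It also assumes that -U[1, 20] and +U[1, 20] land in different mini-batch bundles, which is what firstRow being false on the second message implies.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the global side of a mini-batch SUM.
// NOT Flink's MiniBatchGlobalGroupAggFunction; all names here are illustrative.
public class ExpiredStateRepro {

    // Accumulator: running sum plus a record count. The count goes down on a
    // retraction (-U/-D) and up on an accumulation (+I/+U), similar in spirit
    // to what RetractionRecordCounter inspects.
    static final class Acc {
        long sum;
        long count;

        Acc(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Keyed state; an expired entry is modelled as simply being absent.
    private final Map<Integer, Acc> accState = new HashMap<>();
    // Emitted rows, encoded as strings such as "+I[1, -20]".
    final List<String> out = new ArrayList<>();

    // Merges one bundle's local accumulator for a key, following the branch
    // structure quoted in the issue.
    void finishBundle(int key, Acc bufferAcc) {
        boolean firstRow = false;
        Acc stateAcc = accState.get(key);
        if (stateAcc == null) { // state expired (or never existed)
            stateAcc = new Acc(0, 0);
            firstRow = true;
        }
        long prevAggValue = stateAcc.sum;
        stateAcc.sum += bufferAcc.sum; // merge the bundle into the state accumulator
        stateAcc.count += bufferAcc.count;
        long newAggValue = stateAcc.sum;

        if (stateAcc.count != 0) { // i.e. recordCountIsZero(acc) == false
            accState.put(key, stateAcc);
            if (!firstRow) {
                // Simplified update handling: always emit the -U/+U pair on change.
                if (prevAggValue != newAggValue) {
                    out.add("-U[" + key + ", " + prevAggValue + "]");
                    out.add("+U[" + key + ", " + newAggValue + "]");
                }
            } else {
                out.add("+I[" + key + ", " + newAggValue + "]");
            }
        } else {
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevAggValue + "]");
            }
            accState.remove(key);
        }
    }

    public static void main(String[] args) {
        ExpiredStateRepro agg = new ExpiredStateRepro();
        // Bundle 1: -U[1, 20] arrives after the state for key 1 has expired.
        agg.finishBundle(1, new Acc(-20, -1));
        // Bundle 2: the matching +U[1, 20].
        agg.finishBundle(1, new Acc(20, 1));
        System.out.println(agg.out); // [+I[1, -20], -D[1, -20]]
    }
}
{code}

Running main prints [+I[1, -20], -D[1, -20]]: the +I/-D pair from the report, produced even though the source only issued an update.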
> MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when state expired
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-34129
>                 URL: https://issues.apache.org/jira/browse/FLINK-34129
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.1
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: 1.19.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)