[ 
https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-34129:
----------------------------------
    Description: 
Take SUM as an example:
When the state has expired and the source then emits an update, MiniBatchGlobalGroupAggFunction receives -U[1, 20] and +U[1, 20] as input but emits +I[1, -20] and -D[1, -20]. As a result, the sink deletes the row from the external database.
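
To see why this pair of records removes the row, consider how a primary-key (upsert) sink applies a changelog. The following is a hypothetical model, not Flink or connector code: the class UpsertSinkSketch, its Kind enum and the in-memory map standing in for the external table are assumptions. UPDATE_BEFORE is assumed to be ignored, as upsert sinks usually do, and the table is assumed to already hold [1, 20] from before the state expired.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a primary-key (upsert) sink; not Flink or connector code.
class UpsertSinkSketch {
    // Hypothetical stand-in for Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // In-memory map standing in for the external table, keyed by the group key.
    private final Map<Integer, Long> table = new HashMap<>();

    void apply(Kind kind, int key, long value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                table.put(key, value); // upsert by key
                break;
            case DELETE:
                table.remove(key);     // delete by key; the carried value is not used
                break;
            case UPDATE_BEFORE:
                break;                 // assumed to be ignored, as upsert sinks usually do
        }
    }

    Map<Integer, Long> snapshot() {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        sink.apply(Kind.INSERT, 1, 20L);   // assumed: row [1, 20] written before the state expired
        sink.apply(Kind.INSERT, 1, -20L);  // +I[1, -20] emitted by the aggregate
        sink.apply(Kind.DELETE, 1, -20L);  // -D[1, -20] emitted by the aggregate
        System.out.println(sink.snapshot()); // prints {}
    }
}
{code}

Because the sink upserts and deletes by key, the value carried by -D does not matter: the row for key 1 is gone even though the source only updated it.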

Here is why this happens:
 * When the state has expired and -U[1, 20] arrives, MiniBatchGlobalGroupAggFunction creates a new SUM accumulator and sets firstRow to true.

{code:java}
if (stateAcc == null) {
    stateAcc = globalAgg.createAccumulators();
    firstRow = true;
}
{code}
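 * The SUM accumulator then retracts 20, so the sum becomes -20 (and the record count becomes -1).
 * Because the record count is not zero and this is the first row, MiniBatchGlobalGroupAggFunction turns the -U into a +I[1, -20] and emits it downstream.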

{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        // (omitted, not relevant here)
    } else {
        // update acc to state
        accState.update(acc);

        // this is the first, output new result
        // prepare INSERT message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
        out.collect(resultRow);
    }
}
{code}
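 * When the next +U[1, 20] arrives, the SUM accumulator accumulates 20, bringing the sum back to 0 and the record count back to 0, so RetractionRecordCounter#recordCountIsZero returns true. Because firstRow is now false, MiniBatchGlobalGroupAggFunction turns the +U into a -D carrying the previous result, -D[1, -20], and emits it downstream.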

{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // (omitted, not relevant here)
} else {
    // we retracted the last record for this key
    // if this is not first row sent out a DELETE message
    if (!firstRow) {
        // prepare DELETE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
}
{code}
 
So the sink will receiver +I and -D after a source update operation, the data 
will be delete.
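
The steps above can be condensed into a small, self-contained model. This is a simplified sketch, not the actual MiniBatchGlobalGroupAggFunction: the class ExpiredStateSumSketch, its Acc type (SUM plus the COUNT(*) that RetractionRecordCounter checks) and the string-encoded output are hypothetical, a missing map entry stands in for expired state, and -U[1, 20] and +U[1, 20] are assumed to be processed in separate bundles, as the walkthrough implies.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the per-key logic walked through above.
class ExpiredStateSumSketch {
    // Accumulator: SUM plus the COUNT(*) that a retraction record counter checks.
    static final class Acc {
        long sum;
        long count;
    }

    // Keyed state; a missing entry models state that has expired.
    private final Map<Integer, Acc> accState = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    // Merges one bundle's local contribution (negative for retractions) for a key.
    void finishBundle(int key, long sumDelta, long countDelta) {
        Acc acc = accState.get(key);
        boolean firstRow = false;
        if (acc == null) {
            acc = new Acc();   // like globalAgg.createAccumulators()
            firstRow = true;
        }
        long prevAggValue = acc.sum;
        acc.sum += sumDelta;
        acc.count += countDelta;
        long newAggValue = acc.sum;

        if (acc.count != 0) {               // !recordCountIsZero(acc)
            accState.put(key, acc);
            if (firstRow) {
                out.add("+I[" + key + ", " + newAggValue + "]");
            } else if (prevAggValue != newAggValue) {
                // simplified: always emits the UPDATE_BEFORE / UPDATE_AFTER pair
                out.add("-U[" + key + ", " + prevAggValue + "]");
                out.add("+U[" + key + ", " + newAggValue + "]");
            }
        } else {                            // recordCountIsZero(acc)
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevAggValue + "]");
            }
            accState.remove(key);           // clear state for this key
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        ExpiredStateSumSketch agg = new ExpiredStateSumSketch();
        // accState is empty, i.e. the state for key 1 has expired.
        agg.finishBundle(1, -20, -1);  // bundle holding -U[1, 20]
        agg.finishBundle(1, 20, 1);    // later bundle holding +U[1, 20]
        System.out.println(agg.output()); // prints [+I[1, -20], -D[1, -20]]
    }
}
{code}

In this model, firstRow depends only on whether state exists, so a retraction arriving after expiry is treated like a genuine first insert; the following +U then looks like the last record being retracted, which produces the DELETE.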



> MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when state expired
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-34129
>                 URL: https://issues.apache.org/jira/browse/FLINK-34129
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.1
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: 1.19.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
