bvarghese1 commented on code in PR #26424: URL: https://github.com/apache/flink/pull/26424#discussion_r2182507151
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRowsUnboundedPrecedingFunction.java: ########## @@ -447,56 +180,35 @@ private void setAccumulatorOfPrevId( * @param insRow * @throws Exception */ - private void reAccumulateIdsAfterInsert(RowData currAcc, List<Long> ids, RowData insRow) + void reAccumulateIdsAfterInsert(RowData currAcc, List<Long> ids, RowData insRow) throws Exception { aggFuncs.setAccumulators(currAcc); - // For rows, there is no need to re-accumulate all ids for the same sort key, since every id - // will have a unique aggregated value + // For rows, there is no need to re-accumulate all ids for the same sort key, + // since every id will have a unique aggregated value // Update acc for newly inserted row aggFuncs.accumulate(insRow); } - private RowData setAccumulatorAndGetValue(RowData accumulator) throws Exception { - aggFuncs.setAccumulators(accumulator); - return aggFuncs.getValue(); - } - /** - * Emits updates for all ids. The id belonging to idxOfChangedRow location either emits a DELETE - * or INSERT/UPDATE_AFTER, depending on whether the id is being added or removed from ids. All - * other ids except the id at position idxOfChangedRow emit old and new aggregated values in the - * form of UPDATE_BEFORE and UPDATE_AFTER respectively. + * Helper method to send updates for ids. To comply with sql rows syntax, only send update for + * the changed row. * * @param ids * @param idxOfChangedRow - * @param prevAcc - * @param currAcc + * @param out * @param rowKind * @param changedRow - * @param out - * @throws Exception + * @param prevAggValue + * @param currAggValue */ - private void emitUpdatesForIds( + void sendUpdatesForIds( Review Comment: Added ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRowsUnboundedPrecedingFunction.java: ########## @@ -516,7 +228,7 @@ private void emitUpdatesForIds( * @param out * @throws Exception */ - private void processRemainingElements( + protected void processRemainingElements( Review Comment: Added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org