bvarghese1 commented on code in PR #26424:
URL: https://github.com/apache/flink/pull/26424#discussion_r2182507151


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRowsUnboundedPrecedingFunction.java:
##########
@@ -447,56 +180,35 @@ private void setAccumulatorOfPrevId(
      * @param insRow
      * @throws Exception
      */
-    private void reAccumulateIdsAfterInsert(RowData currAcc, List<Long> ids, RowData insRow)
+    void reAccumulateIdsAfterInsert(RowData currAcc, List<Long> ids, RowData insRow)
             throws Exception {
         aggFuncs.setAccumulators(currAcc);
-        // For rows, there is no need to re-accumulate all ids for the same sort key, since every id
-        // will have a unique aggregated value
+        // For rows, there is no need to re-accumulate all ids for the same sort key,
+        // since every id will have a unique aggregated value
         // Update acc for newly inserted row
         aggFuncs.accumulate(insRow);
     }
 
-    private RowData setAccumulatorAndGetValue(RowData accumulator) throws Exception {
-        aggFuncs.setAccumulators(accumulator);
-        return aggFuncs.getValue();
-    }
-
     /**
-     * Emits updates for all ids. The id belonging to idxOfChangedRow location either emits a DELETE
-     * or INSERT/UPDATE_AFTER, depending on whether the id is being added or removed from ids. All
-     * other ids except the id at position idxOfChangedRow emit old and new aggregated values in the
-     * form of UPDATE_BEFORE and UPDATE_AFTER respectively.
+     * Helper method to send updates for ids. To comply with sql rows syntax, only send update for
+     * the changed row.
      *
      * @param ids
      * @param idxOfChangedRow
-     * @param prevAcc
-     * @param currAcc
+     * @param out
      * @param rowKind
      * @param changedRow
-     * @param out
-     * @throws Exception
+     * @param prevAggValue
+     * @param currAggValue
      */
-    private void emitUpdatesForIds(
+    void sendUpdatesForIds(

Review Comment:
   Added



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRowsUnboundedPrecedingFunction.java:
##########
@@ -516,7 +228,7 @@ private void emitUpdatesForIds(
      * @param out
      * @throws Exception
      */
-    private void processRemainingElements(
+    protected void processRemainingElements(

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to