This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new c8ad71276da [FLINK-38020] Fix NPE in NonTimeRangeUnboundedFunction (#26807)
c8ad71276da is described below

commit c8ad71276daae537bd0ee315b860e450387d5617
Author: Bonnie Varghese <bvargh...@confluent.io>
AuthorDate: Mon Aug 25 04:25:47 2025 -0700

    [FLINK-38020] Fix NPE in NonTimeRangeUnboundedFunction (#26807)
    
    - Fix NPE by setting accumulators after creation
---
 .../NonTimeRangeUnboundedPrecedingFunction.java    |  6 +-
 ...NonTimeRangeUnboundedPrecedingFunctionTest.java | 68 ++++++++++++++---
 .../operators/over/RowTimeOverWindowTestBase.java  |  8 ++
 .../operators/over/SumLongAggsHandleFunction.java  | 87 ++++++++++++++++++++++
 4 files changed, 158 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java
index fa3e61f2721..46a2b0d9904 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java
@@ -422,11 +422,13 @@ public class NonTimeRangeUnboundedPrecedingFunction<K>
     private void setAccumulatorOfPrevRow(
             List<Tuple2<RowData, List<Long>>> sortedList, int prevIndex) throws Exception {
         if (prevIndex < 0) {
-            aggFuncs.createAccumulators();
+            RowData accData = aggFuncs.createAccumulators();
+            aggFuncs.setAccumulators(accData);
         } else {
             RowData prevAcc = accMapState.get(sortedList.get(prevIndex).f0);
             if (prevAcc == null) {
-                aggFuncs.createAccumulators();
+                RowData accData = aggFuncs.createAccumulators();
+                aggFuncs.setAccumulators(accData);
             } else {
                 aggFuncs.setAccumulators(prevAcc);
             }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java
index 8cb444c4904..b96935b272c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.types.RowKind;
 import org.junit.Test;
 
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -212,7 +211,58 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(insertRecord("key1", 4L, 400L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
+                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
+                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
+                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
+                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 14L),
+                        outputRecord(RowKind.INSERT, "key2", 1L, 100L, 1L),
+                        outputRecord(RowKind.INSERT, "key2", 2L, 200L, 3L),
+                        outputRecord(RowKind.INSERT, "key1", 4L, 400L, 7L),
+                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L),
+                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 12L),
+                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L),
+                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 18L));
+
+        List<RowData> actualRows = testHarness.extractOutputValues();
+
+        validateRows(actualRows, expectedRows);
+    }
+
+    @Test
+    public void testInsertOnlyRecordsWithCustomSortKeyAndLongSumAgg() throws Exception {
+        NonTimeRangeUnboundedPrecedingFunction<RowData> function =
+                new NonTimeRangeUnboundedPrecedingFunction<RowData>(
+                        0,
+                        aggsSumLongHandleFunction,
+                        GENERATED_ROW_VALUE_EQUALISER,
+                        GENERATED_SORT_KEY_EQUALISER,
+                        GENERATED_SORT_KEY_COMPARATOR_ASC,
+                        accTypes,
+                        inputFieldTypes,
+                        SORT_KEY_TYPES,
+                        SORT_KEY_SELECTOR) {};
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(function);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+
+        testHarness.open();
+
+        // put some records
+        testHarness.processElement(insertRecord("key1", 1L, 100L));
+        testHarness.processElement(insertRecord("key1", 2L, 200L));
+        testHarness.processElement(insertRecord("key1", 5L, 500L));
+        testHarness.processElement(insertRecord("key1", 6L, 600L));
+        testHarness.processElement(insertRecord("key2", 1L, 100L));
+        testHarness.processElement(insertRecord("key2", 2L, 200L));
+
+        // out of order record should trigger updates for all records after its inserted position
+        testHarness.processElement(insertRecord("key1", 4L, 400L));
+
+        List<RowData> expectedRows =
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                         outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
@@ -267,7 +317,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(insertRecord("key1", 4L, 400L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                         outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
@@ -356,7 +406,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(updateAfterRecord("key1", 3L, 300L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                         outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
@@ -430,7 +480,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(updateBeforeRecord("key1", 5L, 500L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                         outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
@@ -490,7 +540,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(updateBeforeRecord("key1", 5L, 502L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                         outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
@@ -550,7 +600,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(updateBeforeRecord("key1", 5L, 501L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                         outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
@@ -608,7 +658,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(updateBeforeRecord("key1", 2L, 200L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                         outputRecord(RowKind.INSERT, "key1", 2L, 200L, 2L),
                         outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 1L),
@@ -679,7 +729,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo
         testHarness.processElement(updateBeforeRecord("key1", 0L, 100L));
 
         List<RowData> expectedRows =
-                Arrays.asList(
+                List.of(
                         outputRecord(RowKind.INSERT, "key1", 0L, 100L, 0L),
                         outputRecord(RowKind.INSERT, "key1", 0L, 101L, 0L),
                         outputRecord(RowKind.INSERT, "key1", 0L, 102L, 0L),
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
index 3f85eee4e90..2ea90fa1ad9 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
@@ -41,6 +41,14 @@ public class RowTimeOverWindowTestBase {
                 }
             };
 
+    protected static GeneratedAggsHandleFunction aggsSumLongHandleFunction =
+            new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+                @Override
+                public AggsHandleFunction newInstance(ClassLoader classLoader) {
+                    return new SumLongAggsHandleFunction(1);
+                }
+            };
+
     protected LogicalType[] inputFieldTypes =
             new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()};
     protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumLongAggsHandleFunction.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumLongAggsHandleFunction.java
new file mode 100644
index 00000000000..e7d7c6733c6
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumLongAggsHandleFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+
+/** Test {@link AggsHandleFunction} that keeps a running sum of one long input field. */
+class SumLongAggsHandleFunction implements AggsHandleFunction {
+
+    private final int inputIndex; // position of the input field read with getLong
+    private Long sum; // boxed, no initializer: null until setAccumulators/resetAccumulators runs (presumably deliberate so the test surfaces FLINK-38020 - confirm)
+
+    public SumLongAggsHandleFunction(int inputIndex) {
+        this.inputIndex = inputIndex;
+    }
+
+    @Override
+    public void open(StateDataViewStore store) throws Exception {} // no-op
+
+    @Override
+    public void accumulate(RowData input) throws Exception {
+        sum += input.getLong(inputIndex); // unboxes sum, so this throws NPE while sum is still null
+    }
+
+    @Override
+    public void retract(RowData input) throws Exception {
+        sum -= input.getLong(inputIndex); // same unboxing as accumulate: NPE while sum is still null
+    }
+
+    @Override
+    public void merge(RowData accumulator) throws Exception {
+        sum += accumulator.getLong(0); // adds field 0 of the given accumulator row to the running sum
+    }
+
+    @Override
+    public void setAccumulators(RowData accumulator) throws Exception {
+        sum = accumulator.getLong(0); // loads the running sum from field 0 of the given row
+    }
+
+    @Override
+    public void resetAccumulators() throws Exception {
+        sum = 0L;
+    }
+
+    @Override
+    public RowData getAccumulators() throws Exception {
+        return GenericRowData.of(sum); // single-field row holding the current sum
+    }
+
+    @Override
+    public RowData createAccumulators() throws Exception {
+        return GenericRowData.of(0L); // returns a fresh row only; the sum field is not assigned here
+    }
+
+    @Override
+    public RowData getValue() throws Exception {
+        return getAccumulators(); // the result row is the accumulator row
+    }
+
+    @Override
+    public void setWindowSize(int windowSize) {} // no-op
+
+    @Override
+    public void cleanup() throws Exception {} // no-op
+
+    @Override
+    public void close() throws Exception {} // no-op
+}

Reply via email to