This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push: new c8ad71276da [FLINK-38020] Fix NPE in NonTimeRangeUnboundedFunction (#26807) c8ad71276da is described below commit c8ad71276daae537bd0ee315b860e450387d5617 Author: Bonnie Varghese <bvargh...@confluent.io> AuthorDate: Mon Aug 25 04:25:47 2025 -0700 [FLINK-38020] Fix NPE in NonTimeRangeUnboundedFunction (#26807) - Fix NPE by setting accumulators after creation --- .../NonTimeRangeUnboundedPrecedingFunction.java | 6 +- ...NonTimeRangeUnboundedPrecedingFunctionTest.java | 68 ++++++++++++++--- .../operators/over/RowTimeOverWindowTestBase.java | 8 ++ .../operators/over/SumLongAggsHandleFunction.java | 87 ++++++++++++++++++++++ 4 files changed, 158 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java index fa3e61f2721..46a2b0d9904 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java @@ -422,11 +422,13 @@ public class NonTimeRangeUnboundedPrecedingFunction<K> private void setAccumulatorOfPrevRow( List<Tuple2<RowData, List<Long>>> sortedList, int prevIndex) throws Exception { if (prevIndex < 0) { - aggFuncs.createAccumulators(); + RowData accData = aggFuncs.createAccumulators(); + aggFuncs.setAccumulators(accData); } else { RowData prevAcc = accMapState.get(sortedList.get(prevIndex).f0); if (prevAcc == null) { - aggFuncs.createAccumulators(); + RowData accData = aggFuncs.createAccumulators(); + aggFuncs.setAccumulators(accData); } else { aggFuncs.setAccumulators(prevAcc); } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java index 8cb444c4904..b96935b272c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunctionTest.java @@ -42,7 +42,6 @@ import org.apache.flink.types.RowKind; import org.junit.Test; import java.time.Duration; -import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -212,7 +211,58 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(insertRecord("key1", 4L, 400L)); List<RowData> expectedRows = - Arrays.asList( + List.of( + outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), + outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), + outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L), + outputRecord(RowKind.INSERT, "key1", 6L, 600L, 14L), + outputRecord(RowKind.INSERT, "key2", 1L, 100L, 1L), + outputRecord(RowKind.INSERT, "key2", 2L, 200L, 3L), + outputRecord(RowKind.INSERT, "key1", 4L, 400L, 7L), + outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L), + outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 12L), + outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L), + outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 18L)); + + List<RowData> actualRows = testHarness.extractOutputValues(); + + validateRows(actualRows, expectedRows); + } + + @Test + public void testInsertOnlyRecordsWithCustomSortKeyAndLongSumAgg() throws Exception { + NonTimeRangeUnboundedPrecedingFunction<RowData> function = + new NonTimeRangeUnboundedPrecedingFunction<RowData>( + 0, + aggsSumLongHandleFunction, + GENERATED_ROW_VALUE_EQUALISER, + GENERATED_SORT_KEY_EQUALISER, + GENERATED_SORT_KEY_COMPARATOR_ASC, + accTypes, + inputFieldTypes, + SORT_KEY_TYPES, + SORT_KEY_SELECTOR) {}; + KeyedProcessOperator<RowData, RowData, RowData> operator = + new KeyedProcessOperator<>(function); + + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + + testHarness.open(); + + // put some records + testHarness.processElement(insertRecord("key1", 1L, 100L)); + testHarness.processElement(insertRecord("key1", 2L, 200L)); + testHarness.processElement(insertRecord("key1", 5L, 500L)); + testHarness.processElement(insertRecord("key1", 6L, 600L)); + testHarness.processElement(insertRecord("key2", 1L, 100L)); + testHarness.processElement(insertRecord("key2", 2L, 200L)); + + // out of order record should trigger updates for all records after its inserted position + testHarness.processElement(insertRecord("key1", 4L, 400L)); + + List<RowData> expectedRows = + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L), @@ -267,7 +317,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(insertRecord("key1", 4L, 400L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L), @@ -356,7 +406,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(updateAfterRecord("key1", 3L, 300L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L), @@ -430,7 +480,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(updateBeforeRecord("key1", 5L, 500L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L), @@ -490,7 +540,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(updateBeforeRecord("key1", 5L, 502L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L), @@ -550,7 +600,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(updateBeforeRecord("key1", 5L, 501L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L), outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L), @@ -608,7 +658,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(updateBeforeRecord("key1", 2L, 200L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L), outputRecord(RowKind.INSERT, "key1", 2L, 200L, 2L), outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 1L), @@ -679,7 +729,7 @@ public class NonTimeRangeUnboundedPrecedingFunctionTest extends RowTimeOverWindo testHarness.processElement(updateBeforeRecord("key1", 0L, 100L)); List<RowData> expectedRows = - Arrays.asList( + List.of( outputRecord(RowKind.INSERT, "key1", 0L, 100L, 0L), outputRecord(RowKind.INSERT, "key1", 0L, 101L, 0L), outputRecord(RowKind.INSERT, "key1", 0L, 102L, 0L), diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java index 3f85eee4e90..2ea90fa1ad9 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java @@ -41,6 +41,14 @@ public class RowTimeOverWindowTestBase { } }; + protected static GeneratedAggsHandleFunction aggsSumLongHandleFunction = + new GeneratedAggsHandleFunction("Function", "", new Object[0]) { + @Override + public AggsHandleFunction newInstance(ClassLoader classLoader) { + return new SumLongAggsHandleFunction(1); + } + }; + protected LogicalType[] inputFieldTypes = new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()}; protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()}; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumLongAggsHandleFunction.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumLongAggsHandleFunction.java new file mode 100644 index 00000000000..e7d7c6733c6 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumLongAggsHandleFunction.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.over; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.dataview.StateDataViewStore; +import org.apache.flink.table.runtime.generated.AggsHandleFunction; + +/** Test {@link AggsHandleFunction}. */ +class SumLongAggsHandleFunction implements AggsHandleFunction { + + private final int inputIndex; + private Long sum; + + public SumLongAggsHandleFunction(int inputIndex) { + this.inputIndex = inputIndex; + } + + @Override + public void open(StateDataViewStore store) throws Exception {} + + @Override + public void accumulate(RowData input) throws Exception { + sum += input.getLong(inputIndex); + } + + @Override + public void retract(RowData input) throws Exception { + sum -= input.getLong(inputIndex); + } + + @Override + public void merge(RowData accumulator) throws Exception { + sum += accumulator.getLong(0); + } + + @Override + public void setAccumulators(RowData accumulator) throws Exception { + sum = accumulator.getLong(0); + } + + @Override + public void resetAccumulators() throws Exception { + sum = 0L; + } + + @Override + public RowData getAccumulators() throws Exception { + return GenericRowData.of(sum); + } + + @Override + public RowData createAccumulators() throws Exception { + return GenericRowData.of(0L); + } + + @Override + public RowData getValue() throws Exception { + return getAccumulators(); + } + + @Override + public void setWindowSize(int windowSize) {} + + @Override + public void cleanup() throws Exception {} + + @Override + public void close() throws Exception {} +}