This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 921b6081582 [FLINK-28019][table] fix error when retract a staled
record if state ttl enabled in RetractableTopNFunction
921b6081582 is described below
commit 921b608158288bc807493e1c425f6d7ec6f47b18
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jun 20 10:12:37 2022 +0800
[FLINK-28019][table] fix error when retract a staled record if state ttl
enabled in RetractableTopNFunction
This closes #19996
---
.../operators/rank/RetractableTopNFunction.java | 32 ++++++++++--------
.../rank/RetractableTopNFunctionTest.java | 38 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 14 deletions(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index da4d94d994b..b9d6b6dfdca 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -191,16 +191,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
sortedMap.put(sortKey, count);
}
} else {
- if (sortedMap.isEmpty()) {
- if (lenient) {
- LOG.warn(STATE_CLEARED_WARN_MSG);
- } else {
- throw new RuntimeException(STATE_CLEARED_WARN_MSG);
- }
- } else {
- throw new RuntimeException(
- "Can not retract a non-existent record. This should never happen.");
- }
+ stateStaledErrorHandle();
}
if (!stateRemoved) {
@@ -231,10 +222,19 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
private void processStateStaled(Iterator<Map.Entry<RowData, Long>> sortedMapIterator)
throws RuntimeException {
+ // Sync with dataState first
+ sortedMapIterator.remove();
+
+ stateStaledErrorHandle();
+ }
+
+ /**
+ * Handle state staled error by configured lenient option. If option is true, warning log only,
+ * otherwise a {@link RuntimeException} will be thrown.
+ */
+ private void stateStaledErrorHandle() {
// Skip the data if it's state is cleared because of state ttl.
if (lenient) {
- // Sync with dataState
- sortedMapIterator.remove();
LOG.warn(STATE_CLEARED_WARN_MSG);
} else {
throw new RuntimeException(STATE_CLEARED_WARN_MSG);
@@ -395,8 +395,12 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
}
}
if (isInRankEnd(currentRank)) {
- // there is no enough elements in Top-N, emit DELETE message for the retract record.
- collectDelete(out, prevRow, currentRank);
+ if (!findsSortKey && null == prevRow) {
+ stateStaledErrorHandle();
+ } else {
+ // there is no enough elements in Top-N, emit DELETE message for the retract record.
+ collectDelete(out, prevRow, currentRank);
+ }
}
return findsSortKey;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index 20efa889c18..d0c3c7d3d75 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.runtime.operators.rank;
+import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -555,4 +557,40 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase {
assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}
+
+ @Test
+ public void testRetractAnStaledRecordWithRowNumber() throws Exception {
+ StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
+ AbstractTopNFunction func =
+ new RetractableTopNFunction(
+ ttlConfig,
+ InternalTypeInfo.ofFields(
+ VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
+ comparableRecordComparator,
+ sortKeySelector,
+ RankType.ROW_NUMBER,
+ new ConstantRankRange(1, 2),
+ generatedEqualiser,
+ true,
+ true);
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+ testHarness.open();
+ testHarness.setStateTtlProcessingTime(0);
+ testHarness.processElement(insertRecord("a", 1L, 10));
+ testHarness.setStateTtlProcessingTime(1001);
+ testHarness.processElement(insertRecord("a", 2L, 11));
+ testHarness.processElement(deleteRecord("a", 1L, 10));
+ testHarness.close();
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("a", 1L, 10, 1L));
+ expectedOutput.add(insertRecord("a", 2L, 11, 1L));
+ // the following delete record should not be sent because the left row is null which is
+ // illegal.
+ // -D{row1=null, row2=+I(1)};
+
+ assertorWithRowNumber.assertOutputEquals(
+ "output wrong.", expectedOutput, testHarness.getOutput());
+ }
}