This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 383d5640d2a [FLINK-39719][table] Drop late probe records from 
TemporalRowTimeJoinOperator (#28229)
383d5640d2a is described below

commit 383d5640d2a7d298e8ce13c5b3caf38998150077
Author: Fabian Hueske <[email protected]>
AuthorDate: Wed May 27 09:47:56 2026 +0200

    [FLINK-39719][table] Drop late probe records from 
TemporalRowTimeJoinOperator (#28229)
    
    TemporalRowTimeJoinOperator cleans up build-side state when the watermark 
advances.
    Hence, late-arriving probe-side records might not see the correct 
build-side state and may join with no data or with incorrect data.
    This is especially problematic for probe-side inputs with retractions: 
downstream operators cannot match a retraction to its insertion when the two 
joined rows differ.
    This change drops late-arriving probe-side records and tracks the number of 
dropped records in a metric.
    The new behavior is aligned with other operators that work in event time 
and clean up state on watermark progress, such as windowed or OVER aggregates 
and interval joins.
    
    Changes:
    * drop late arriving probe-side records
    * track dropped record count as a metric
    * adjust existing tests
    * add new unit tests to ensure correct dropping behavior
    * add late data behavior to docs
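
The dropping rule described above can be illustrated in isolation. The following is a minimal sketch, not the operator itself: `LateProbeFilter` and its methods are hypothetical names introduced only for illustration, and it assumes a single combined watermark, whereas the real operator reads the current watermark from its timer service. It shows only the comparison (event time less than or equal to the watermark means late) and the drop counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of the late-record check; not Flink code. */
class LateProbeFilter {
    // Assumption: one combined watermark for both inputs.
    private long currentWatermark = Long.MIN_VALUE;
    // Plays the role of the numLateRecordsDropped counter.
    private long numLateRecordsDropped = 0;
    // Event times of probe records that were accepted and would wait for a timer.
    private final List<Long> buffered = new ArrayList<>();

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Returns true if the record was buffered, false if it was dropped as late. */
    boolean processProbe(long eventTime) {
        if (eventTime <= currentWatermark) {
            // Late: the matching build-side version may already be gone.
            numLateRecordsDropped++;
            return false;
        }
        buffered.add(eventTime);
        return true;
    }

    long getNumLateRecordsDropped() {
        return numLateRecordsDropped;
    }

    List<Long> getBuffered() {
        return buffered;
    }
}
```

Replaying the probe-side event times and watermarks used by the new late-record unit test (watermark 1, probe 3, watermark 5, probes 5, 4, 1 and 7, watermark 8, probe 10, watermark 11, probe 13 and a retraction at 10, watermark 13, a retraction at 13) drops five records in this model, which is the count the test asserts on the metric.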
---
 .../content.zh/docs/sql/reference/queries/joins.md |   2 +
 docs/content/docs/sql/reference/queries/joins.md   |   2 +
 .../exec/stream/TemporalJoinTestPrograms.java      |   4 +-
 .../join/temporal/TemporalRowTimeJoinOperator.java |  32 +++++-
 .../temporal/TemporalRowTimeJoinOperatorTest.java  | 113 ++++++++++++++++++---
 5 files changed, 134 insertions(+), 19 deletions(-)

diff --git a/docs/content.zh/docs/sql/reference/queries/joins.md 
b/docs/content.zh/docs/sql/reference/queries/joins.md
index 0cf8d669709..a1e1db2caab 100644
--- a/docs/content.zh/docs/sql/reference/queries/joins.md
+++ b/docs/content.zh/docs/sql/reference/queries/joins.md
@@ -181,6 +181,8 @@ o_002     12.51  EUR       1.10             12:06:00
 这里的 `INTERVAL` 时间减法用于等待后续事件,以确保 join 满足预期。
 请确保 join 两边设置了正确的 watermark 。
 
+**Note:** Probe-side (left) records that arrive late (their event time is less 
than or equal to the current watermark) are dropped on arrival and counted by 
the `numLateRecordsDropped` operator metric. They are not joined or emitted, 
not even as null-padded results for `LEFT JOIN`, because the matching 
build-side version may already have been cleaned up.
+
 **注意:** 事件时间 temporal join 需要包含主键相等的条件,即:`currency_rates` 表的主键 
`currency_rates.currency` 包含在条件 `orders.currency = currency_rates.currency` 中。
 
 与 [regular joins](#regular-joins) 相比,就算 build side(例子中的 currency_rates 
表)发生变更了,之前的 temporal table 的结果也不会被影响。
diff --git a/docs/content/docs/sql/reference/queries/joins.md 
b/docs/content/docs/sql/reference/queries/joins.md
index be99c737d3f..d000a46b822 100644
--- a/docs/content/docs/sql/reference/queries/joins.md
+++ b/docs/content/docs/sql/reference/queries/joins.md
@@ -186,6 +186,8 @@ o_002     12.51  EUR       1.10             12:06:00
 The `INTERVAL` time subtraction is used to wait for late events in order to 
make sure the join will meet the expectation. 
 Please ensure both sides of the join have set watermark correctly.
 
+**Note:** Probe-side (left) records that arrive late (their event time is less 
than or equal to the current watermark) are dropped on arrival and counted by 
the `numLateRecordsDropped` operator metric. They are not joined or emitted, 
not even as null-padded results for `LEFT JOIN`, because the matching 
build-side version may already have been cleaned up.
+
 **Note:** The event-time temporal join requires the primary key contained in 
the equivalence condition of the temporal join condition, e.g., The primary key 
`currency_rates.currency` of table `currency_rates` to be constrained in the 
condition `orders.currency = currency_rates.currency`.
 
 In contrast to [regular joins](#regular-joins), the previous temporal table 
results will not be affected despite the changes on the build side.
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
index ed83169c74a..b8a03ffacf9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
@@ -45,7 +45,7 @@ public class TemporalJoinTestPrograms {
                             Row.of(3L, "Euro", "2020-10-10 00:00:45"))
                     .producedAfterRestore(
                             Row.of(1L, "Euro", "2020-10-10 00:00:58"),
-                            Row.of(1L, "USD", "2020-10-10 00:00:58"))
+                            Row.of(1L, "USD", "2020-10-10 00:00:59"))
                     .build();
 
     static final SourceTestStep ORDERS_WITH_NESTED_ID =
@@ -88,7 +88,7 @@ public class TemporalJoinTestPrograms {
                                     1L,
                                     Row.of("usd"),
                                     mapOf("currency", "USD"),
-                                    "2020-10-10 00:00:58"))
+                                    "2020-10-10 00:00:59"))
                     .build();
 
     static final SourceTestStep RATES =
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index 940e15c2b4d..71ed75b5970 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -61,9 +62,14 @@ import java.util.TreeMap;
  * idea is that between watermarks we are collecting those elements and once 
we are sure that there
  * will be no updates we emit the correct result and clean up the expired data 
in state.
  *
- * <p>Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is
- * defined as older then the current watermark. Build side is also cleaned up 
in the similar
- * fashion, however we always keep at least one record - the latest one - even 
if it's past the last
+ * <p>Probe-side records that arrive late (their event time is less than or 
equal to the current
+ * watermark) are dropped on arrival and counted via the {@code 
numLateRecordsDropped} metric; they
+ * are not joined or emitted (not even as null-padded results for left outer 
joins), because the
+ * matching build-side version may already have been cleaned up.
+ *
+ * <p>Cleaning up the state drops all the "old" values from the probe side, 
where "old" is defined
+ * as older than the current watermark. Build side is also cleaned up in the 
similar fashion,
+ * however we always keep at least one record - the latest one - even if it's 
past the last
  * watermark.
  *
  * <p>One more trick is how the emitting results and cleaning up is triggered. 
It is achieved by
@@ -84,6 +90,7 @@ public class TemporalRowTimeJoinOperator extends 
BaseTwoInputStreamOperatorWithS
     private static final String RIGHT_STATE_NAME = "right";
     private static final String REGISTERED_TIMER_STATE_NAME = "timer";
     private static final String TIMERS_STATE_NAME = "timers";
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = 
"numLateRecordsDropped";
 
     private final boolean isLeftOuterJoin;
     private final InternalTypeInfo<RowData> leftType;
@@ -123,6 +130,8 @@ public class TemporalRowTimeJoinOperator extends 
BaseTwoInputStreamOperatorWithS
     private transient JoinedRowData outRow;
     private transient GenericRowData rightNullRow;
 
+    private transient Counter numLateRecordsDropped;
+
     public TemporalRowTimeJoinOperator(
             InternalTypeInfo<RowData> leftType,
             InternalTypeInfo<RowData> rightType,
@@ -174,13 +183,23 @@ public class TemporalRowTimeJoinOperator extends 
BaseTwoInputStreamOperatorWithS
         outRow = new JoinedRowData();
         rightNullRow = new 
GenericRowData(rightType.toRowType().getFieldCount());
         collector = new TimestampedCollector<>(output);
+
+        numLateRecordsDropped =
+                
getRuntimeContext().getMetricGroup().counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
     }
 
     @Override
     public void processElement1(StreamRecord<RowData> element) throws 
Exception {
         RowData row = element.getValue();
+        long leftTime = getLeftTime(row);
+        if (leftTime <= timerService.currentWatermark()) {
+            // The probe-side record is late. Drop it, because the matching 
build-side version may
+            // already have been cleaned up.
+            numLateRecordsDropped.inc();
+            return;
+        }
         leftState.put(getNextLeftIndex(), row);
-        registerSmallestTimer(getLeftTime(row)); // Timer to emit and clean up 
the state
+        registerSmallestTimer(leftTime); // Timer to emit and clean up the 
state
 
         registerProcessingCleanupTimer();
     }
@@ -441,4 +460,9 @@ public class TemporalRowTimeJoinOperator extends 
BaseTwoInputStreamOperatorWithS
     static String getRegisteredTimerStateName() {
         return REGISTERED_TIMER_STATE_NAME;
     }
+
+    @VisibleForTesting
+    Counter getNumLateRecordsDropped() {
+        return numLateRecordsDropped;
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
index bff3b8ae73a..2a54ac02fdc 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
@@ -39,9 +39,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase 
{
     /** Test rowtime temporal join. */
     @Test
-    void testRowTimeTemporalJoin() throws Exception {
+    void testRowTimeInnerTemporalJoin() throws Exception {
         List<Object> expectedOutput = new ArrayList<>();
-        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(new Watermark(0));
         expectedOutput.add(new Watermark(2));
         expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
         expectedOutput.add(new Watermark(5));
@@ -54,14 +54,12 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
         testRowTimeTemporalJoin(false, expectedOutput);
     }
 
-    /** Test rowtime left temporal join. */
     @Test
     void testRowTimeLeftTemporalJoin() throws Exception {
         List<Object> expectedOutput = new ArrayList<>();
-        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(new Watermark(0));
         expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
         expectedOutput.add(new Watermark(2));
-        expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
         expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
         expectedOutput.add(new Watermark(5));
         expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
@@ -84,8 +82,8 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
 
         testHarness.open();
 
-        testHarness.processWatermark1(new Watermark(1));
-        testHarness.processWatermark2(new Watermark(1));
+        testHarness.processWatermark1(new Watermark(0));
+        testHarness.processWatermark2(new Watermark(0));
 
         testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
         testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
@@ -93,7 +91,6 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
         testHarness.processWatermark1(new Watermark(2));
         testHarness.processWatermark2(new Watermark(2));
 
-        testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
         testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
         testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
 
@@ -194,9 +191,9 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
     }
 
     @Test
-    void testRowTimeTemporalJoinOnUpsertSource() throws Exception {
+    void testRowTimeInnerTemporalJoinOnUpsertSource() throws Exception {
         List<Object> expectedOutput = new ArrayList<>();
-        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(new Watermark(0));
         expectedOutput.add(new Watermark(2));
         expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", 
"1a2"));
         expectedOutput.add(new Watermark(5));
@@ -212,7 +209,7 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
     @Test
     void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception {
         List<Object> expectedOutput = new ArrayList<>();
-        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(new Watermark(0));
         expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
         expectedOutput.add(new Watermark(2));
         expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", 
"1a2"));
@@ -237,8 +234,8 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
 
         testHarness.open();
 
-        testHarness.processWatermark1(new Watermark(1));
-        testHarness.processWatermark2(new Watermark(1));
+        testHarness.processWatermark1(new Watermark(0));
+        testHarness.processWatermark2(new Watermark(0));
 
         testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
         testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
@@ -270,6 +267,96 @@ class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
         testHarness.close();
     }
 
+    @Test
+    void testRowTimeInnerTemporalJoinLateRecords() throws Exception {
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2"));
+        expectedOutput.add(new Watermark(5));
+        expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2"));
+        expectedOutput.add(new Watermark(8));
+        expectedOutput.add(new Watermark(11));
+        expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9"));
+        expectedOutput.add(new Watermark(13));
+        expectedOutput.add(new Watermark(15));
+
+        testRowTimeTemporalJoinLateRecords(false, expectedOutput);
+    }
+
+    @Test
+    void testRowTimeLeftTemporalJoinLateRecords() throws Exception {
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2"));
+        expectedOutput.add(new Watermark(5));
+        expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2"));
+        expectedOutput.add(new Watermark(8));
+        expectedOutput.add(insertRecord(10L, "k2", "1a10", null, null, null));
+        expectedOutput.add(new Watermark(11));
+        expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9"));
+        expectedOutput.add(new Watermark(13));
+        expectedOutput.add(new Watermark(15));
+
+        testRowTimeTemporalJoinLateRecords(true, expectedOutput);
+    }
+
+    /**
+     * Verifies that probe-side records whose event time is less than or equal 
to the current
+     * watermark are dropped on arrival: they are not joined, not emitted 
(even with a left outer
+     * join), and are counted in the {@code numLateRecordsDropped} metric.
+     */
+    private void testRowTimeTemporalJoinLateRecords(
+            boolean isLeftOuter, List<Object> expectedOutput) throws Exception 
{
+        TemporalRowTimeJoinOperator joinOperator =
+                new TemporalRowTimeJoinOperator(
+                        rowType, rowType, joinCondition, 0, 0, 0, 0, 
isLeftOuter);
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData> testHarness =
+                createTestHarness(joinOperator);
+
+        testHarness.open();
+
+        // initialize watermark to 1
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        // Establish a build-side version at time 2 and a non-late probe 
record at time 3.
+        testHarness.processElement2(insertRecord(2L, "k1", "2a2"));
+        testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
+        testHarness.processWatermark1(new Watermark(5));
+        testHarness.processWatermark2(new Watermark(5));
+
+        // After Watermark(5), any probe record with leftTime <= 5 is late and 
must be dropped.
+        testHarness.processElement1(insertRecord(5L, "k1", "1a5")); // 
leftTime == watermark
+        testHarness.processElement1(insertRecord(4L, "k1", "1a4")); // 
leftTime < watermark
+        testHarness.processElement1(insertRecord(1L, "k1", "1a1")); // 
leftTime << watermark
+        // A non-late probe record should still be processed.
+        testHarness.processElement1(insertRecord(7L, "k1", "1a7"));
+        testHarness.processWatermark1(new Watermark(8));
+        testHarness.processWatermark2(new Watermark(8));
+
+        // A record for late retraction
+        testHarness.processElement1(insertRecord(10L, "k2", "1a10"));
+        testHarness.processWatermark1(new Watermark(11));
+        testHarness.processWatermark2(new Watermark(11));
+
+        // Add a late retraction and a late build-side record
+        testHarness.processElement1(insertRecord(13L, "k2", "1a13"));
+        testHarness.processElement2(insertRecord(9L, "k2", "2a9"));
+        testHarness.processElement1(deleteRecord(10L, "k2", "1a10")); // late 
-> dropped
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+
+        // Another late retraction
+        testHarness.processElement1(deleteRecord(13L, "k2", "1a13"));
+        testHarness.processWatermark1(new Watermark(15));
+        testHarness.processWatermark2(new Watermark(15));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        
assertThat(joinOperator.getNumLateRecordsDropped().getCount()).isEqualTo(5L);
+
+        testHarness.close();
+    }
+
     private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData>
             createTestHarness(TemporalRowTimeJoinOperator 
temporalJoinOperator) throws Exception {
 
