This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 383d5640d2a [FLINK-39719][table] Drop late probe records from
TemporalRowTimeJoinOperator (#28229)
383d5640d2a is described below
commit 383d5640d2a7d298e8ce13c5b3caf38998150077
Author: Fabian Hueske <[email protected]>
AuthorDate: Wed May 27 09:47:56 2026 +0200
[FLINK-39719][table] Drop late probe records from
TemporalRowTimeJoinOperator (#28229)
TemporalRowTimeJoinOperator cleans up build-side state when the watermark
advances.
Hence, late-arriving probe-side records might not see the correct
build-side state and may join with no data or with incorrect data.
This is especially problematic for probe-side inputs with retractions:
downstream operators cannot match a retraction to its insertion when the two
join results differ.
This change drops late-arriving probe-side records and tracks the number of
dropped records in a metric.
The new behavior is aligned with other operators that operate in event time
and clean up state on watermark progress, such as windowed or OVER aggregates
and interval joins.
Changes:
* drop late-arriving probe-side records
* track dropped record count as a metric
* adjust existing tests
* add new unit tests to ensure correct dropping behavior
* add late data behavior to docs
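
The dropping rule described above can be sketched in isolation. This is a
minimal standalone illustration, not the operator's actual code: the class
LateProbeFilter and its methods advanceWatermark and accept are hypothetical
names, and plain long fields stand in for Flink's timer service and metric
counter. It only mirrors the check `leftTime <= currentWatermark` from this
change.

```java
/** Hypothetical stand-in for the late-record check; not part of Flink. */
final class LateProbeFilter {
    // Assumption: no watermark has been seen yet, so nothing is late initially.
    private long currentWatermark = Long.MIN_VALUE;
    // Plain counter standing in for the numLateRecordsDropped metric.
    private long numLateRecordsDropped = 0;

    /** Watermarks only move forward; an older value is ignored. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Returns true if the record is kept, false if it is dropped as late. */
    boolean accept(long eventTime) {
        if (eventTime <= currentWatermark) {
            // Late: the matching build-side version may already be cleaned up.
            numLateRecordsDropped++;
            return false;
        }
        return true;
    }

    long getNumLateRecordsDropped() {
        return numLateRecordsDropped;
    }

    public static void main(String[] args) {
        LateProbeFilter filter = new LateProbeFilter();
        filter.advanceWatermark(5);
        System.out.println(filter.accept(5)); // false: event time == watermark
        System.out.println(filter.accept(4)); // false: event time < watermark
        System.out.println(filter.accept(7)); // true: not late
        System.out.println(filter.getNumLateRecordsDropped()); // 2
    }
}
```

Note that a record whose event time equals the watermark counts as late, which
matches the `<=` comparison in the change.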
---
.../content.zh/docs/sql/reference/queries/joins.md | 2 +
docs/content/docs/sql/reference/queries/joins.md | 2 +
.../exec/stream/TemporalJoinTestPrograms.java | 4 +-
.../join/temporal/TemporalRowTimeJoinOperator.java | 32 +++++-
.../temporal/TemporalRowTimeJoinOperatorTest.java | 113 ++++++++++++++++++---
5 files changed, 134 insertions(+), 19 deletions(-)
diff --git a/docs/content.zh/docs/sql/reference/queries/joins.md b/docs/content.zh/docs/sql/reference/queries/joins.md
index 0cf8d669709..a1e1db2caab 100644
--- a/docs/content.zh/docs/sql/reference/queries/joins.md
+++ b/docs/content.zh/docs/sql/reference/queries/joins.md
@@ -181,6 +181,8 @@ o_002 12.51 EUR 1.10 12:06:00
这里的 `INTERVAL` 时间减法用于等待后续事件,以确保 join 满足预期。
请确保 join 两边设置了正确的 watermark 。
+**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up.
+
**注意:** 事件时间 temporal join 需要包含主键相等的条件,即:`currency_rates` 表的主键
`currency_rates.currency` 包含在条件 `orders.currency = currency_rates.currency` 中。
与 [regular joins](#regular-joins) 相比,就算 build side(例子中的 currency_rates
表)发生变更了,之前的 temporal table 的结果也不会被影响。
diff --git a/docs/content/docs/sql/reference/queries/joins.md b/docs/content/docs/sql/reference/queries/joins.md
index be99c737d3f..d000a46b822 100644
--- a/docs/content/docs/sql/reference/queries/joins.md
+++ b/docs/content/docs/sql/reference/queries/joins.md
@@ -186,6 +186,8 @@ o_002 12.51 EUR 1.10 12:06:00
The `INTERVAL` time subtraction is used to wait for late events in order to make sure the join will meet the expectation.
Please ensure both sides of the join have set watermark correctly.
+**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up.
+
**Note:** The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition, e.g., The primary key `currency_rates.currency` of table `currency_rates` to be constrained in the condition `orders.currency = currency_rates.currency`.
In contrast to [regular joins](#regular-joins), the previous temporal table results will not be affected despite the changes on the build side.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
index ed83169c74a..b8a03ffacf9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
@@ -45,7 +45,7 @@ public class TemporalJoinTestPrograms {
Row.of(3L, "Euro", "2020-10-10 00:00:45"))
.producedAfterRestore(
Row.of(1L, "Euro", "2020-10-10 00:00:58"),
- Row.of(1L, "USD", "2020-10-10 00:00:58"))
+ Row.of(1L, "USD", "2020-10-10 00:00:59"))
.build();
static final SourceTestStep ORDERS_WITH_NESTED_ID =
@@ -88,7 +88,7 @@ public class TemporalJoinTestPrograms {
1L,
Row.of("usd"),
mapOf("currency", "USD"),
- "2020-10-10 00:00:58"))
+ "2020-10-10 00:00:59"))
.build();
static final SourceTestStep RATES =
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index 940e15c2b4d..71ed75b5970 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -61,9 +62,14 @@ import java.util.TreeMap;
* idea is that between watermarks we are collecting those elements and once we are sure that there
* will be no updates we emit the correct result and clean up the expired data in state.
*
- * <p>Cleaning up the state drops all of the "old" values from the probe side, where "old" is
- * defined as older then the current watermark. Build side is also cleaned up in the similar
- * fashion, however we always keep at least one record - the latest one - even if it's past the last
+ * <p>Probe-side records that arrive late (their event time is less than or equal to the current
+ * watermark) are dropped on arrival and counted via the {@code numLateRecordsDropped} metric; they
+ * are not joined or emitted (not even as null-padded results for left outer joins), because the
+ * matching build-side version may already have been cleaned up.
+ *
+ * <p>Cleaning up the state drops all the "old" values from the probe side, where "old" is defined
+ * as older than the current watermark. Build side is also cleaned up in the similar fashion,
+ * however we always keep at least one record - the latest one - even if it's past the last
* watermark.
*
* <p>One more trick is how the emitting results and cleaning up is triggered. It is achieved by
@@ -84,6 +90,7 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private static final String RIGHT_STATE_NAME = "right";
private static final String REGISTERED_TIMER_STATE_NAME = "timer";
private static final String TIMERS_STATE_NAME = "timers";
+ private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
private final boolean isLeftOuterJoin;
private final InternalTypeInfo<RowData> leftType;
@@ -123,6 +130,8 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private transient JoinedRowData outRow;
private transient GenericRowData rightNullRow;
+ private transient Counter numLateRecordsDropped;
+
public TemporalRowTimeJoinOperator(
InternalTypeInfo<RowData> leftType,
InternalTypeInfo<RowData> rightType,
@@ -174,13 +183,23 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
outRow = new JoinedRowData();
rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount());
collector = new TimestampedCollector<>(output);
+
+ numLateRecordsDropped =
+ getRuntimeContext().getMetricGroup().counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
}
@Override
public void processElement1(StreamRecord<RowData> element) throws Exception {
RowData row = element.getValue();
+ long leftTime = getLeftTime(row);
+ if (leftTime <= timerService.currentWatermark()) {
+ // The probe-side record is late. Drop it, because the matching build-side version may
+ // already have been cleaned up.
+ numLateRecordsDropped.inc();
+ return;
+ }
leftState.put(getNextLeftIndex(), row);
- registerSmallestTimer(getLeftTime(row)); // Timer to emit and clean up the state
+ registerSmallestTimer(leftTime); // Timer to emit and clean up the state
registerProcessingCleanupTimer();
}
@@ -441,4 +460,9 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
static String getRegisteredTimerStateName() {
return REGISTERED_TIMER_STATE_NAME;
}
+
+ @VisibleForTesting
+ Counter getNumLateRecordsDropped() {
+ return numLateRecordsDropped;
+ }
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
index bff3b8ae73a..2a54ac02fdc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
@@ -39,9 +39,9 @@ import static org.assertj.core.api.Assertions.assertThat;
class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
/** Test rowtime temporal join. */
@Test
- void testRowTimeTemporalJoin() throws Exception {
+ void testRowTimeInnerTemporalJoin() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
- expectedOutput.add(new Watermark(1));
+ expectedOutput.add(new Watermark(0));
expectedOutput.add(new Watermark(2));
expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
expectedOutput.add(new Watermark(5));
@@ -54,14 +54,12 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
testRowTimeTemporalJoin(false, expectedOutput);
}
- /** Test rowtime left temporal join. */
@Test
void testRowTimeLeftTemporalJoin() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
- expectedOutput.add(new Watermark(1));
+ expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
expectedOutput.add(new Watermark(2));
- expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
expectedOutput.add(new Watermark(5));
expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
@@ -84,8 +82,8 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
testHarness.open();
- testHarness.processWatermark1(new Watermark(1));
- testHarness.processWatermark2(new Watermark(1));
+ testHarness.processWatermark1(new Watermark(0));
+ testHarness.processWatermark2(new Watermark(0));
testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
@@ -93,7 +91,6 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
testHarness.processWatermark1(new Watermark(2));
testHarness.processWatermark2(new Watermark(2));
- testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
@@ -194,9 +191,9 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
}
@Test
- void testRowTimeTemporalJoinOnUpsertSource() throws Exception {
+ void testRowTimeInnerTemporalJoinOnUpsertSource() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
- expectedOutput.add(new Watermark(1));
+ expectedOutput.add(new Watermark(0));
expectedOutput.add(new Watermark(2));
expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
expectedOutput.add(new Watermark(5));
@@ -212,7 +209,7 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
@Test
void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
- expectedOutput.add(new Watermark(1));
+ expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
expectedOutput.add(new Watermark(2));
expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
@@ -237,8 +234,8 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
testHarness.open();
- testHarness.processWatermark1(new Watermark(1));
- testHarness.processWatermark2(new Watermark(1));
+ testHarness.processWatermark1(new Watermark(0));
+ testHarness.processWatermark2(new Watermark(0));
testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
@@ -270,6 +267,96 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
testHarness.close();
}
+ @Test
+ void testRowTimeInnerTemporalJoinLateRecords() throws Exception {
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2"));
+ expectedOutput.add(new Watermark(5));
+ expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2"));
+ expectedOutput.add(new Watermark(8));
+ expectedOutput.add(new Watermark(11));
+ expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9"));
+ expectedOutput.add(new Watermark(13));
+ expectedOutput.add(new Watermark(15));
+
+ testRowTimeTemporalJoinLateRecords(false, expectedOutput);
+ }
+
+ @Test
+ void testRowTimeLeftTemporalJoinLateRecords() throws Exception {
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2"));
+ expectedOutput.add(new Watermark(5));
+ expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2"));
+ expectedOutput.add(new Watermark(8));
+ expectedOutput.add(insertRecord(10L, "k2", "1a10", null, null, null));
+ expectedOutput.add(new Watermark(11));
+ expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9"));
+ expectedOutput.add(new Watermark(13));
+ expectedOutput.add(new Watermark(15));
+
+ testRowTimeTemporalJoinLateRecords(true, expectedOutput);
+ }
+
+ /**
+ * Verifies that probe-side records whose event time is less than or equal to the current
+ * watermark are dropped on arrival: they are not joined, not emitted (even with a left outer
+ * join), and are counted in the {@code numLateRecordsDropped} metric.
+ */
+ private void testRowTimeTemporalJoinLateRecords(
+ boolean isLeftOuter, List<Object> expectedOutput) throws Exception {
+ TemporalRowTimeJoinOperator joinOperator =
+ new TemporalRowTimeJoinOperator(
+ rowType, rowType, joinCondition, 0, 0, 0, 0, isLeftOuter);
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(joinOperator);
+
+ testHarness.open();
+
+ // initialize watermark to 1
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Establish a build-side version at time 2 and a non-late probe record at time 3.
+ testHarness.processElement2(insertRecord(2L, "k1", "2a2"));
+ testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
+ testHarness.processWatermark1(new Watermark(5));
+ testHarness.processWatermark2(new Watermark(5));
+
+ // After Watermark(5), any probe record with leftTime <= 5 is late and must be dropped.
+ testHarness.processElement1(insertRecord(5L, "k1", "1a5")); // leftTime == watermark
+ testHarness.processElement1(insertRecord(4L, "k1", "1a4")); // leftTime < watermark
+ testHarness.processElement1(insertRecord(1L, "k1", "1a1")); // leftTime << watermark
+ // A non-late probe record should still be processed.
+ testHarness.processElement1(insertRecord(7L, "k1", "1a7"));
+ testHarness.processWatermark1(new Watermark(8));
+ testHarness.processWatermark2(new Watermark(8));
+
+ // A record for late retraction
+ testHarness.processElement1(insertRecord(10L, "k2", "1a10"));
+ testHarness.processWatermark1(new Watermark(11));
+ testHarness.processWatermark2(new Watermark(11));
+
+ // Add a late retraction and a late build-side record
+ testHarness.processElement1(insertRecord(13L, "k2", "1a13"));
+ testHarness.processElement2(insertRecord(9L, "k2", "2a9"));
+ testHarness.processElement1(deleteRecord(10L, "k2", "1a10")); // late -> dropped
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+
+ // Another late retraction
+ testHarness.processElement1(deleteRecord(13L, "k2", "1a13"));
+ testHarness.processWatermark1(new Watermark(15));
+ testHarness.processWatermark2(new Watermark(15));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ assertThat(joinOperator.getNumLateRecordsDropped().getCount()).isEqualTo(5L);
+
+ testHarness.close();
+ }
+
private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData>
createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception {