fhueske commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3473614925
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -2240,6 +2240,19 @@ void testWatermarkAdvancesWithoutTimers() throws
Exception {
}
}
+ @Test
+ void testWatermarkAdvancesWithoutOnTimeColumn() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<value
INT>"))
+ .build()) {
+
+ harness.processElement(Row.of(42));
+ harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 10));
Review Comment:
Can the function access the current WM if no `on_time` attribute was
specified?
Is that something that the harness needs to support?
For example, a function could:
* ask for the current WM
* register a timer for currentWm + x seconds (i.e., firing x event-time secs
in the future)
* handle the timer
For this, the function would not really need an `on_time` attribute defined.
Not sure if the PTF framework supports this case, but if it does, the
harness should support it too with the same behavior and there should be a test
for it.
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/TestHarnessTimerManagerTest.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class TestHarnessTimerManagerTest {
+
+ private static final Row P1 = Row.of("P1");
+ private static final Row P2 = Row.of("P2");
+
+ private static final ThrowingConsumer<Timer, Exception> NOOP_FIRER = t ->
{};
+
+ @Test
+ void testNamedTimerRegistration() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P1, 2000L, "timer-b");
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testNamedTimerReplacementOverwritesTimestamp() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P1, 3000L, "timer-a");
+
+ assertThat(manager.getPendingTimers()).hasSize(1);
+
assertThat(manager.getPendingTimers().get(0).getTimestamp()).isEqualTo(3000L);
+
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+ assertThat(fired).isEmpty();
+
+ manager.setTableWatermark("table-a", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+ assertThat(fired).hasSize(1);
+ assertThat(fired.get(0).getTimestamp()).isEqualTo(3000L);
+ }
+
+ @Test
+ void testNamedTimerReplacementIsPerPartition() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P2, 2000L, "timer-a");
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testUnnamedTimerRegistration() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, null);
+ manager.register(P1, 2000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testUnnamedTimerDeduplicationBySameTimestamp() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, null);
+ manager.register(P1, 1000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(1);
+ }
+
+ @Test
+ void testUnnamedTimerDeduplicationIsPerPartition() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, null);
+ manager.register(P2, 1000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testNamedAndUnnamedTimersAreIndependent() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P1, 1000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testGlobalWatermarkIsMinAcrossTables() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 1000L);
+ manager.setTableWatermark("table-b", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+
+ assertThat(manager.getGlobalWatermark()).isEqualTo(1000L);
+ }
+
+ @Test
+ void testGlobalWatermarkAdvancesWhenLaggingTableCatchesUp() throws
Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 1000L);
+ manager.setTableWatermark("table-b", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+ assertThat(manager.getGlobalWatermark()).isEqualTo(1000L);
+
+ manager.setTableWatermark("table-a", 5000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+ assertThat(manager.getGlobalWatermark()).isEqualTo(3000L);
+ }
+
+ @Test
+ void testPerTableWatermarkCannotMoveBackward() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 2000L);
+ assertThrows(
+ IllegalArgumentException.class, () ->
manager.setTableWatermark("table-a", 1000L));
+ }
+
+ @Test
+ void testPerTableWatermarkTrackedIndependently() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 1000L);
+ manager.setTableWatermark("table-b", 3000L);
+
+ assertThat(manager.getWatermarkForTable("table-a")).isEqualTo(1000L);
+ assertThat(manager.getWatermarkForTable("table-b")).isEqualTo(3000L);
+ }
+
+ @Test
+ void testNewTableCanCauseGlobalWatermarkBackwardError() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 5000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+ assertThat(manager.getGlobalWatermark()).isEqualTo(5000L);
+
+ manager.setTableWatermark("table-b", 1000L);
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER));
+ }
+
+ @Test
+ void testTimerFiresWhenWatermarkAdvancesPastTimestamp() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+
+ assertThat(fired).hasSize(1);
+ assertThat(fired.get(0).getName()).isEqualTo("timer-a");
+ assertThat(manager.getPendingTimers()).isEmpty();
+ assertThat(manager.getFiredTimers()).hasSize(1);
+ }
+
+ @Test
+ void testTimerDoesNotFireBeforeWatermark() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 3000L, "timer-a");
+ manager.setTableWatermark("table-a", 2000L);
Review Comment:
`manager.setTableWatermark("table-a", 2999L);`
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/TestHarnessTimerManagerTest.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class TestHarnessTimerManagerTest {
+
+ private static final Row P1 = Row.of("P1");
+ private static final Row P2 = Row.of("P2");
+
+ private static final ThrowingConsumer<Timer, Exception> NOOP_FIRER = t ->
{};
+
+ @Test
+ void testNamedTimerRegistration() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P1, 2000L, "timer-b");
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testNamedTimerReplacementOverwritesTimestamp() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P1, 3000L, "timer-a");
+
+ assertThat(manager.getPendingTimers()).hasSize(1);
+
assertThat(manager.getPendingTimers().get(0).getTimestamp()).isEqualTo(3000L);
+
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+ assertThat(fired).isEmpty();
+
+ manager.setTableWatermark("table-a", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+ assertThat(fired).hasSize(1);
+ assertThat(fired.get(0).getTimestamp()).isEqualTo(3000L);
+ }
+
+ @Test
+ void testNamedTimerReplacementIsPerPartition() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P2, 2000L, "timer-a");
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testUnnamedTimerRegistration() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, null);
+ manager.register(P1, 2000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testUnnamedTimerDeduplicationBySameTimestamp() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, null);
+ manager.register(P1, 1000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(1);
+ }
+
+ @Test
+ void testUnnamedTimerDeduplicationIsPerPartition() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, null);
+ manager.register(P2, 1000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testNamedAndUnnamedTimersAreIndependent() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.register(P1, 1000L, null);
+
+ assertThat(manager.getPendingTimers()).hasSize(2);
+ }
+
+ @Test
+ void testGlobalWatermarkIsMinAcrossTables() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 1000L);
+ manager.setTableWatermark("table-b", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+
+ assertThat(manager.getGlobalWatermark()).isEqualTo(1000L);
+ }
+
+ @Test
+ void testGlobalWatermarkAdvancesWhenLaggingTableCatchesUp() throws
Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 1000L);
+ manager.setTableWatermark("table-b", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+ assertThat(manager.getGlobalWatermark()).isEqualTo(1000L);
+
+ manager.setTableWatermark("table-a", 5000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+ assertThat(manager.getGlobalWatermark()).isEqualTo(3000L);
+ }
+
+ @Test
+ void testPerTableWatermarkCannotMoveBackward() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 2000L);
+ assertThrows(
+ IllegalArgumentException.class, () ->
manager.setTableWatermark("table-a", 1000L));
+ }
+
+ @Test
+ void testPerTableWatermarkTrackedIndependently() {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 1000L);
+ manager.setTableWatermark("table-b", 3000L);
+
+ assertThat(manager.getWatermarkForTable("table-a")).isEqualTo(1000L);
+ assertThat(manager.getWatermarkForTable("table-b")).isEqualTo(3000L);
+ }
+
+ @Test
+ void testNewTableCanCauseGlobalWatermarkBackwardError() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+
+ manager.setTableWatermark("table-a", 5000L);
+ manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER);
+ assertThat(manager.getGlobalWatermark()).isEqualTo(5000L);
+
+ manager.setTableWatermark("table-b", 1000L);
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.updateGlobalWatermarkAndFireTimers(NOOP_FIRER));
+ }
+
+ @Test
+ void testTimerFiresWhenWatermarkAdvancesPastTimestamp() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 1000L, "timer-a");
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+
+ assertThat(fired).hasSize(1);
+ assertThat(fired.get(0).getName()).isEqualTo("timer-a");
+ assertThat(manager.getPendingTimers()).isEmpty();
+ assertThat(manager.getFiredTimers()).hasSize(1);
+ }
+
+ @Test
+ void testTimerDoesNotFireBeforeWatermark() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 3000L, "timer-a");
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+
+ assertThat(fired).isEmpty();
+ assertThat(manager.getPendingTimers()).hasSize(1);
+ }
+
+ @Test
+ void testTimerFiresAtExactWatermark() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<Timer> fired = new ArrayList<>();
+
+ manager.register(P1, 2000L, "timer-a");
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(fired::add);
+
+ assertThat(fired).hasSize(1);
+ }
+
+ @Test
+ void testFiringOrderIsDeterministic() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<String> firingOrder = new ArrayList<>();
+
+ manager.register(P1, 2000L, "b-timer");
+ manager.register(P1, 1000L, "c-timer");
+ manager.register(P1, 1000L, "a-timer");
+ manager.register(P2, 1000L, "a-timer");
+
+ manager.setTableWatermark("table-a", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(
+ t -> firingOrder.add(t.getKey() + ":" + t.getName()));
+
+ assertThat(firingOrder)
+ .containsExactly(
+ "+I[P1]:a-timer", "+I[P2]:a-timer", "+I[P1]:c-timer",
"+I[P1]:b-timer");
+ }
+
+ @Test
+ void testCascadingTimerFiring() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<String> fired = new ArrayList<>();
+
+ manager.register(P1, 1000L, "first");
+ manager.setTableWatermark("table-a", 3000L);
+ manager.updateGlobalWatermarkAndFireTimers(
+ t -> {
+ fired.add(t.getName());
+ if ("first".equals(t.getName())) {
+ manager.register(P1, 2000L, "within");
+ manager.register(P1, 5000L, "beyond");
+ }
+ });
+
+ assertThat(fired).containsExactly("first", "within");
+ assertThat(manager.getPendingTimers()).hasSize(1);
+
assertThat(manager.getPendingTimers().get(0).getName()).isEqualTo("beyond");
+ assertThat(manager.getFiredTimers()).hasSize(2);
+ }
+
+ @Test
+ void testTimerClearedDuringFiringCallbackDoesNotFire() throws Exception {
+ TestHarnessTimerManager manager = new TestHarnessTimerManager();
+ List<String> fired = new ArrayList<>();
+
+ manager.register(P1, 1000L, "first");
+ manager.register(P1, 1000L, "second");
+
+ manager.setTableWatermark("table-a", 2000L);
+ manager.updateGlobalWatermarkAndFireTimers(
+ t -> {
+ fired.add(t.getName());
+ if ("first".equals(t.getName())) {
+ manager.clearByName(P1, "second");
+ }
+ });
+
+ assertThat(fired).containsExactly("first");
Review Comment:
also assert that "second" is not pending?
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -2455,11 +2457,13 @@ void testPassThroughWithTimersIsRejected() throws
Exception {
.withOnTimeColumn("ts")
.build()) {
- assertThrows(
- TableRuntimeException.class,
- () ->
- harness.processElement(
- Row.of("P1", LocalDateTime.of(2025, 1, 1,
0, 0, 1))));
+ TableRuntimeException e =
+ assertThrows(
+ TableRuntimeException.class,
+ () ->
+ harness.processElement(
Review Comment:
Wouldn't it be better to throw when the harness is constructed?
All information should be present there already, correct?
Never mind, if this requires a major refactoring.
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1728,4 +1741,888 @@ void testPartitionKeyValidationOnClearState() throws
Exception {
harness.close();
}
+
+ //
-------------------------------------------------------------------------
+ // Timer PTFs
+ //
-------------------------------------------------------------------------
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class TimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ String name = input.getFieldAs("name");
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("timeout-" + name,
timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("registered-" + name));
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ collect(Row.of("timer-fired-" + ctx.currentTimer()));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class UnnamedTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime(timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("registered"));
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ String timerName = ctx.currentTimer();
+ collect(Row.of("fired-unnamed-" + (timerName == null ? "null" :
timerName)));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class TimerWithStatePTF extends ProcessTableFunction<Row> {
+ public static class CounterState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ Context ctx,
+ @StateHint CounterState state,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ state.count++;
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("check",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ }
+
+ public void onTimer(OnTimerContext ctx, @StateHint CounterState state)
{
+ collect(Row.of("count=" + state.count));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class PassThroughTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({
+ ArgumentTrait.SET_SEMANTIC_TABLE,
+ ArgumentTrait.PASS_COLUMNS_THROUGH,
+ ArgumentTrait.REQUIRE_ON_TIME
+ })
+ Row input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("timer",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("registered"));
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ collect(Row.of("fired"));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class NoOnTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("timer",
timeCtx.time().plus(Duration.ofSeconds(10)));
+ collect(Row.of("registered"));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class MultipleOnTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("timer",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("registered"));
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ collect(Row.of("fired-no-state"));
+ }
+
+ public void onTimer(OnTimerContext ctx, @StateHint String state) {
+ collect(Row.of("fired-with-state"));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class CascadingTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("first",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("eval"));
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ String name = ctx.currentTimer();
+ collect(Row.of("fired-" + name));
+ if ("first".equals(name)) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("second",
timeCtx.time().minus(Duration.ofSeconds(2)));
+ }
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class MultiTableTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row leftTable,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row rightTable) {
+ if (leftTable != null) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("check",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("left"));
+ }
+ if (rightTable != null) {
+ collect(Row.of("right"));
+ }
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ collect(Row.of("timer-fired"));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class ContextClearStatePTF extends ProcessTableFunction<Row>
{
+ public static class CounterState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ Context ctx,
+ @StateHint CounterState state,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ String action = input.getFieldAs("action");
+ if ("increment".equals(action)) {
+ if (state == null) {
+ state = new CounterState();
+ }
+ state.count++;
+ collect(Row.of("count=" + state.count));
+ } else if ("clear-state".equals(action)) {
+ ctx.clearState("state");
+ collect(Row.of("cleared"));
+ } else if ("clear-all-state".equals(action)) {
+ ctx.clearAllState();
+ collect(Row.of("cleared-all"));
+ } else if ("clear-all".equals(action)) {
+ ctx.clearAll();
+ collect(Row.of("cleared-everything"));
+ } else if ("register-timer".equals(action)) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("t",
timeCtx.time().plus(Duration.ofHours(1)));
+ collect(Row.of("timer-registered"));
+ }
+ }
+
+ public void onTimer(OnTimerContext ctx, @StateHint CounterState state)
{
+ collect(Row.of("timer-fired"));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class PojoTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ TimedEvent input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ timeCtx.registerOnTime("check",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ collect(Row.of("registered-" + input.key));
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ collect(Row.of("fired-" + ctx.currentTimer()));
+ }
+ }
+
+ @DataTypeHint("ROW<partitionCols STRING, timeCol INT, changelogMode
STRING>")
+ public static class ContextSemanticsIntrospectionPTF extends
ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ TableSemantics semantics = ctx.tableSemanticsFor("input");
+ collect(
+ Row.of(
+
java.util.Arrays.toString(semantics.partitionByColumns()),
+ semantics.timeColumn(),
+ ctx.getChangelogMode().toString()));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class ClearTimerPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ TimeContext<LocalDateTime> timeCtx =
ctx.timeContext(LocalDateTime.class);
+ String action = input.getFieldAs("action");
+ if ("register-named".equals(action)) {
+ timeCtx.registerOnTime("myTimer",
timeCtx.time().plus(Duration.ofSeconds(5)));
+ } else if ("register-unnamed".equals(action)) {
+
timeCtx.registerOnTime(timeCtx.time().plus(Duration.ofSeconds(5)));
+ } else if ("clear-by-name".equals(action)) {
+ timeCtx.clearTimer("myTimer");
+ } else if ("clear-by-timestamp".equals(action)) {
+ timeCtx.clearTimer(timeCtx.time().plus(Duration.ofSeconds(4)));
+ } else if ("clear-all".equals(action)) {
+ ctx.clearAllTimers();
+ }
+ }
+
+ public void onTimer(OnTimerContext ctx) {
+ collect(
+ Row.of(
+ "fired-"
+ + (ctx.currentTimer() != null
+ ? ctx.currentTimer()
+ : "unnamed")));
+ }
+ }
+
+ @DataTypeHint("ROW<message STRING>")
+ public static class OnTimePTF extends ProcessTableFunction<Row> {
+ public void eval(
+ Context ctx,
+ @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,
ArgumentTrait.REQUIRE_ON_TIME})
+ Row input) {
+ String name = input.getFieldAs("name");
+ collect(Row.of("hello-" + name));
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Context Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPerTableWatermarkTimerFiring() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(MultiTableTimerPTF.class)
+ .withTableArgument(
+ "leftTable", DataTypes.of("ROW<partition
STRING, ts TIMESTAMP(3)>"))
+ .withTableArgument(
+ "rightTable",
+ DataTypes.of("ROW<partition STRING, ts
TIMESTAMP(3)>"))
+ .withPartitionBy("leftTable", "partition")
+ .withPartitionBy("rightTable", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ // Register a timer at t=5000 via the left table
+ harness.processElementForTable(
+ "leftTable", Row.of("P1", LocalDateTime.of(2025, 1, 1, 0,
0, 1)));
+ harness.clearOutput();
+
+ // Advance only left table past the timer — global watermark is
still min(left, right)
+ // Since right has no watermark yet, global won't advance enough
to fire
+ harness.setWatermarkForTable("rightTable", LocalDateTime.of(2025,
1, 1, 0, 0, 3));
+ harness.setWatermarkForTable("leftTable", LocalDateTime.of(2025,
1, 1, 0, 0, 10));
+ assertThat(harness.getFiredTimers()).isEmpty();
+
+ // Now advance right table past the timer — global watermark
catches up, timer fires
+ harness.setWatermarkForTable("rightTable", LocalDateTime.of(2025,
1, 1, 0, 0, 10));
+ assertThat(harness.getFiredTimers()).hasSize(1);
+ // Both tables' partition keys are prepended (one "P1" per table)
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of(
+ "P1",
+ "P1",
+ "timer-fired",
+ LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
+ }
+ }
+
+ @Test
+ void testContextClearState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(ContextClearStatePTF.class)
+ .withTableArgument(
+ "input",
+ DataTypes.of(
+ "ROW<partition STRING, action STRING,
ts TIMESTAMP(3)>"))
+ .withPartitionBy("input", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 1)));
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 2)));
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "count=1", LocalDateTime.of(2025, 1,
1, 0, 0, 1)),
+ Row.of("P1", "count=2", LocalDateTime.of(2025, 1,
1, 0, 0, 2)));
+ harness.clearOutput();
+
+ harness.processElement(
+ Row.of("P1", "clear-state", LocalDateTime.of(2025, 1, 1,
0, 0, 3)));
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 4)));
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "cleared", LocalDateTime.of(2025, 1,
1, 0, 0, 3)),
+ Row.of("P1", "count=1", LocalDateTime.of(2025, 1,
1, 0, 0, 4)));
+ }
+ }
+
+ @Test
+ void testContextClearAllState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(ContextClearStatePTF.class)
+ .withTableArgument(
+ "input",
+ DataTypes.of(
+ "ROW<partition STRING, action STRING,
ts TIMESTAMP(3)>"))
+ .withPartitionBy("input", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 1)));
+ harness.clearOutput();
+
+ harness.processElement(
+ Row.of("P1", "clear-all-state", LocalDateTime.of(2025, 1,
1, 0, 0, 2)));
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 3)));
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "cleared-all", LocalDateTime.of(2025,
1, 1, 0, 0, 2)),
+ Row.of("P1", "count=1", LocalDateTime.of(2025, 1,
1, 0, 0, 3)));
+ }
+ }
+
+ @Test
+ void testContextClearAll() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(ContextClearStatePTF.class)
+ .withTableArgument(
+ "input",
+ DataTypes.of(
+ "ROW<partition STRING, action STRING,
ts TIMESTAMP(3)>"))
+ .withPartitionBy("input", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 1)));
+ harness.processElement(
+ Row.of("P1", "register-timer", LocalDateTime.of(2025, 1,
1, 0, 0, 2)));
+ assertThat(harness.getPendingTimers()).hasSize(1);
+ harness.clearOutput();
+
+ harness.processElement(
+ Row.of("P1", "clear-all", LocalDateTime.of(2025, 1, 1, 0,
0, 3)));
+ assertThat(harness.getPendingTimers()).isEmpty();
+
+ harness.processElement(
+ Row.of("P1", "increment", LocalDateTime.of(2025, 1, 1, 0,
0, 4)));
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of(
+ "P1",
+ "cleared-everything",
+ LocalDateTime.of(2025, 1, 1, 0, 0, 3)),
+ Row.of("P1", "count=1", LocalDateTime.of(2025, 1,
1, 0, 0, 4)));
+ }
+ }
+
+ @Test
+ void testContextTableSemanticsAndChangelogMode() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(ContextSemanticsIntrospectionPTF.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
ts TIMESTAMP(3)>"))
+ .withPartitionBy("input", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1,
0, 0, 1)));
+
+ assertThat(harness.getOutput()).hasSize(1);
+ Row result = harness.getOutput().get(0);
+ // Field 0 is prepended partition key
+ assertThat(result.getFieldAs(0).toString()).isEqualTo("P1");
+ // partitionByColumns: [0] (index of "partition" column)
+ assertThat(result.getFieldAs(1).toString()).isEqualTo("[0]");
+ // timeColumn: 1 (index of "ts" column)
+ assertThat((int) result.getFieldAs(2)).isEqualTo(1);
+ // changelogMode: insert-only
+ assertThat(result.getFieldAs(3).toString())
+ .isEqualTo(ChangelogMode.insertOnly().toString());
+ }
+ }
+
+ @Test
+ void testPojoInputWithOnTimeColumn() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PojoTimerPTF.class)
+ .withTableArgument("input")
+ .withPartitionBy("input", "key")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1,
0, 0, 1)));
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "registered-P1",
LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
+ assertThat(harness.getPendingTimers()).hasSize(1);
+ harness.clearOutput();
+
+ harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 10));
+ assertThat(harness.getFiredTimers()).hasSize(1);
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "fired-check", LocalDateTime.of(2025,
1, 1, 0, 0, 6)));
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Time / On-Time Column Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testOnTimeColumnAppendedToEvalOutput() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(OnTimePTF.class)
+ .withTableArgument(
+ "input",
+ DataTypes.of("ROW<partition STRING, name
STRING, ts TIMESTAMP(3)>"))
+ .withPartitionBy("input", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(Row.of("P1", "Alice",
LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "hello-Alice", LocalDateTime.of(2025,
1, 1, 0, 0, 1)));
+ }
+ }
+
+ @Test
+ void testOnTimeColumnWithMultipleRows() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(OnTimePTF.class)
+ .withTableArgument(
+ "input",
+ DataTypes.of("ROW<partition STRING, name
STRING, ts TIMESTAMP(3)>"))
+ .withPartitionBy("input", "partition")
+ .withOnTimeColumn("ts")
+ .build()) {
+
+ harness.processElement(Row.of("P1", "Alice",
LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
+ harness.processElement(Row.of("P1", "Bob", LocalDateTime.of(2025,
1, 1, 0, 0, 5)));
+ harness.processElement(Row.of("P2", "Carol",
LocalDateTime.of(2025, 1, 1, 0, 0, 3)));
+
+ assertThat(harness.getOutput())
+ .containsExactly(
+ Row.of("P1", "hello-Alice", LocalDateTime.of(2025,
1, 1, 0, 0, 1)),
+ Row.of("P1", "hello-Bob", LocalDateTime.of(2025,
1, 1, 0, 0, 5)),
+ Row.of("P2", "hello-Carol", LocalDateTime.of(2025,
1, 1, 0, 0, 3)));
+ }
+ }
+
+ @Test
+ void testWatermarkAdvancesWithoutTimers() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(OnTimePTF.class)
Review Comment:
I assume that you added `testWatermarkAdvancesWithoutOnTimeColumn()` to
address this comment.
The new test is good, but IIRC I wanted to suggest to add a test that checks
the correct error message of creating a harness for a function that requires
`on_time` but with no `on_time` arg specified.
Sorry for the ambiguity
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]