This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fdc3395d5e2 [FLINK-37618][table-planner] Fix PTFs INTERVAL argument
fdc3395d5e2 is described below
commit fdc3395d5e24daab52a40414434b4458f53bd400
Author: Juntao Zhang <[email protected]>
AuthorDate: Thu Nov 27 23:28:06 2025 +0800
[FLINK-37618][table-planner] Fix PTFs INTERVAL argument
This closes #26410.
---
.../types/logical/utils/LogicalTypeCasts.java | 30 +++++++++++++++++++
.../table/types/LogicalTypeCastAvoidanceTest.java | 12 ++++++++
.../stream/ProcessTableFunctionSemanticTests.java | 2 ++
.../stream/ProcessTableFunctionTestPrograms.java | 26 +++++++++++++++++
.../exec/stream/ProcessTableFunctionTestUtils.java | 16 ++++++++++
.../plan/stream/sql/ProcessTableFunctionTest.java | 14 +++++++++
.../plan/stream/sql/ProcessTableFunctionTest.xml | 34 ++++++++++++++++++++++
7 files changed, 134 insertions(+)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index 54c9c7d1260..310bc71e29b 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.types.logical.utils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -28,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
import java.util.Arrays;
import java.util.HashMap;
@@ -74,7 +76,10 @@ import static
org.apache.flink.table.types.logical.LogicalTypeRoot.TIME_WITHOUT_
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TINYINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getDayPrecision;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFractionalPrecision;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getYearPrecision;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isSingleFieldInterval;
/**
@@ -538,6 +543,31 @@ public final class LogicalTypeCasts {
this.sourceType = sourceType;
}
+ @Override
+ public Boolean visit(YearMonthIntervalType targetType) {
+ if (sourceType.isNullable() && !targetType.isNullable()) {
+ return false;
+ }
+ if (sourceType.is(LogicalTypeRoot.INTERVAL_YEAR_MONTH)
+ && getYearPrecision(sourceType) <=
targetType.getYearPrecision()) {
+ return true;
+ }
+ return defaultMethod(targetType);
+ }
+
+ @Override
+ public Boolean visit(DayTimeIntervalType targetType) {
+ if (sourceType.isNullable() && !targetType.isNullable()) {
+ return false;
+ }
+ if (sourceType.is(LogicalTypeRoot.INTERVAL_DAY_TIME)
+ && getDayPrecision(sourceType) <=
targetType.getDayPrecision()
+ && getFractionalPrecision(sourceType) <=
targetType.getFractionalPrecision()) {
+ return true;
+ }
+ return defaultMethod(targetType);
+ }
+
@Override
public Boolean visit(VarCharType targetType) {
if (sourceType.isNullable() && !targetType.isNullable()) {
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
index 5a8bf94e969..7c214fc1fbb 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
@@ -100,12 +100,24 @@ class LogicalTypeCastAvoidanceTest {
new YearMonthIntervalType(
YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 2),
new
YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.MONTH),
+ true),
+ of(
+ new YearMonthIntervalType(
+
YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 4),
+ new YearMonthIntervalType(
+
YearMonthIntervalType.YearMonthResolution.MONTH, 2),
false),
of(
new DayTimeIntervalType(
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
new DayTimeIntervalType(
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
+ true),
+ of(
+ new DayTimeIntervalType(
+
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
+ new DayTimeIntervalType(
+
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
false),
of(new ArrayType(new TimestampType()), new ArrayType(new
SmallIntType()), false),
of(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 72a95b43696..27f0b42c317 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -56,6 +56,8 @@ public class ProcessTableFunctionSemanticTests extends
SemanticTestBase {
ProcessTableFunctionTestPrograms.PROCESS_TYPED_SET_SEMANTIC_TABLE,
ProcessTableFunctionTestPrograms.PROCESS_TYPED_SET_SEMANTIC_TABLE_TABLE_API,
ProcessTableFunctionTestPrograms.PROCESS_POJO_ARGS,
+ ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_DAY_ARGS,
+ ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_YEAR_ARGS,
ProcessTableFunctionTestPrograms.PROCESS_EMPTY_ARGS,
ProcessTableFunctionTestPrograms.PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH,
ProcessTableFunctionTestPrograms.PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 9fadeb81d94..830d4fd5c29 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -28,6 +28,8 @@ import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction;
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidPassThroughTimersFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidRowKindFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidRowSemanticTableTimersFunction;
@@ -385,6 +387,30 @@ public class ProcessTableFunctionTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM f()")
.build();
+ public static final TableTestProgram PROCESS_INTERVAL_DAY_ARGS =
+ TableTestProgram.of("process-interval-day-args", "interval
argument")
+ .setupTemporarySystemFunction("f",
IntervalDayArgFunction.class)
+ .setupSql(BASIC_VALUES)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(BASE_SINK_SCHEMA)
+ .consumedValues("+I[{PT1S}]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM f(d => INTERVAL
'1' SECOND)")
+ .build();
+
+ public static final TableTestProgram PROCESS_INTERVAL_YEAR_ARGS =
+ TableTestProgram.of("process-interval-year-args", "interval
argument")
+ .setupTemporarySystemFunction("f",
IntervalYearArgFunction.class)
+ .setupSql(BASIC_VALUES)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(BASE_SINK_SCHEMA)
+ .consumedValues("+I[{P1Y}]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM f(p => INTERVAL
'1' YEAR)")
+ .build();
+
public static final TableTestProgram
PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH =
TableTestProgram.of("process-row-pass-through", "pass columns
through enabled")
.setupTemporarySystemFunction("f",
RowSemanticTablePassThroughFunction.class)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index d20dc3fd70e..9c011a7b6aa 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -42,8 +42,10 @@ import org.apache.flink.types.ColumnList;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.Period;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
@@ -336,6 +338,20 @@ public class ProcessTableFunctionTestUtils {
}
}
+ /** Testing function. */
+ public static class IntervalDayArgFunction extends
AppendProcessTableFunctionBase {
+ public void eval(Duration d) {
+ collectObjects(d);
+ }
+ }
+
+ /** Testing function. */
+ public static class IntervalYearArgFunction extends
AppendProcessTableFunctionBase {
+ public void eval(Period p) {
+ collectObjects(p);
+ }
+ }
+
/** Testing function. */
public static class RowSemanticTablePassThroughFunction extends
AppendProcessTableFunctionBase {
public void eval(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index 184df1bccf7..659ee47cb97 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.functions.UserDefinedFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.AppendProcessTableFunctionBase;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction;
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidUpdatingSemanticsFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NoSystemArgsScalarFunction;
@@ -149,6 +151,18 @@ public class ProcessTableFunctionTest extends
TableTestBase {
util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')");
}
+ @Test
+ void testIntervalDayArgs() {
+ util.addTemporarySystemFunction("f", IntervalDayArgFunction.class);
+ util.verifyRelPlan("SELECT * FROM f(d => INTERVAL '1' SECOND)");
+ }
+
+ @Test
+ void testIntervalYearArgs() {
+ util.addTemporarySystemFunction("f", IntervalYearArgFunction.class);
+ util.verifyRelPlan("SELECT * FROM f(p => INTERVAL '1' YEAR)");
+ }
+
@Test
void testSetSemanticTablePassThroughColumns() {
util.addTemporarySystemFunction("f",
SetSemanticTablePassThroughFunction.class);
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index 98094f9591a..55fa321f1eb 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -68,6 +68,40 @@ LogicalProject(out=[$0])
<![CDATA[
ProcessTableFunction(invocation=[f(DEFAULT(), _UTF-16LE'my-ptf')],
uid=[my-ptf], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
+- Values(tuples=[[{ }]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntervalDayArgs">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM f(d => INTERVAL '1' SECOND)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(out=[$0])
++- LogicalTableFunctionScan(invocation=[f(1000:INTERVAL SECOND, DEFAULT(),
DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) out)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+ProcessTableFunction(invocation=[f(1000:INTERVAL SECOND, DEFAULT(),
DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647)
out)])
++- Values(tuples=[[{ }]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntervalYearArgs">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM f(p => INTERVAL '1' YEAR)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(out=[$0])
++- LogicalTableFunctionScan(invocation=[f(12:INTERVAL YEAR, DEFAULT(),
DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) out)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+ProcessTableFunction(invocation=[f(12:INTERVAL YEAR, DEFAULT(), DEFAULT())],
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
++- Values(tuples=[[{ }]])
]]>
</Resource>
</TestCase>