This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fdc3395d5e2 [FLINK-37618][table-planner] Fix PTFs INTERVAL argument
fdc3395d5e2 is described below

commit fdc3395d5e24daab52a40414434b4458f53bd400
Author: Juntao Zhang <[email protected]>
AuthorDate: Thu Nov 27 23:28:06 2025 +0800

    [FLINK-37618][table-planner] Fix PTFs INTERVAL argument
    
    This closes #26410.
---
 .../types/logical/utils/LogicalTypeCasts.java      | 30 +++++++++++++++++++
 .../table/types/LogicalTypeCastAvoidanceTest.java  | 12 ++++++++
 .../stream/ProcessTableFunctionSemanticTests.java  |  2 ++
 .../stream/ProcessTableFunctionTestPrograms.java   | 26 +++++++++++++++++
 .../exec/stream/ProcessTableFunctionTestUtils.java | 16 ++++++++++
 .../plan/stream/sql/ProcessTableFunctionTest.java  | 14 +++++++++
 .../plan/stream/sql/ProcessTableFunctionTest.xml   | 34 ++++++++++++++++++++++
 7 files changed, 134 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index 54c9c7d1260..310bc71e29b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.types.logical.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
 import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -28,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -74,7 +76,10 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIME_WITHOUT_
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.TINYINT;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getDayPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFractionalPrecision;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getYearPrecision;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isSingleFieldInterval;
 
 /**
@@ -538,6 +543,31 @@ public final class LogicalTypeCasts {
             this.sourceType = sourceType;
         }
 
+        @Override
+        public Boolean visit(YearMonthIntervalType targetType) {
+            if (sourceType.isNullable() && !targetType.isNullable()) {
+                return false;
+            }
+            if (sourceType.is(LogicalTypeRoot.INTERVAL_YEAR_MONTH)
+                    && getYearPrecision(sourceType) <= targetType.getYearPrecision()) {
+                return true;
+            }
+            return defaultMethod(targetType);
+        }
+
+        @Override
+        public Boolean visit(DayTimeIntervalType targetType) {
+            if (sourceType.isNullable() && !targetType.isNullable()) {
+                return false;
+            }
+            if (sourceType.is(LogicalTypeRoot.INTERVAL_DAY_TIME)
+                    && getDayPrecision(sourceType) <= targetType.getDayPrecision()
+                    && getFractionalPrecision(sourceType) <= targetType.getFractionalPrecision()) {
+                return true;
+            }
+            return defaultMethod(targetType);
+        }
+
         @Override
         public Boolean visit(VarCharType targetType) {
             if (sourceType.isNullable() && !targetType.isNullable()) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
index 5a8bf94e969..7c214fc1fbb 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java
@@ -100,12 +100,24 @@ class LogicalTypeCastAvoidanceTest {
                         new YearMonthIntervalType(
                                 YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 2),
                         new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.MONTH),
+                        true),
+                of(
+                        new YearMonthIntervalType(
+                                YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 4),
+                        new YearMonthIntervalType(
+                                YearMonthIntervalType.YearMonthResolution.MONTH, 2),
                         false),
                 of(
                         new DayTimeIntervalType(
                                 DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
                         new DayTimeIntervalType(
                                 DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
+                        true),
+                of(
+                        new DayTimeIntervalType(
+                                DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
+                        new DayTimeIntervalType(
+                                DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
                         false),
                 of(new ArrayType(new TimestampType()), new ArrayType(new SmallIntType()), false),
                 of(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 72a95b43696..27f0b42c317 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -56,6 +56,8 @@ public class ProcessTableFunctionSemanticTests extends SemanticTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_TYPED_SET_SEMANTIC_TABLE,
                 ProcessTableFunctionTestPrograms.PROCESS_TYPED_SET_SEMANTIC_TABLE_TABLE_API,
                 ProcessTableFunctionTestPrograms.PROCESS_POJO_ARGS,
+                ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_DAY_ARGS,
+                ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_YEAR_ARGS,
                 ProcessTableFunctionTestPrograms.PROCESS_EMPTY_ARGS,
                 ProcessTableFunctionTestPrograms.PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH,
                 ProcessTableFunctionTestPrograms.PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 9fadeb81d94..830d4fd5c29 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidPassThroughTimersFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidRowKindFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidRowSemanticTableTimersFunction;
@@ -385,6 +387,30 @@ public class ProcessTableFunctionTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM f()")
                     .build();
 
+    public static final TableTestProgram PROCESS_INTERVAL_DAY_ARGS =
+            TableTestProgram.of("process-interval-day-args", "interval argument")
+                    .setupTemporarySystemFunction("f", IntervalDayArgFunction.class)
+                    .setupSql(BASIC_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(BASE_SINK_SCHEMA)
+                                    .consumedValues("+I[{PT1S}]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM f(d => INTERVAL '1' SECOND)")
+                    .build();
+
+    public static final TableTestProgram PROCESS_INTERVAL_YEAR_ARGS =
+            TableTestProgram.of("process-interval-year-args", "interval argument")
+                    .setupTemporarySystemFunction("f", IntervalYearArgFunction.class)
+                    .setupSql(BASIC_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(BASE_SINK_SCHEMA)
+                                    .consumedValues("+I[{P1Y}]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM f(p => INTERVAL '1' YEAR)")
+                    .build();
+
     public static final TableTestProgram PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH =
             TableTestProgram.of("process-row-pass-through", "pass columns through enabled")
                     .setupTemporarySystemFunction("f", RowSemanticTablePassThroughFunction.class)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index d20dc3fd70e..9c011a7b6aa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -42,8 +42,10 @@ import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.Period;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -336,6 +338,20 @@ public class ProcessTableFunctionTestUtils {
         }
     }
 
+    /** Testing function. */
+    public static class IntervalDayArgFunction extends AppendProcessTableFunctionBase {
+        public void eval(Duration d) {
+            collectObjects(d);
+        }
+    }
+
+    /** Testing function. */
+    public static class IntervalYearArgFunction extends AppendProcessTableFunctionBase {
+        public void eval(Period p) {
+            collectObjects(p);
+        }
+    }
+
     /** Testing function. */
     public static class RowSemanticTablePassThroughFunction extends AppendProcessTableFunctionBase {
         public void eval(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index 184df1bccf7..659ee47cb97 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.AppendProcessTableFunctionBase;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidUpdatingSemanticsFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NoSystemArgsScalarFunction;
@@ -149,6 +151,18 @@ public class ProcessTableFunctionTest extends TableTestBase {
         util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')");
     }
 
+    @Test
+    void testIntervalDayArgs() {
+        util.addTemporarySystemFunction("f", IntervalDayArgFunction.class);
+        util.verifyRelPlan("SELECT * FROM f(d => INTERVAL '1' SECOND)");
+    }
+
+    @Test
+    void testIntervalYearArgs() {
+        util.addTemporarySystemFunction("f", IntervalYearArgFunction.class);
+        util.verifyRelPlan("SELECT * FROM f(p => INTERVAL '1' YEAR)");
+    }
+
     @Test
     void testSetSemanticTablePassThroughColumns() {
         util.addTemporarySystemFunction("f", SetSemanticTablePassThroughFunction.class);
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index 98094f9591a..55fa321f1eb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -68,6 +68,40 @@ LogicalProject(out=[$0])
       <![CDATA[
 ProcessTableFunction(invocation=[f(DEFAULT(), _UTF-16LE'my-ptf')], uid=[my-ptf], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
 +- Values(tuples=[[{  }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntervalDayArgs">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(d => INTERVAL '1' SECOND)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(out=[$0])
++- LogicalTableFunctionScan(invocation=[f(1000:INTERVAL SECOND, DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) out)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(1000:INTERVAL SECOND, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
++- Values(tuples=[[{  }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntervalYearArgs">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(p => INTERVAL '1' YEAR)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(out=[$0])
++- LogicalTableFunctionScan(invocation=[f(12:INTERVAL YEAR, DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) out)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(12:INTERVAL YEAR, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
++- Values(tuples=[[{  }]])
 ]]>
     </Resource>
   </TestCase>

Reply via email to