This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a858e7b489 [FLINK-38200][table] Allow to use `FIRST_VALUE` and `LAST_VALUE` with other types
9a858e7b489 is described below

commit 9a858e7b489fca1ecf9775c21f5d17d9504bada7
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Wed Aug 6 17:21:30 2025 +0200

    [FLINK-38200][table] Allow to use `FIRST_VALUE` and `LAST_VALUE` with other types
---
 .../planner/plan/utils/AggFunctionFactory.scala    |  40 +---
 .../planner/functions/MiscAggFunctionITCase.java   | 226 ++++++++++++++++++++-
 2 files changed, 229 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 2e74d111a54..99560f83478 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -568,25 +568,9 @@ class AggFunctionFactory(
       index: Int): UserDefinedFunction = {
     val valueType = argTypes(0)
     if (aggCallNeedRetractions(index)) {
-      valueType.getTypeRoot match {
-        case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL |
-            VARIANT =>
-          new FirstValueWithRetractAggFunction(valueType)
-        case t =>
-          throw new TableException(
-            s"FIRST_VALUE with retract aggregate function does not " +
-              s"support type: ''$t''.\nPlease re-check the data type.")
-      }
+      new FirstValueWithRetractAggFunction(valueType)
     } else {
-      valueType.getTypeRoot match {
-        case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL |
-            VARIANT =>
-          new FirstValueAggFunction(valueType)
-        case t =>
-          throw new TableException(
-            s"FIRST_VALUE aggregate function does not support " +
-              s"type: ''$t''.\nPlease re-check the data type.")
-      }
+      new FirstValueAggFunction(valueType)
     }
   }
 
@@ -595,25 +579,9 @@ class AggFunctionFactory(
       index: Int): UserDefinedFunction = {
     val valueType = argTypes(0)
     if (aggCallNeedRetractions(index)) {
-      valueType.getTypeRoot match {
-        case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL |
-            VARIANT =>
-          new LastValueWithRetractAggFunction(valueType)
-        case t =>
-          throw new TableException(
-            s"LAST_VALUE with retract aggregate function does not " +
-              s"support type: ''$t''.\nPlease re-check the data type.")
-      }
+      new LastValueWithRetractAggFunction(valueType)
     } else {
-      valueType.getTypeRoot match {
-        case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL |
-            VARIANT =>
-          new LastValueAggFunction(valueType)
-        case t =>
-          throw new TableException(
-            s"LAST_VALUE aggregate function does not support " +
-              s"type: ''$t''.\nPlease re-check the data type.")
-      }
+      new LastValueAggFunction(valueType)
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java
index 50a7ca55e11..16d23ef6e22 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java
@@ -18,20 +18,46 @@
 
 package org.apache.flink.table.planner.functions;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.types.Row;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.DATE;
 import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
 import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
 import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.types.RowKind.INSERT;
 
 /** Tests for built-in ARRAY_AGG aggregation functions. */
 class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase {
 
+    private static final ZoneOffset TEST_OFFSET = ZoneOffset.ofHoursMinutes(-1, -20);
+    private static final LocalDate DEFAULT_DATE = LocalDate.parse("2021-09-24");
+
+    private static final LocalTime DEFAULT_TIME = LocalTime.parse("12:34:56.123");
+    private static final LocalDateTime DEFAULT_TIMESTAMP =
+            LocalDateTime.parse("2021-09-24T12:34:56.1234567");
+
     @Override
     Stream<TestSpec> getTestCaseSpecs() {
         return Stream.of(
@@ -49,6 +75,204 @@ class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase {
                                                 + " GROUP BY f0",
                                 ROW(STRING(), INT()),
                                 TableRuntimeException.class,
-                                "SingleValueAggFunction received more than one element."));
+                                "SingleValueAggFunction received more than one element."),
+                TestSpec.forExpression("FIRST_VALUE")
+                        .withSource(
+                                ROW(
+                                        STRING(),
+                                        TIME(),
+                                        DATE(),
+                                        TIMESTAMP(),
+                                        TIMESTAMP_LTZ(),
+                                        INTERVAL(DataTypes.MONTH()),
+                                        INTERVAL(SECOND(3)),
+                                        ARRAY(INT())),
+                                Arrays.asList(
+                                        Row.ofKind(
+                                                INSERT,
+                                                "A",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                
LocalDateTime.of(LocalDate.EPOCH, LocalTime.NOON),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 2, 4),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {1, 2, 3}),
+                                        Row.ofKind(
+                                                INSERT,
+                                                "A",
+                                                DEFAULT_TIME,
+                                                DEFAULT_DATE,
+                                                DEFAULT_TIMESTAMP,
+                                                
fromLocalTZ("2100-09-24T22:34:56.1"),
+                                                Period.of(0, 5, 4),
+                                                Duration.of(321, 
ChronoUnit.MINUTES),
+                                                new Integer[] {4, 5, 6}),
+                                        Row.ofKind(
+                                                INSERT,
+                                                "B",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                LocalDateTime.of(
+                                                        LocalDate.EPOCH, 
LocalTime.MIDNIGHT),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 3, 0),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {7, 8, 9})))
+                        .testResult(
+                                source ->
+                                        "SELECT f0, FIRST_VALUE(f1), FIRST_VALUE(f2), FIRST_VALUE(f3), FIRST_VALUE(f4), "
+                                                + "FIRST_VALUE(f5), FIRST_VALUE(f6), FIRST_VALUE(f7) FROM "
+                                                + source
+                                                + " GROUP BY f0",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f0")),
+                                        $("f0"),
+                                        $("f1").firstValue(),
+                                        $("f2").firstValue(),
+                                        $("f3").firstValue(),
+                                        $("f4").firstValue(),
+                                        $("f5").firstValue(),
+                                        $("f6").firstValue(),
+                                        $("f7").firstValue()),
+                                ROW(
+                                        STRING(),
+                                        TIME(),
+                                        DATE(),
+                                        TIMESTAMP(),
+                                        TIMESTAMP_LTZ(),
+                                        INTERVAL(MONTH()),
+                                        INTERVAL(SECOND(3)),
+                                        ARRAY(INT())),
+                                ROW(
+                                        STRING(),
+                                        TIME(),
+                                        DATE(),
+                                        TIMESTAMP(),
+                                        TIMESTAMP_LTZ(),
+                                        INTERVAL(MONTH()),
+                                        INTERVAL(SECOND(3)),
+                                        ARRAY(INT())),
+                                Arrays.asList(
+                                        Row.of(
+                                                "A",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                
LocalDateTime.of(LocalDate.EPOCH, LocalTime.NOON),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 2, 0),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {1, 2, 3}),
+                                        Row.of(
+                                                "B",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                LocalDateTime.of(
+                                                        LocalDate.EPOCH, 
LocalTime.MIDNIGHT),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 3, 0),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {7, 8, 9}))),
+                TestSpec.forExpression("LAST_VALUE")
+                        .withSource(
+                                ROW(
+                                        STRING(),
+                                        TIME(),
+                                        DATE(),
+                                        TIMESTAMP(),
+                                        TIMESTAMP_LTZ(),
+                                        INTERVAL(DataTypes.MONTH()),
+                                        INTERVAL(SECOND(3)),
+                                        ARRAY(INT())),
+                                Arrays.asList(
+                                        Row.ofKind(
+                                                INSERT,
+                                                "A",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                
LocalDateTime.of(LocalDate.EPOCH, LocalTime.NOON),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 2, 0),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {1, 2, 3}),
+                                        Row.ofKind(
+                                                INSERT,
+                                                "A",
+                                                DEFAULT_TIME,
+                                                DEFAULT_DATE,
+                                                DEFAULT_TIMESTAMP,
+                                                
fromLocalTZ("2100-09-24T22:34:56.1"),
+                                                Period.of(0, 5, 4),
+                                                Duration.of(321, 
ChronoUnit.MINUTES),
+                                                new Integer[] {4, 5, 6}),
+                                        Row.ofKind(
+                                                INSERT,
+                                                "B",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                LocalDateTime.of(
+                                                        LocalDate.EPOCH, 
LocalTime.MIDNIGHT),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 3, 0),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {7, 8, 9})))
+                        .testResult(
+                                source ->
+                                        "SELECT f0, LAST_VALUE(f1), LAST_VALUE(f2), LAST_VALUE(f3), LAST_VALUE(f4), "
+                                                + "LAST_VALUE(f5), LAST_VALUE(f6), LAST_VALUE(f7) FROM "
+                                                + source
+                                                + " GROUP BY f0",
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f0")),
+                                        $("f0"),
+                                        $("f1").lastValue(),
+                                        $("f2").lastValue(),
+                                        $("f3").lastValue(),
+                                        $("f4").lastValue(),
+                                        $("f5").lastValue(),
+                                        $("f6").lastValue(),
+                                        $("f7").lastValue()),
+                                ROW(
+                                        STRING(),
+                                        TIME(),
+                                        DATE(),
+                                        TIMESTAMP(),
+                                        TIMESTAMP_LTZ(),
+                                        INTERVAL(MONTH()),
+                                        INTERVAL(SECOND(3)),
+                                        ARRAY(INT())),
+                                ROW(
+                                        STRING(),
+                                        TIME(),
+                                        DATE(),
+                                        TIMESTAMP(),
+                                        TIMESTAMP_LTZ(),
+                                        INTERVAL(MONTH()),
+                                        INTERVAL(SECOND(3)),
+                                        ARRAY(INT())),
+                                Arrays.asList(
+                                        Row.of(
+                                                "A",
+                                                DEFAULT_TIME,
+                                                DEFAULT_DATE,
+                                                DEFAULT_TIMESTAMP,
+                                                
fromLocalTZ("2100-09-24T22:34:56.1"),
+                                                Period.of(0, 5, 0),
+                                                Duration.of(321, 
ChronoUnit.MINUTES),
+                                                new Integer[] {4, 5, 6}),
+                                        Row.of(
+                                                "B",
+                                                LocalTime.NOON,
+                                                LocalDate.EPOCH,
+                                                LocalDateTime.of(
+                                                        LocalDate.EPOCH, 
LocalTime.MIDNIGHT),
+                                                
fromLocalTZ("1900-09-24T22:34:56.1"),
+                                                Period.of(0, 3, 0),
+                                                Duration.of(123, 
ChronoUnit.MINUTES),
+                                                new Integer[] {7, 8, 9}))));
+    }
+
+    private static Instant fromLocalTZ(String str) {
+        return LocalDateTime.parse(str).toInstant(TEST_OFFSET);
     }
 }

Reply via email to