This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9a858e7b489 [FLINK-38200][table] Allow to use `FIRST_VALUE` and `LAST_VALUE` with other types 9a858e7b489 is described below commit 9a858e7b489fca1ecf9775c21f5d17d9504bada7 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Wed Aug 6 17:21:30 2025 +0200 [FLINK-38200][table] Allow to use `FIRST_VALUE` and `LAST_VALUE` with other types --- .../planner/plan/utils/AggFunctionFactory.scala | 40 +--- .../planner/functions/MiscAggFunctionITCase.java | 226 ++++++++++++++++++++- 2 files changed, 229 insertions(+), 37 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 2e74d111a54..99560f83478 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -568,25 +568,9 @@ class AggFunctionFactory( index: Int): UserDefinedFunction = { val valueType = argTypes(0) if (aggCallNeedRetractions(index)) { - valueType.getTypeRoot match { - case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL | - VARIANT => - new FirstValueWithRetractAggFunction(valueType) - case t => - throw new TableException( - s"FIRST_VALUE with retract aggregate function does not " + - s"support type: ''$t''.\nPlease re-check the data type.") - } + new FirstValueWithRetractAggFunction(valueType) } else { - valueType.getTypeRoot match { - case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL | - VARIANT => - new FirstValueAggFunction(valueType) - case t => - throw new TableException( - s"FIRST_VALUE aggregate function does not support " + - s"type: ''$t''.\nPlease re-check the data type.") - } + new FirstValueAggFunction(valueType) } } @@ -595,25 +579,9 @@ class AggFunctionFactory( index: Int): UserDefinedFunction = { val valueType = argTypes(0) if (aggCallNeedRetractions(index)) { - valueType.getTypeRoot match { - case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL | - VARIANT => - new LastValueWithRetractAggFunction(valueType) - case t => - throw new TableException( - s"LAST_VALUE with retract aggregate function does not " + - s"support type: ''$t''.\nPlease re-check the data type.") - } + new LastValueWithRetractAggFunction(valueType) } else { - valueType.getTypeRoot match { - case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | BOOLEAN | VARCHAR | DECIMAL | - VARIANT => - new LastValueAggFunction(valueType) - case t => - throw new TableException( - s"LAST_VALUE aggregate function does not support " + - s"type: ''$t''.\nPlease re-check the data type.") - } + new LastValueAggFunction(valueType) } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java index 50a7ca55e11..16d23ef6e22 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java @@ -18,20 +18,46 @@ package org.apache.flink.table.planner.functions; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableRuntimeException; import org.apache.flink.types.Row; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.Period; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Collections; import java.util.stream.Stream; +import static org.apache.flink.table.api.DataTypes.ARRAY; +import static org.apache.flink.table.api.DataTypes.DATE; import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.INTERVAL; +import static org.apache.flink.table.api.DataTypes.MONTH; import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.SECOND; import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIME; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; +import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.types.RowKind.INSERT; /** Tests for built-in ARRAY_AGG aggregation functions. */ class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + private static final ZoneOffset TEST_OFFSET = ZoneOffset.ofHoursMinutes(-1, -20); + private static final LocalDate DEFAULT_DATE = LocalDate.parse("2021-09-24"); + + private static final LocalTime DEFAULT_TIME = LocalTime.parse("12:34:56.123"); + private static final LocalDateTime DEFAULT_TIMESTAMP = + LocalDateTime.parse("2021-09-24T12:34:56.1234567"); + @Override Stream<TestSpec> getTestCaseSpecs() { return Stream.of( @@ -49,6 +75,204 @@ class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + " GROUP BY f0", ROW(STRING(), INT()), TableRuntimeException.class, - "SingleValueAggFunction received more than one element.")); + "SingleValueAggFunction received more than one element."), + TestSpec.forExpression("FIRST_VALUE") + .withSource( + ROW( + STRING(), + TIME(), + DATE(), + TIMESTAMP(), + TIMESTAMP_LTZ(), + INTERVAL(DataTypes.MONTH()), + INTERVAL(SECOND(3)), + ARRAY(INT())), + Arrays.asList( + Row.ofKind( + INSERT, + "A", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of(LocalDate.EPOCH, LocalTime.NOON), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 2, 4), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {1, 2, 3}), + Row.ofKind( + INSERT, + "A", + DEFAULT_TIME, + DEFAULT_DATE, + DEFAULT_TIMESTAMP, + fromLocalTZ("2100-09-24T22:34:56.1"), + Period.of(0, 5, 4), + Duration.of(321, ChronoUnit.MINUTES), + new Integer[] {4, 5, 6}), + Row.ofKind( + INSERT, + "B", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of( + LocalDate.EPOCH, LocalTime.MIDNIGHT), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 3, 0), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {7, 8, 9}))) + .testResult( + source -> + "SELECT f0, FIRST_VALUE(f1), FIRST_VALUE(f2), FIRST_VALUE(f3), FIRST_VALUE(f4), " + + "FIRST_VALUE(f5), FIRST_VALUE(f6), FIRST_VALUE(f7) FROM " + + source + + " GROUP BY f0", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f0")), + $("f0"), + $("f1").firstValue(), + $("f2").firstValue(), + $("f3").firstValue(), + $("f4").firstValue(), + $("f5").firstValue(), + $("f6").firstValue(), + $("f7").firstValue()), + ROW( + STRING(), + TIME(), + DATE(), + TIMESTAMP(), + TIMESTAMP_LTZ(), + INTERVAL(MONTH()), + INTERVAL(SECOND(3)), + ARRAY(INT())), + ROW( + STRING(), + TIME(), + DATE(), + TIMESTAMP(), + TIMESTAMP_LTZ(), + INTERVAL(MONTH()), + INTERVAL(SECOND(3)), + ARRAY(INT())), + Arrays.asList( + Row.of( + "A", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of(LocalDate.EPOCH, LocalTime.NOON), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 2, 0), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {1, 2, 3}), + Row.of( + "B", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of( + LocalDate.EPOCH, LocalTime.MIDNIGHT), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 3, 0), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {7, 8, 9}))), + TestSpec.forExpression("LAST_VALUE") + .withSource( + ROW( + STRING(), + TIME(), + DATE(), + TIMESTAMP(), + TIMESTAMP_LTZ(), + INTERVAL(DataTypes.MONTH()), + INTERVAL(SECOND(3)), + ARRAY(INT())), + Arrays.asList( + Row.ofKind( + INSERT, + "A", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of(LocalDate.EPOCH, LocalTime.NOON), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 2, 0), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {1, 2, 3}), + Row.ofKind( + INSERT, + "A", + DEFAULT_TIME, + DEFAULT_DATE, + DEFAULT_TIMESTAMP, + fromLocalTZ("2100-09-24T22:34:56.1"), + Period.of(0, 5, 4), + Duration.of(321, ChronoUnit.MINUTES), + new Integer[] {4, 5, 6}), + Row.ofKind( + INSERT, + "B", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of( + LocalDate.EPOCH, LocalTime.MIDNIGHT), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 3, 0), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {7, 8, 9}))) + .testResult( + source -> + "SELECT f0, LAST_VALUE(f1), LAST_VALUE(f2), LAST_VALUE(f3), LAST_VALUE(f4), " + + "LAST_VALUE(f5), LAST_VALUE(f6), LAST_VALUE(f7) FROM " + + source + + " GROUP BY f0", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f0")), + $("f0"), + $("f1").lastValue(), + $("f2").lastValue(), + $("f3").lastValue(), + $("f4").lastValue(), + $("f5").lastValue(), + $("f6").lastValue(), + $("f7").lastValue()), + ROW( + STRING(), + TIME(), + DATE(), + TIMESTAMP(), + TIMESTAMP_LTZ(), + INTERVAL(MONTH()), + INTERVAL(SECOND(3)), + ARRAY(INT())), + ROW( + STRING(), + TIME(), + DATE(), + TIMESTAMP(), + TIMESTAMP_LTZ(), + INTERVAL(MONTH()), + INTERVAL(SECOND(3)), + ARRAY(INT())), + Arrays.asList( + Row.of( + "A", + DEFAULT_TIME, + DEFAULT_DATE, + DEFAULT_TIMESTAMP, + fromLocalTZ("2100-09-24T22:34:56.1"), + Period.of(0, 5, 0), + Duration.of(321, ChronoUnit.MINUTES), + new Integer[] {4, 5, 6}), + Row.of( + "B", + LocalTime.NOON, + LocalDate.EPOCH, + LocalDateTime.of( + LocalDate.EPOCH, LocalTime.MIDNIGHT), + fromLocalTZ("1900-09-24T22:34:56.1"), + Period.of(0, 3, 0), + Duration.of(123, ChronoUnit.MINUTES), + new Integer[] {7, 8, 9})))); + } + + private static Instant fromLocalTZ(String str) { + return LocalDateTime.parse(str).toInstant(TEST_OFFSET); } }