This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8bd215dc28ae431120e5b1a114a94ddc6e004b16 Author: Ingo Bürk <[email protected]> AuthorDate: Mon May 31 10:24:49 2021 +0200 [FLINK-22737][table-common][table-planner-blink] Introduce CURRENT_WATERMARK() This closes #16046. --- docs/data/sql_functions.yml | 10 ++- flink-python/pyflink/table/expressions.py | 28 ++++++- .../org/apache/flink/table/api/Expressions.java | 21 +++++ .../table/api/ImplicitExpressionConversions.scala | 20 +++++ .../functions/BuiltInFunctionDefinitions.java | 10 +++ .../table/types/inference/InputTypeStrategies.java | 5 ++ .../table/types/inference/TypeStrategies.java | 19 +++++ .../CurrentWatermarkInputTypeStrategy.java | 93 ++++++++++++++++++++ .../inference/InputTypeStrategiesTestBase.java | 22 ++--- .../table/types/inference/TypeStrategiesTest.java | 29 ++++++- .../CurrentWatermarkInputTypeStrategyTest.java | 80 ++++++++++++++++++ .../table/planner/codegen/ExprCodeGenerator.scala | 6 +- .../table/planner/codegen/GenerateUtils.scala | 20 +++++ .../planner/runtime/stream/sql/CalcITCase.scala | 98 ++++++++++++++++++++++ 14 files changed, 443 insertions(+), 18 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 73abcf2..b7f040d 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -485,6 +485,14 @@ temporal: description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3)." - sql: TO_TIMESTAMP(string1[, string2]) description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by TableConfig) to a timestamp." + - sql: CURRENT_WATERMARK(rowtime) + description: | + Returns the current watermark for the given rowtime attribute, or NULL if no common watermark of all upstream operations is available at the current operation in the pipeline. + The return type of the function is inferred to match that of the provided rowtime attribute, but with an adjusted precision of 3. For example, if the rowtime attribute is TIMESTAMP_LTZ(9), the function will return TIMESTAMP_LTZ(3). + + Note that this function can return NULL, and you may have to consider this case. For example, if you want to filter out late data you can use: + + WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts) conditional: - sql: | @@ -682,4 +690,4 @@ aggregate: - sql: LAST_VALUE(expression) description: Returns the last value in an ordered set of values. - sql: LISTAGG(expression [, separator]) - description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','. \ No newline at end of file + description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','. diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index 0b3a60b..c169f31 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -26,10 +26,11 @@ from pyflink.util.java_utils import to_jarray, load_java_class __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'UNBOUNDED_ROW', 'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 'current_time', - 'current_timestamp', 'local_time', 'local_timestamp', 'temporal_overlaps', - 'date_format', 'timestamp_diff', 'array', 'row', 'map_', 'row_interval', 'pi', 'e', - 'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 'uuid', 'null_of', - 'log', 'with_columns', 'without_columns', 'call', 'call_sql'] + 'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp', + 'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_', + 'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat', + 'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'call', + 'call_sql'] def _leaf_op(op_name: str) -> Expression: @@ -203,6 +204,25 @@ def current_timestamp() -> Expression: return _leaf_op("currentTimestamp") +def current_watermark(rowtimeAttribute) -> Expression: + """ + Returns the current watermark for the given rowtime attribute, or NULL if no common watermark of + all upstream operations is available at the current operation in the pipeline. + + The function returns the watermark with the same type as the rowtime attribute, but with an + adjusted precision of 3. For example, if the rowtime attribute is `TIMESTAMP_LTZ(9)`, the + function will return `TIMESTAMP_LTZ(3)`. + + If no watermark has been emitted yet, the function will return `NULL`. Users must take care of + this when comparing against it, e.g. in order to filter out late data you can use + + :: + + WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts) + """ + return _unary_op("currentWatermark", rowtimeAttribute) + + def local_time() -> Expression: """ Returns the current SQL time in local time zone. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index b88be0f..45c0e28 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -212,6 +212,27 @@ public final class Expressions { } /** + * Returns the current watermark for the given rowtime attribute, or {@code NULL} if no common + * watermark of all upstream operations is available at the current operation in the pipeline. + * + * <p>The function returns the watermark with the same type as the rowtime attribute, but with + * an adjusted precision of 3. For example, if the rowtime attribute is {@link + * DataTypes#TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(9)}, the function will return {@link + * DataTypes#TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(3)}. + * + * <p>If no watermark has been emitted yet, the function will return {@code NULL}. Users must + * take care of this when comparing against it, e.g. in order to filter out late data you can + * use + * + * <pre>{@code + * WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts) + * }</pre> + */ + public static ApiExpression currentWatermark(Object rowtimeAttribute) { + return apiCall(BuiltInFunctionDefinitions.CURRENT_WATERMARK, rowtimeAttribute); + } + + /** * Returns the current SQL time in local time zone, the return type of this expression is {@link * DataTypes#TIME()}, this is a synonym for {@link Expressions#currentTime()}. */ diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala index dd655b0..7aca513 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala @@ -446,6 +446,26 @@ trait ImplicitExpressionConversions { } /** + * Returns the current watermark for the given rowtime attribute, or `NULL` if no common watermark + * of all upstream operations is available at the current operation in the pipeline. + * + * The function returns the watermark with the same type as the rowtime attribute, but with + * an adjusted precision of 3. For example, if the rowtime attribute is + * [[DataTypes.TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(9)]], the function will return + * [[DataTypes.TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(3)]]. + * + * If no watermark has been emitted yet, the function will return `NULL`. Users must take care of + * this when comparing against it, e.g. in order to filter out late data you can use + * + * {{{ + * WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts) + * }}} + */ + def currentWatermark(rowtimeAttribute: Expression): Expression = { + Expressions.currentWatermark(rowtimeAttribute) + } + + /** * Returns the current SQL time in local time zone, * the return type of this expression is [[DataTypes.TIME]], * this is a synonym for [[ImplicitExpressionConversions.currentTime()]]. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 7d626a2..d4c76c6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1387,6 +1387,16 @@ public final class BuiltInFunctionDefinitions { .outputTypeStrategy(TypeStrategies.MISSING) .build(); + public static final BuiltInFunctionDefinition CURRENT_WATERMARK = + BuiltInFunctionDefinition.newBuilder() + .name("CURRENT_WATERMARK") + .kind(SCALAR) + .inputTypeStrategy(InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK) + .outputTypeStrategy(TypeStrategies.CURRENT_WATERMARK) + .notDeterministic() + .runtimeProvided() + .build(); + // -------------------------------------------------------------------------------------------- // Over window // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java index 6e883ee..7838bef 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy; import org.apache.flink.table.types.inference.strategies.CompositeArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy; +import org.apache.flink.table.types.inference.strategies.CurrentWatermarkInputTypeStrategy; import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy; @@ -338,6 +339,10 @@ public final class InputTypeStrategies { public static final InputTypeStrategy TWO_EQUALS_COMPARABLE = comparable(ConstantArgumentCount.of(2), StructuredComparision.EQUALS); + /** Strategy specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}. */ + public static final InputTypeStrategy SPECIFIC_FOR_CURRENT_WATERMARK = + new CurrentWatermarkInputTypeStrategy(); + // -------------------------------------------------------------------------------------------- private InputTypeStrategies() { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java index d65b151..34ffc68 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java @@ -20,6 +20,7 @@ package org.apache.flink.table.types.inference; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.strategies.CommonTypeStrategy; import org.apache.flink.table.types.inference.strategies.ExplicitTypeStrategy; @@ -435,6 +436,24 @@ public final class TypeStrategies { }; /** + * Type strategy for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK} which mirrors the type + * of the passed rowtime column, but removes the rowtime kind and enforces the correct precision + * for watermarks. + */ + public static final TypeStrategy CURRENT_WATERMARK = + callContext -> { + final LogicalType inputType = + callContext.getArgumentDataTypes().get(0).getLogicalType(); + if (hasRoot(inputType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { + return Optional.of(DataTypes.TIMESTAMP(3)); + } else if (hasRoot(inputType, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { + return Optional.of(DataTypes.TIMESTAMP_LTZ(3)); + } + + return Optional.empty(); + }; + + /** * Type strategy specific for aggregations that partially produce different nullability * depending whether the result is grouped or not. */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java new file mode 100644 index 0000000..f20a7c0 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}. + * + * <p>It expects a single argument representing a rowtime attribute. + */ +@Internal +public class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.of(1); + } + + @Override + public Optional<List<DataType>> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); + + final DataType dataType = argumentDataTypes.get(0); + if (!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) { + if (throwOnFailure) { + throw callContext.newValidationError( + "CURRENT_WATERMARK() must be called with a single rowtime attribute argument, but '%s' cannot be a time attribute.", + dataType.getLogicalType().asSummaryString()); + } + + return Optional.empty(); + } + + if (!LogicalTypeChecks.isRowtimeAttribute(dataType.getLogicalType())) { + if (throwOnFailure) { + throw callContext.newValidationError( + "The argument of CURRENT_WATERMARK() must be a rowtime attribute, but was '%s'.", + dataType.getLogicalType().asSummaryString()); + } + + return Optional.empty(); + } + + return Optional.of(Collections.singletonList(dataType)); + } + + @Override + public List<Signature> getExpectedSignatures(FunctionDefinition definition) { + final List<Signature> signatures = new ArrayList<>(); + signatures.add(createExpectedSignature(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)); + signatures.add(createExpectedSignature(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)); + return signatures; + } + + private static Signature createExpectedSignature(LogicalTypeRoot typeRoot) { + final String argument = String.format("<%s *%s*>", typeRoot, TimestampKind.ROWTIME); + return Signature.of(Signature.Argument.of(argument)); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java index 52786ef..4f3bff80 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java @@ -178,56 +178,56 @@ public abstract class InputTypeStrategiesTestBase { this.strategy = strategy; } - static TestSpec forStrategy(InputTypeStrategy strategy) { + public static TestSpec forStrategy(InputTypeStrategy strategy) { return new TestSpec(null, strategy); } - static TestSpec forStrategy(String description, InputTypeStrategy strategy) { + public static TestSpec forStrategy(String description, InputTypeStrategy strategy) { return new TestSpec(description, strategy); } - TestSpec namedArguments(String... names) { + public TestSpec namedArguments(String... names) { this.namedArguments = Arrays.asList(names); return this; } - TestSpec typedArguments(DataType... dataTypes) { + public TestSpec typedArguments(DataType... dataTypes) { this.typedArguments = Arrays.asList(dataTypes); return this; } - TestSpec surroundingStrategy(InputTypeStrategy surroundingStrategy) { + public TestSpec surroundingStrategy(InputTypeStrategy surroundingStrategy) { this.surroundingStrategy = surroundingStrategy; return this; } - TestSpec calledWithArgumentTypes(AbstractDataType<?>... dataTypes) { + public TestSpec calledWithArgumentTypes(AbstractDataType<?>... dataTypes) { this.actualArgumentTypes.add(resolveDataTypes(dataTypes)); return this; } - TestSpec calledWithLiteralAt(int pos) { + public TestSpec calledWithLiteralAt(int pos) { this.literalPos = pos; return this; } - TestSpec calledWithLiteralAt(int pos, Object value) { + public TestSpec calledWithLiteralAt(int pos, Object value) { this.literalPos = pos; this.literalValue = value; return this; } - TestSpec expectSignature(String signature) { + public TestSpec expectSignature(String signature) { this.expectedSignature = signature; return this; } - TestSpec expectArgumentTypes(AbstractDataType<?>... dataTypes) { + public TestSpec expectArgumentTypes(AbstractDataType<?>... dataTypes) { this.expectedArgumentTypes = resolveDataTypes(dataTypes); return this; } - TestSpec expectErrorMessage(String expectedErrorMessage) { + public TestSpec expectErrorMessage(String expectedErrorMessage) { this.expectedErrorMessage = expectedErrorMessage; return this; } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java index 618f9a5..55985a3 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java @@ -27,9 +27,13 @@ import org.apache.flink.table.types.FieldsDataType; import org.apache.flink.table.types.inference.utils.CallContextMock; import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock; import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; +import org.apache.flink.table.types.utils.TypeConversions; import org.junit.Assert; import org.junit.Rule; @@ -311,7 +315,21 @@ public class TypeStrategiesTest { "Average without grouped aggregation", TypeStrategies.aggArg0(LogicalTypeMerging::findAvgAggType, true)) .inputTypes(DataTypes.INT().notNull()) - .expectDataType(DataTypes.INT())); + .expectDataType(DataTypes.INT()), + + // CURRENT_WATERMARK + TestSpec.forStrategy("TIMESTAMP(3) *ROWTIME*", TypeStrategies.CURRENT_WATERMARK) + .inputTypes(createRowtimeType(TimestampKind.ROWTIME, 3).notNull()) + .expectDataType(DataTypes.TIMESTAMP(3)), + TestSpec.forStrategy("TIMESTAMP_LTZ(3) *ROWTIME*", TypeStrategies.CURRENT_WATERMARK) + .inputTypes(createRowtimeLtzType(TimestampKind.ROWTIME, 3).notNull()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3)), + TestSpec.forStrategy("TIMESTAMP(9) *ROWTIME*", TypeStrategies.CURRENT_WATERMARK) + .inputTypes(createRowtimeType(TimestampKind.ROWTIME, 9).notNull()) + .expectDataType(DataTypes.TIMESTAMP(3)), + TestSpec.forStrategy("TIMESTAMP_LTZ(9) *ROWTIME*", TypeStrategies.CURRENT_WATERMARK) + .inputTypes(createRowtimeLtzType(TimestampKind.ROWTIME, 9).notNull()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3))); } @Parameter public TestSpec testSpec; @@ -443,4 +461,13 @@ public class TypeStrategiesTest { explicit(DataTypes.STRING())); return TypeStrategies.mapping(mappings); } + + private static DataType createRowtimeType(TimestampKind kind, int precision) { + return TypeConversions.fromLogicalToDataType(new TimestampType(false, kind, precision)); + } + + private static DataType createRowtimeLtzType(TimestampKind kind, int precision) { + return TypeConversions.fromLogicalToDataType( + new LocalZonedTimestampType(false, kind, precision)); + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategyTest.java new file mode 100644 index 0000000..742d679 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategyTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.InputTypeStrategies; +import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.junit.runners.Parameterized; + +import java.util.List; + +import static java.util.Arrays.asList; + +/** Tests for {@link CurrentWatermarkInputTypeStrategy}. */ +public class CurrentWatermarkInputTypeStrategyTest extends InputTypeStrategiesTestBase { + + @Parameterized.Parameters(name = "{index}: {0}") + public static List<TestSpec> testData() { + return asList( + TestSpec.forStrategy( + "TIMESTAMP(3) *ROWTIME* works", + InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK) + .calledWithArgumentTypes(createRowtimeType(TimestampKind.ROWTIME, 3)) + .expectArgumentTypes(createRowtimeType(TimestampKind.ROWTIME, 3)), + TestSpec.forStrategy( + "TIMESTAMP_LTZ(3) *ROWTIME* works", + InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK) + .calledWithArgumentTypes(createRowtimeLtzType(TimestampKind.ROWTIME, 3)) + .expectArgumentTypes(createRowtimeLtzType(TimestampKind.ROWTIME, 3)), + TestSpec.forStrategy( + "TIMESTAMP(3) doesn't work", + InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK) + .calledWithArgumentTypes(createRowtimeType(TimestampKind.REGULAR, 3)) + .expectErrorMessage( + "The argument of CURRENT_WATERMARK() must be a rowtime attribute, but was 'TIMESTAMP(3) NOT NULL'."), + TestSpec.forStrategy( + "TIMESTAMP_LTZ(3) doesn't work", + InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK) + .calledWithArgumentTypes(createRowtimeLtzType(TimestampKind.REGULAR, 3)) + .expectErrorMessage( + "The argument of CURRENT_WATERMARK() must be a rowtime attribute, but was 'TIMESTAMP_LTZ(3) NOT NULL'."), + TestSpec.forStrategy( + "BIGINT doesn't work", + InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK) + .calledWithArgumentTypes(DataTypes.BIGINT()) + .expectErrorMessage( + "CURRENT_WATERMARK() must be called with a single rowtime attribute argument, but 'BIGINT' cannot be a time attribute.")); + } + + private static DataType createRowtimeType(TimestampKind kind, int precision) { + return TypeConversions.fromLogicalToDataType(new TimestampType(false, kind, precision)); + } + + private static DataType createRowtimeLtzType(TimestampKind kind, int precision) { + return TypeConversions.fromLogicalToDataType( + new LocalZonedTimestampType(false, kind, precision)); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala index ce8eb42..7dafd09 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala @@ -41,11 +41,11 @@ import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTem import org.apache.flink.table.types.logical._ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, isCompositeType} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo - import org.apache.calcite.rex._ import org.apache.calcite.sql.{SqlKind, SqlOperator} import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName} import org.apache.calcite.util.{Sarg, TimestampString} +import org.apache.flink.table.functions.BuiltInFunctionDefinitions import scala.collection.JavaConversions._ @@ -828,6 +828,10 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean) tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray)) .generate(ctx, operands, resultType) + case bsf: BridgingSqlFunction + if bsf.getDefinition eq BuiltInFunctionDefinitions.CURRENT_WATERMARK => + generateWatermark(ctx, contextTerm, resultType) + case _: BridgingSqlFunction => new BridgingSqlFunctionCallGen(call).generate(ctx, operands, resultType) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala index 1f744cb..c3bedc4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala @@ -543,6 +543,26 @@ object GenerateUtils { resultType) } + def generateWatermark( + ctx: CodeGeneratorContext, + contextTerm: String, + resultType: LogicalType): GeneratedExpression = { + val resultTypeTerm = primitiveTypeTermForType(resultType) + val Seq(resultTerm, nullTerm, currentWatermarkTerm) = ctx.addReusableLocalVariables( + (resultTypeTerm, "result"), + ("boolean", "isNull"), + ("long", "currentWatermark") + ) + + val code = + s""" + |$currentWatermarkTerm = $contextTerm.timerService().currentWatermark(); + |$nullTerm = ($currentWatermarkTerm == java.lang.Long.MIN_VALUE); + |$resultTerm = $TIMESTAMP_DATA.fromEpochMillis($currentWatermarkTerm); + |""".stripMargin.trim + GeneratedExpression(resultTerm, nullTerm, code, resultType) + } + /** * Generates access to a field of the input. * @param ctx code generator context which maintains various code statements. diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 1bc6141..28d4b1b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -33,12 +33,15 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.runtime.typeutils.MapDataSerializerTest.CustomMapData import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} import org.apache.flink.table.utils.LegacyRowResource +import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row import org.apache.flink.util.CollectionUtil import java.util import org.junit.Assert._ import org.junit._ + +import java.time.Instant import scala.collection.JavaConversions._ import scala.collection.Seq @@ -433,4 +436,99 @@ class CalcITCase extends StreamingTestBase { val expected = List("{a=0.12, b=0.50}") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testCurrentWatermark(): Unit = { + val rows = Seq( + row(1, Instant.ofEpochSecond(644326662L)), + row(2, Instant.ofEpochSecond(1622466300L)), + row(3, Instant.ofEpochSecond(1622466300L)) + ) + val tableId = TestValuesTableFactory.registerData(rows) + + // We need a fixed timezone to make sure this test can run on machines across the world + tEnv.getConfig.getConfiguration.setString("table.local-time-zone", "Europe/Berlin") + + tEnv.executeSql(s""" + |CREATE TABLE T ( + | id INT, + | ts TIMESTAMP_LTZ(3), + | WATERMARK FOR ts AS ts + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$tableId', + | 'bounded' = 'true' + |) + """.stripMargin) + + // Table API + val result1 = tEnv.from("T") + .select($("id"), currentWatermark($("ts"))) + .execute().collect().toList + TestBaseUtils.compareResultAsText(result1, + """1,null + |2,1990-06-02T11:37:42Z + |3,2021-05-31T13:05:00Z + |""".stripMargin) + + // SQL + val result2 = tEnv.sqlQuery("SELECT id, CURRENT_WATERMARK(ts) FROM T") + .execute().collect().toList + TestBaseUtils.compareResultAsText(result2, + """1,null + |2,1990-06-02T11:37:42Z + |3,2021-05-31T13:05:00Z + |""".stripMargin) + + val result3 = tEnv.sqlQuery( + """ + |SELECT id FROM T WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts) + |""".stripMargin) + .execute().collect().toList + TestBaseUtils.compareResultAsText(result3, + """1 + |2 + |""".stripMargin) + + val result4 = tEnv.sqlQuery( + """ + |SELECT + | TUMBLE_END(ts, INTERVAL '1' SECOND), + | CURRENT_WATERMARK(ts) + |FROM T + |GROUP BY + | TUMBLE(ts, INTERVAL '1' SECOND), + | CURRENT_WATERMARK(ts) + |""".stripMargin) + .execute().collect().toList + TestBaseUtils.compareResultAsText(result4, + """1990-06-02T13:37:43,null + |2021-05-31T15:05:01,1990-06-02T11:37:42Z + |2021-05-31T15:05:01,2021-05-31T13:05:00Z + |""".stripMargin) + } + + @Test + def testCurrentWatermarkForNonRowtimeAttribute(): Unit = { + val tableId = TestValuesTableFactory.registerData(Seq()) + tEnv.executeSql(s""" + |CREATE TABLE T ( + | ts TIMESTAMP_LTZ(3) + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$tableId', + | 'bounded' = 'true' + |) + """.stripMargin) + + try { + tEnv.sqlQuery("SELECT CURRENT_WATERMARK(ts) FROM T") + fail("CURRENT_WATERMARK for a non-rowtime attribute should have failed."); + } catch { + case e: Exception => assertEquals( + "SQL validation failed. Invalid function call:\n" + + "CURRENT_WATERMARK(TIMESTAMP_LTZ(3))", e.getMessage) + } + } + }
