twalthr commented on a change in pull request #16046:
URL: https://github.com/apache/flink/pull/16046#discussion_r647443804



##########
File path: docs/data/sql_functions.yml
##########
@@ -485,6 +485,14 @@ temporal:
     description: "Converts a epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents 
TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents 
TO_TIMESTAMP_LTZ(epochMilliseconds, 3)."
   - sql: TO_TIMESTAMP(string1[, string2])
     description: "Converts date time string string1 with format string2 (by 
default: 'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by 
TableConfig) to a timestamp."
+  - sql: CURRENT_WATERMARK(rowtime)
+    description: |
+      Returns the current watermark for the given rowtime attribute, or NULL 
if no watermark has been emitted yet.
+      The return type of the function is inferred to match that of the 
provided rowtime attribute, but with an adjusted precision of 3. For example, 
if the rowtime attribute is TIMESTAMP_LTZ(9), the function will return 
TIMESTAMP_LTZ(3).
+
+      Note that this function can return NULL, and you may have to consider 
this case. For example, if you want to filter out late data you can use:
+
+      WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)

Review comment:
       have you checked if it is possible to use monospace here to make the 
code more readable after generation?

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -203,6 +204,23 @@ def current_timestamp() -> Expression:
     return _leaf_op("currentTimestamp")
 
 
+def current_watermark(rowtimeAttribute) -> Expression:
+    """
+    Returns the current watermark for the given rowtime attribute, or None if no watermark has been
+    emitted yet.
+
+    The function returns the watermark with the same type as the rowtime attribute, but with an
+    adjusted precision of 3. For example, if the rowtime attribute is TIMESTAMP_LTZ(9), the
+    function will return TIMESTAMP_LTZ(3).
+
+    If no watermark has been emitted yet, the function will return {@code NULL}. Users must take
+    care of this when comparing against it, e.g. in order to filter out late data you can use
+
+        WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)

Review comment:
       use Python syntax for code examples?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
##########
@@ -543,6 +543,32 @@ object GenerateUtils {
       resultType)
   }
 
+  def generateWatermark(
+      ctx: CodeGeneratorContext,
+      contextTerm: String,
+      isTimestampLtz: Boolean): GeneratedExpression = {
+    val resultType = if (isTimestampLtz) {
+      new LocalZonedTimestampType(3)
+    } else {
+      new TimestampType(3)
+    }
+
+    val resultTypeTerm = primitiveTypeTermForType(resultType)
+    val Seq(resultTerm, nullTerm, currentWatermarkTerm) = ctx.addReusableLocalVariables(
+      (resultTypeTerm, "result"),
+      ("boolean", "isNull"),
+      ("long", "currentWatermark")
+    )
+
+    val code =
+      s"""
+         |$currentWatermarkTerm = $contextTerm.timerService().currentWatermark();

Review comment:
       I'm wondering if we should wrap this in a try-catch; I'm sure it doesn't work in every combination. But before that, we should figure out when it doesn't work, e.g. what happens when it is used in DDL.
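   Purely as an illustration of the idea (a sketch, not the code of this PR): the generated access could be guarded roughly like this, with hypothetical placeholder strings standing in for the real codegen terms.

```scala
object WatermarkGuardSketch extends App {
  // Hypothetical placeholders standing in for the codegen terms from the diff above.
  val contextTerm = "ctx"
  val currentWatermarkTerm = "currentWatermark"
  val nullTerm = "isNull"

  // The generated Java snippet would wrap the timer-service access in a try-catch
  // and fall back to NULL when no timer service is available in the current context.
  val guardedCode =
    s"""
       |try {
       |  $currentWatermarkTerm = $contextTerm.timerService().currentWatermark();
       |  $nullTerm = false;
       |} catch (Exception e) {
       |  // e.g. contexts without a timer service (to be clarified, see above)
       |  $nullTerm = true;
       |}
       |""".stripMargin

  println(guardedCode)
}
```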

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
##########
@@ -53,19 +53,23 @@
 
     private final boolean isDeterministic;
 
+    private final boolean deferredRuntimeImplementation;
+
     private @Nullable String runtimeClass;
 
     private BuiltInFunctionDefinition(
             String name,
             FunctionKind kind,
             TypeInference typeInference,
             boolean isDeterministic,
+            boolean deferredRuntimeImplementation,

Review comment:
       nit: I thought about the naming again; how about `runtimeProvided`, next to `runtimeClass`? After all, `runtimeClass` is also a kind of deferred runtime implementation.

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -203,6 +204,23 @@ def current_timestamp() -> Expression:
     return _leaf_op("currentTimestamp")
 
 
+def current_watermark(rowtimeAttribute) -> Expression:
+    """
+    Returns the current watermark for the given rowtime attribute, or None if no watermark has been

Review comment:
       `None` -> `NULL`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
##########
@@ -211,6 +211,27 @@ public static ApiExpression currentTimestamp() {
         return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
     }
 
+    /**
+     * Returns the current watermark for the given rowtime attribute, or {@code NULL} if no
+     * watermark has been emitted yet.
+     *
+     * <p>The function returns the watermark with the same type as the rowtime attribute, but with
+     * an adjusted precision of 3. For example, if the rowtime attribute is {@link
+     * DataTypes#TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(9)}, the function will return {@link
+     * DataTypes#TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(3)}.
+     *
+     * <p>If no watermark has been emitted yet, the function will return {@code NULL}. Users must
+     * take care of this when comparing against it, e.g. in order to filter out late data you can
+     * use
+     *
+     * <pre>{@code
+     * WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
+     * }</pre>
+     */
+    public static ApiExpression currentWatermark(Object rowtimeAttribute) {

Review comment:
       was it a deliberate decision to use prefix notation instead of infix notation? In theory, we could allow a more fluent API: `ts.currentWatermark()`.
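   For illustration, a small Scala sketch contrasting the two styles; `currentWatermark(...)` refers to the expression function added in this PR, while the fluent variant is hypothetical and would need an extra method on the expression DSL.

```scala
import org.apache.flink.table.api._

// `events` is assumed to be a Table with a rowtime attribute `ts`.
def withWatermarkColumn(events: Table): Table =
  // prefix notation, as proposed in this PR
  events.select($"id", currentWatermark($"ts"))
  // a fluent/infix variant would read:
  //   events.select($"id", $"ts".currentWatermark())
  // but no such method exists on the expression DSL today
```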

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -203,6 +204,23 @@ def current_timestamp() -> Expression:
     return _leaf_op("currentTimestamp")
 
 
+def current_watermark(rowtimeAttribute) -> Expression:
+    """
+    Returns the current watermark for the given rowtime attribute, or None if no watermark has been
+    emitted yet.
+
+    The function returns the watermark with the same type as the rowtime attribute, but with an
+    adjusted precision of 3. For example, if the rowtime attribute is TIMESTAMP_LTZ(9), the
+    function will return TIMESTAMP_LTZ(3).
+
+    If no watermark has been emitted yet, the function will return {@code NULL}. Users must take

Review comment:
       convert `{@code }` to Python

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
##########
@@ -211,6 +211,27 @@ public static ApiExpression currentTimestamp() {
         return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
     }
 
+    /**
+     * Returns the current watermark for the given rowtime attribute, or {@code NULL} if no
+     * watermark has been emitted yet.

Review comment:
       `if no watermark has been emitted yet.`
   
   There could have been watermarks emitted, but maybe not for every parallel instance. Maybe we can rephrase this a bit, something along the lines of `if no common watermark of all upstream operations is available at the current operation in the pipeline.`

##########
File path: flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
##########
@@ -445,6 +445,13 @@ trait ImplicitExpressionConversions {
     Expressions.currentTimestamp()
   }
 
+  /**
+   * Returns the current watermark for the given rowtime attribute.

Review comment:
       we also need to add the full docs here

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}.
+ *
+ * <p>It expects a single argument representing a rowtime attribute.
+ */
+@Internal
+public class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(1);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+        if (argumentDataTypes.isEmpty()) {

Review comment:
       this should be checked by the argument count already.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}.
+ *
+ * <p>It expects a single argument representing a rowtime attribute.
+ */
+@Internal
+public class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {

Review comment:
       package private?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
##########
@@ -828,6 +828,15 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
           tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray))
             .generate(ctx, operands, resultType)
 
+      case bsf: BridgingSqlFunction
+        if bsf.getDefinition eq BuiltInFunctionDefinitions.CURRENT_WATERMARK =>
+          val isTimestampLtz = resultType.getTypeRoot match {

Review comment:
       you could simplify the code further by passing `resultType` directly to 
`generateWatermark`
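   A rough sketch of the suggested simplification; the adjusted `generateWatermark` signature below is an assumption, not the code in this PR.

```scala
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}

object CurrentWatermarkCodeGenSketch {

  // The call site in ExprCodeGenerator would then reduce to
  //   generateWatermark(ctx, contextTerm, resultType)
  // without pattern matching on the type root first.
  def generateWatermark(
      ctx: CodeGeneratorContext,
      contextTerm: String,
      resultType: LogicalType): GeneratedExpression = {
    // Derive the LTZ distinction internally; resultType is already
    // TIMESTAMP(3) or TIMESTAMP_LTZ(3) after output type inference.
    val isTimestampLtz =
      resultType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
    // ...same body as in the GenerateUtils diff above...
    ???
  }
}
```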

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
##########
@@ -1387,6 +1387,16 @@
                     .outputTypeStrategy(TypeStrategies.MISSING)
                     .build();
 
+    public static final BuiltInFunctionDefinition CURRENT_WATERMARK =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("CURRENT_WATERMARK")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK)
+                    .outputTypeStrategy(TypeStrategies.CURRENT_WATERMARK)

Review comment:
       nit: at some point we should split the specific input and output strategies into a dedicated package-private class instead of further polluting the common `InputTypeStrategies` and `TypeStrategies`

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java
##########
@@ -311,7 +315,21 @@
                                 "Average without grouped aggregation",
                                 
TypeStrategies.aggArg0(LogicalTypeMerging::findAvgAggType, true))
                         .inputTypes(DataTypes.INT().notNull())
-                        .expectDataType(DataTypes.INT()));
+                        .expectDataType(DataTypes.INT()),
+
+                // CURRENT_WATERMARK
+                TestSpec.forStrategy("TIMESTAMP(3) *ROWTIME*", TypeStrategies.CURRENT_WATERMARK)

Review comment:
       same comment as above: let's start separating the specific strategies from the frequently used ones, otherwise this could become quite messy. Actually, it is already quite messy. Maybe `TimeFunctionStrategiesTest`, `CollectionFunctionStrategiesTest`, etc.?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}.
+ *
+ * <p>It expects a single argument representing a rowtime attribute.
+ */
+@Internal
+public class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(1);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+        if (argumentDataTypes.isEmpty()) {
+            return Optional.empty();
+        }
+
+        final DataType dataType = argumentDataTypes.get(0);
+        if (!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) {

Review comment:
       We can remove this branch. `isRowtimeAttribute` should do the job, no?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
##########
@@ -433,4 +435,61 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("{a=0.12, b=0.50}")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCurrentWatermark(): Unit = {

Review comment:
       add one test for Table API
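   For illustration, a minimal sketch of what such a Table API test could look like, assuming a table `T` with a rowtime attribute `ts` registered as in the SQL test quoted below, the Scala `currentWatermark` expression added in this PR, and file-level imports of `org.apache.flink.table.api._` and `scala.collection.JavaConverters._`.

```scala
@Test
def testCurrentWatermarkTableApi(): Unit = {
  val it = tEnv
    .from("T")
    .select($"id", currentWatermark($"ts"))
    .execute()
    .collect()
  try {
    val results = it.asScala.map(_.toString).toList.sorted
    // assert against the same expected rows as in the SQL variant
    assertTrue(results.nonEmpty)
  } finally {
    it.close()
  }
}
```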

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
##########
@@ -433,4 +435,61 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("{a=0.12, b=0.50}")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCurrentWatermark(): Unit = {
+    val rows = Seq(
+      row(1, Instant.ofEpochSecond(644326662L)),
+      row(2, Instant.ofEpochSecond(1622466300L)),
+      row(3, Instant.ofEpochSecond(1622466300L))
+    )
+    val tableId = TestValuesTableFactory.registerData(rows)
+
+    tEnv.executeSql(s"""
+                       |CREATE TABLE T (
+                       |  id INT,
+                       |  ts TIMESTAMP_LTZ(3),
+                       |  WATERMARK FOR ts AS ts
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '$tableId',
+                       |  'bounded' = 'true'
+                       |)
+       """.stripMargin)
+
+    val result = tEnv.sqlQuery("SELECT id, CURRENT_WATERMARK(ts) FROM T").toAppendStream[Row]

Review comment:
       use the new APIs with `collect()`; `toAppendStream` is the pre-FLIP-136 API
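   For example, the query above could be rewritten along these lines (a sketch; `scala.collection.JavaConverters._` is assumed to be imported):

```scala
val it = tEnv
  .sqlQuery("SELECT id, CURRENT_WATERMARK(ts) FROM T")
  .execute()
  .collect()
try {
  val results = it.asScala.map(_.toString).toList.sorted
  // compare against the expected rows, as before
} finally {
  it.close()
}
```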

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
##########
@@ -433,4 +435,61 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("{a=0.12, b=0.50}")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCurrentWatermark(): Unit = {

Review comment:
       also test the documented example that uses the WHERE clause
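   A sketch of how the documented filter could be exercised here; the expected ids depend on the watermark progression of the bounded source and are only indicated.

```scala
val lateDataFiltered = tEnv
  .sqlQuery(
    """
      |SELECT id FROM T
      |WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
      |""".stripMargin)
  .execute()
  .collect()
try {
  val ids = lateDataFiltered.asScala.map(_.getField(0)).toList
  // e.g. assert that the late row has been dropped
} finally {
  lateDataFiltered.close()
}
```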

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
##########
@@ -433,4 +435,61 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("{a=0.12, b=0.50}")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCurrentWatermark(): Unit = {

Review comment:
       test a more complex example, maybe with a window or window join before calling `CURRENT_WATERMARK`
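   One possible shape for such a test, as a sketch only; it assumes the legacy group window syntax and that `TUMBLE_ROWTIME` yields a rowtime attribute that `CURRENT_WATERMARK` accepts.

```scala
val windowed = tEnv
  .sqlQuery(
    """
      |SELECT id, CURRENT_WATERMARK(window_rowtime)
      |FROM (
      |  SELECT
      |    id,
      |    TUMBLE_ROWTIME(ts, INTERVAL '10' SECOND) AS window_rowtime
      |  FROM T
      |  GROUP BY id, TUMBLE(ts, INTERVAL '10' SECOND)
      |)
      |""".stripMargin)
  .execute()
  .collect()
try {
  // assert on the watermark values observed after the window
} finally {
  windowed.close()
}
```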



