leonardBang commented on a change in pull request #15821:
URL: https://github.com/apache/flink/pull/15821#discussion_r631550903
##########
File path: docs/data/sql_functions.yml
##########
@@ -682,4 +688,4 @@ aggregate:
- sql: LAST_VALUE(expression)
description: Returns the last value in an ordered set of values.
- sql: LISTAGG(expression [, separator])
- description: Concatenates the values of string expressions and places
separator values between them. The separator is not added at the end of string.
The default value of separator is ','.
\ No newline at end of file
+ description: Concatenates the values of string expressions and places
separator values between them. The separator is not added at the end of string.
The default value of separator is ','.
Review comment:
Could you revert this change?
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/GreatestLeastFunctionsITCase.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.junit.runners.Parameterized;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.greatest;
+import static org.apache.flink.table.api.Expressions.least;
+
+/** Tests for GREATEST, LEAST functions {@link BuiltInFunctionDefinitions}. */
+public class GreatestLeastFunctionsITCase extends BuiltInFunctionTestBase {
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static List<TestSpec> testData() {
+ return Arrays.asList(
+ TestSpec.forFunction(BuiltInFunctionDefinitions.GREATEST)
+ .onFieldsWithData(null, 1, 2, 3.14, "hello", "world")
+ .andDataTypes(
+ DataTypes.INT().nullable(),
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.DECIMAL(3, 2).notNull(),
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING().notNull())
Review comment:
We should cover all Flink SQL types here, and for the Table API test we can
use:
```
call("LEAST", $("f1"), $("f3"), $("f2"))
.cast(DataTypes.DECIMAL(3, 2))
```
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
##########
@@ -494,6 +494,28 @@ public static ApiExpression ifThenElse(Object condition,
Object ifTrue, Object i
return apiCall(BuiltInFunctionDefinitions.IF, condition, ifTrue,
ifFalse);
}
+ /**
+ * Creates an expression that returns the greatest from the input.
+ *
+ * <p>e.g. greatest(1, 2, 3) leads to 3, greatest("hello", "world") leads
to "world".
+ *
+ * <p>Note: if at least one input parameter is null the return value will
be null.
+ */
+ public static ApiExpression greatest(Object head, Object... tail) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.GREATEST,
head, tail);
+ }
+
+ /**
+ * Creates an expression that returns the least from the input.
+ *
+ * <p>e.g. least(1, 2, 3) leads to 1, least("hello", "world") leads to
"hello".
+ *
+ * <p>Note: if at least one input parameter is null the return value will
be null.
+ */
+ public static ApiExpression least(Object head, Object... tail) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.LEAST,
head, tail);
+ }
+
Review comment:
It's recommended not to add these functions to the expression API, because we
can already call built-in functions from an expression.
##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -505,6 +505,34 @@ def if_then_else(condition: Union[bool, Expression[bool]],
if_true, if_false) ->
return _ternary_op("ifThenElse", condition, if_true, if_false)
+def greatest(head, *tail) -> Expression:
+ """
+ Returns the greatest element if there is no null elements.
+ Returns null if there is at least one null element.
+
+ e.g. greatest(1, 2, 3) leads to 3
+
+ .. seealso:: :func:`~pyflink.table.expressions.least`
+ """
+ gateway = get_gateway()
+ tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in
tail])
+ return _binary_op("greatest", head, tail)
+
+
+def least(head, *tail) -> Expression:
+ """
+ Returns the least element if there is no null elements.
+ Returns null if there is at least one null element.
+
+ e.g. least(1, 2, 3) leads to 1
+
+ .. seealso:: :func:`~pyflink.table.expressions.greatest`
+ """
+ gateway = get_gateway()
+ tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in
tail])
+ return _binary_op("least", head, tail)
+
+
Review comment:
Ditto, Python expressions can also call SQL built-in functions.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
##########
@@ -754,6 +754,18 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
case MAP_VALUE_CONSTRUCTOR =>
generateMap(ctx, resultType, operands)
+ case GREATEST =>
+ operands.foreach { operand =>
+ requireComparable(operand)
+ }
+ generateGreatestLeast(resultType, operands)
+
+ case LEAST =>
+ operands.foreach { operand =>
+ requireComparable(operand)
+ }
+ generateGreatestLeast(resultType, operands, greatest = false)
+
Review comment:
We could support these only as built-in functions in SQL.
##########
File path:
flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
##########
@@ -708,6 +708,26 @@ trait ImplicitExpressionConversions {
Expressions.ifThenElse(condition, ifTrue, ifFalse)
}
+ /**
+ * Returns the greatest value from the input if input consists from non null
values.
+ * Returns null if there is at least one null value in the input.
+ *
+ * e.g. greatest(1, 2, 3) leads to 3
+ */
+ def greatest(head: Expression, tail: Expression): Expression = {
+ Expressions.greatest(head, tail)
+ }
+
+ /**
+ * Returns the least value from the input if input consists from non null
values.
+ * Returns null if there is at least one null value in the input.
+ *
+ * e.g. least(1, 2, 3) leads to 1
+ */
+ def least(head: Expression, tail: Expression): Expression = {
+ Expressions.least(head, tail)
+ }
+
Review comment:
Ditto.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
##########
@@ -1756,6 +1756,56 @@ object ScalarOperatorGens {
}
}
+ /**
+ * Return null when at least one of the elements is null.
+ */
+ def generateGreatestLeast(
+ resultType: LogicalType,
+ elements: Seq[GeneratedExpression],
+ greatest: Boolean = true)
+ : GeneratedExpression = {
+ val Seq(result, cur, nullTerm) = newNames("result", "cur", "nullTerm")
+ val widerType = toScala(findCommonType(elements.map(element =>
element.resultType)))
+ .orElse(throw new CodeGenException(s"Unable to find common type for
$elements."))
+ val resultTypeTerm = boxedTypeTermForType(widerType.get)
+
+ def castIfNumeric(t: GeneratedExpression): String = {
+ if (TypeCheckUtils.isNumeric(widerType.get)) {
+ s"${numericCasting(t.resultType, widerType.get).apply(t.resultTerm)}"
+ } else {
+ s"${t.resultTerm}"
+ }
+ }
+
+ val elementsCode = elements.zipWithIndex.map { case (element, idx) =>
+ s"""
+ | ${element.code}
+ | if (!$nullTerm) {
+ | $resultTypeTerm $cur = ${castIfNumeric(element)};
+ | if (${element.nullTerm}) {
+ | $nullTerm = true;
+ | } else {
+ | int compareResult = $result.compareTo($cur);
+ | if ($greatest && compareResult < 0 || compareResult > 0 &&
!$greatest) {
+ | $result = $cur;
+ | }
+ | }
+ | }
+ """.stripMargin
+ }.mkString("\n")
Review comment:
It looks like `idx` is unused and we generate code for each input element;
could we optimize this into one loop?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]