This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 86ebb26 [FLINK-12711][table] Separate function implementation and
definition
86ebb26 is described below
commit 86ebb26f924ee114d0bda0a3665c5efc512bd932
Author: Timo Walther <[email protected]>
AuthorDate: Fri Jun 7 16:34:40 2019 +0200
[FLINK-12711][table] Separate function implementation and definition
This closes #8661.
---
.../flink/table/catalog/FunctionCatalog.java | 36 +-
.../operations/OperationExpressionsUtils.java | 6 +-
.../flink/table/expressions/CallExpression.java | 2 +-
.../flink/table/expressions/ExpressionUtils.java | 8 +-
.../flink/table/functions/AggregateFunction.java | 24 +
.../functions/AggregateFunctionDefinition.java | 53 +-
.../flink/table/functions/AsyncTableFunction.java | 5 +
.../table/functions/BuiltInFunctionDefinition.java | 89 ++
.../functions/BuiltInFunctionDefinitions.java | 957 +++++++++++++++------
.../flink/table/functions/FunctionDefinition.java | 72 +-
.../flink/table/functions/FunctionKind.java} | 19 +-
.../table/functions/FunctionRequirement.java} | 14 +-
.../flink/table/functions/ScalarFunction.java | 5 +
.../table/functions/ScalarFunctionDefinition.java | 47 +-
.../table/functions/TableAggregateFunction.java | 5 +
....java => TableAggregateFunctionDefinition.java} | 57 +-
.../flink/table/functions/TableFunction.java | 5 +
.../table/functions/TableFunctionDefinition.java | 47 +-
.../flink/table/functions/UserDefinedFunction.java | 28 +-
.../flink/table/expressions/RexNodeConverter.java | 6 +-
.../functions/InternalFunctionDefinitions.java | 9 +-
.../aggfunctions/DeclarativeAggregateFunction.java | 6 +
.../functions/utils/UserDefinedFunctionUtils.scala | 24 +-
.../rules/ExpandColumnFunctionsRule.java | 13 +-
.../operations/AggregateOperationFactory.java | 71 +-
.../table/operations/CalculatedTableFactory.java | 8 +-
.../flink/table/plan/QueryOperationConverter.java | 12 +-
.../flink/table/api/scala/expressionDsl.scala | 11 +-
.../expressions/PlannerExpressionConverter.scala | 9 +-
.../table/functions/utils/AggSqlFunction.scala | 5 +-
.../table/operations/OperationTreeBuilder.scala | 15 +-
.../api/stream/table/ColumnFunctionsTest.scala | 3 +-
32 files changed, 1198 insertions(+), 473 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 0297a5f..beeb37f 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -20,11 +20,15 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedAggregateFunction;
@@ -33,6 +37,7 @@ import
org.apache.flink.table.functions.UserFunctionsTypeHelper;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
/**
* Simple function catalog to store {@link FunctionDefinition}s in memory.
@@ -70,7 +75,10 @@ public class FunctionCatalog implements FunctionLookup {
registerFunction(
name,
- new TableFunctionDefinition(name, function, resultType)
+ new TableFunctionDefinition(
+ name,
+ function,
+ resultType)
);
}
@@ -84,14 +92,33 @@ public class FunctionCatalog implements FunctionLookup {
// check if class could be instantiated
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+ final FunctionDefinition definition;
+ if (function instanceof AggregateFunction) {
+ definition = new AggregateFunctionDefinition(
+ name,
+ (AggregateFunction<?, ?>) function,
+ resultType,
+ accType);
+ } else if (function instanceof TableAggregateFunction) {
+ definition = new TableAggregateFunctionDefinition(
+ name,
+ (TableAggregateFunction<?, ?>) function,
+ resultType,
+ accType);
+ } else {
+ throw new TableException("Unknown function class: " +
function.getClass());
+ }
+
registerFunction(
name,
- new AggregateFunctionDefinition(name, function,
resultType, accType)
+ definition
);
}
public String[] getUserDefinedFunctions() {
- return
userFunctions.values().stream().map(FunctionDefinition::getName).toArray(String[]::new);
+ return userFunctions.values().stream()
+ .map(FunctionDefinition::toString)
+ .toArray(String[]::new);
}
@Override
@@ -104,7 +131,8 @@ public class FunctionCatalog implements FunctionLookup {
foundDefinition =
BuiltInFunctionDefinitions.getDefinitions()
.stream()
.filter(f ->
normalizeName(name).equals(normalizeName(f.getName())))
- .findFirst();
+ .findFirst()
+ .map(Function.identity());
}
return foundDefinition.map(definition -> new
FunctionLookup.Result(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
index 332aa71..55574cd 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
@@ -39,10 +39,10 @@ import static
org.apache.flink.table.expressions.ApiExpressionUtils.call;
import static
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
import static
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
-import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfType;
+import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.WINDOW_PROPERTIES;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
/**
* Utility methods for transforming {@link Expression} to use them in {@link
QueryOperation}s.
@@ -158,7 +158,7 @@ public class OperationExpressionsUtils {
@Override
public Void visitCall(CallExpression call) {
FunctionDefinition functionDefinition =
call.getFunctionDefinition();
- if (isFunctionOfType(call, AGGREGATE_FUNCTION)) {
+ if (isFunctionOfKind(call, AGGREGATE)) {
aggregates.computeIfAbsent(call, expr ->
"EXPR$" + uniqueId++);
} else if
(WINDOW_PROPERTIES.contains(functionDefinition)) {
properties.computeIfAbsent(call, expr ->
"EXPR$" + uniqueId++);
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
index 396867d..acaa5e3 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
@@ -80,6 +80,6 @@ public final class CallExpression implements Expression {
@Override
public String toString() {
final List<String> argList =
args.stream().map(Object::toString).collect(Collectors.toList());
- return functionDefinition.getName() + "(" + String.join(", ",
argList) + ")";
+ return functionDefinition.toString() + "(" + String.join(", ",
argList) + ")";
}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
index 29ef3b6..2446da7 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.expressions;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
import java.util.Optional;
@@ -50,12 +50,12 @@ public final class ExpressionUtils {
* Checks if the expression is a function call of given type.
*
* @param expr expression to check
- * @param type expected type of function
+ * @param kind expected type of function
* @return true if the expression is function call of given type, false
otherwise
*/
- public static boolean isFunctionOfType(Expression expr,
FunctionDefinition.Type type) {
+ public static boolean isFunctionOfKind(Expression expr, FunctionKind
kind) {
return expr instanceof CallExpression &&
- ((CallExpression)
expr).getFunctionDefinition().getType() == type;
+ ((CallExpression)
expr).getFunctionDefinition().getKind() == kind;
}
private ExpressionUtils() {
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
index 2399fa75..94b96f3 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.functions;
import org.apache.flink.annotation.PublicEvolving;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* Base class for user-defined aggregates.
*
@@ -97,6 +101,9 @@ import org.apache.flink.annotation.PublicEvolving;
* }
* </pre>
*
+ * <p>If this aggregate function can only be applied in an OVER window, this
can be declared using the
+ * requirement {@link FunctionRequirement#OVER_WINDOW_ONLY} in {@link
#getRequirements()}.
+ *
* @param <T> the type of the aggregation result
* @param <ACC> the type of the aggregation accumulator. The accumulator is
used to keep the
* aggregated values which are needed to compute an aggregation
result.
@@ -124,8 +131,25 @@ public abstract class AggregateFunction<T, ACC> extends
UserDefinedAggregateFunc
*
* @return <code>true</code> if the {@link AggregateFunction} requires
an OVER window,
* <code>false</code> otherwise.
+ *
+ * @deprecated Use {@link #getRequirements()} instead.
*/
+ @Deprecated
public boolean requiresOver() {
return false;
}
+
+ @Override
+ public final FunctionKind getKind() {
+ return FunctionKind.AGGREGATE;
+ }
+
+ @Override
+ public Set<FunctionRequirement> getRequirements() {
+ final HashSet<FunctionRequirement> requirements = new
HashSet<>();
+ if (requiresOver()) {
+ requirements.add(FunctionRequirement.OVER_WINDOW_ONLY);
+ }
+ return Collections.unmodifiableSet(requirements);
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
index a6440f8..b06c55a 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
@@ -22,30 +22,34 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+import java.util.Objects;
+import java.util.Set;
/**
* The function definition of an user-defined aggregate function.
+ *
+ * <p>This class can be dropped once we introduce a new type inference.
*/
@PublicEvolving
-public final class AggregateFunctionDefinition extends FunctionDefinition {
+public final class AggregateFunctionDefinition implements FunctionDefinition {
- private final UserDefinedAggregateFunction<?, ?> aggregateFunction;
+ private final String name;
+ private final AggregateFunction<?, ?> aggregateFunction;
private final TypeInformation<?> resultTypeInfo;
private final TypeInformation<?> accumulatorTypeInfo;
public AggregateFunctionDefinition(
String name,
- UserDefinedAggregateFunction<?, ?> aggregateFunction,
+ AggregateFunction<?, ?> aggregateFunction,
TypeInformation<?> resultTypeInfo,
TypeInformation<?> accTypeInfo) {
- super(name, AGGREGATE_FUNCTION);
+ this.name = Preconditions.checkNotNull(name);
this.aggregateFunction =
Preconditions.checkNotNull(aggregateFunction);
this.resultTypeInfo =
Preconditions.checkNotNull(resultTypeInfo);
this.accumulatorTypeInfo =
Preconditions.checkNotNull(accTypeInfo);
}
- public UserDefinedAggregateFunction<?, ?> getAggregateFunction() {
+ public AggregateFunction<?, ?> getAggregateFunction() {
return aggregateFunction;
}
@@ -56,4 +60,41 @@ public final class AggregateFunctionDefinition extends
FunctionDefinition {
public TypeInformation<?> getAccumulatorTypeInfo() {
return accumulatorTypeInfo;
}
+
+ @Override
+ public FunctionKind getKind() {
+ return FunctionKind.AGGREGATE;
+ }
+
+ @Override
+ public Set<FunctionRequirement> getRequirements() {
+ return aggregateFunction.getRequirements();
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return aggregateFunction.isDeterministic();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AggregateFunctionDefinition that =
(AggregateFunctionDefinition) o;
+ return name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
index ddb1e61..3a8c879 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
@@ -101,4 +101,9 @@ public abstract class AsyncTableFunction<T> extends
UserDefinedFunction {
public TypeInformation<T> getResultType() {
return null;
}
+
+ @Override
+ public final FunctionKind getKind() {
+ return FunctionKind.ASYNC_TABLE;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
new file mode 100644
index 0000000..7b96de6
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Definition of a built-in function. It enables unique identification across
different
+ * modules by reference equality.
+ *
+ * <p>Compared to regular {@link FunctionDefinition}, built-in functions have
a default name.
+ *
+ * <p>Equality is defined by reference equality.
+ */
+@Internal
+public final class BuiltInFunctionDefinition implements FunctionDefinition {
+
+ private final String name;
+
+ private final FunctionKind kind;
+
+ private BuiltInFunctionDefinition(
+ String name,
+ FunctionKind kind) {
+ this.name = Preconditions.checkNotNull(name, "Name must not be
null.");
+ this.kind = Preconditions.checkNotNull(kind, "Kind must not be
null.");
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public FunctionKind getKind() {
+ return kind;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Builder for fluent definition of built-in functions.
+ */
+ public static class Builder {
+
+ private String name;
+
+ private FunctionKind kind;
+
+ public Builder() {
+ // default constructor to allow a fluent definition
+ }
+
+ public Builder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder kind(FunctionKind kind) {
+ this.kind = kind;
+ return this;
+ }
+
+ public BuiltInFunctionDefinition build() {
+ return new BuiltInFunctionDefinition(name, kind);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6e4bc0f..fd02e46 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -29,9 +29,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.OTHER_FUNCTION;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.SCALAR_FUNCTION;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static org.apache.flink.table.functions.FunctionKind.OTHER;
+import static org.apache.flink.table.functions.FunctionKind.SCALAR;
/**
* Dictionary of function definitions for all built-in functions.
@@ -40,304 +40,709 @@ import static
org.apache.flink.table.functions.FunctionDefinition.Type.SCALAR_FU
public final class BuiltInFunctionDefinitions {
// logic functions
- public static final FunctionDefinition AND =
- new FunctionDefinition("and", SCALAR_FUNCTION);
- public static final FunctionDefinition OR =
- new FunctionDefinition("or", SCALAR_FUNCTION);
- public static final FunctionDefinition NOT =
- new FunctionDefinition("not", SCALAR_FUNCTION);
- public static final FunctionDefinition IF =
- new FunctionDefinition("ifThenElse", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition AND =
+ new BuiltInFunctionDefinition.Builder()
+ .name("and")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition OR =
+ new BuiltInFunctionDefinition.Builder()
+ .name("or")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition NOT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("not")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IF =
+ new BuiltInFunctionDefinition.Builder()
+ .name("ifThenElse")
+ .kind(SCALAR)
+ .build();
// comparison functions
- public static final FunctionDefinition EQUALS =
- new FunctionDefinition("equals", SCALAR_FUNCTION);
- public static final FunctionDefinition GREATER_THAN =
- new FunctionDefinition("greaterThan", SCALAR_FUNCTION);
- public static final FunctionDefinition GREATER_THAN_OR_EQUAL =
- new FunctionDefinition("greaterThanOrEqual", SCALAR_FUNCTION);
- public static final FunctionDefinition LESS_THAN =
- new FunctionDefinition("lessThan", SCALAR_FUNCTION);
- public static final FunctionDefinition LESS_THAN_OR_EQUAL =
- new FunctionDefinition("lessThanOrEqual", SCALAR_FUNCTION);
- public static final FunctionDefinition NOT_EQUALS =
- new FunctionDefinition("notEquals", SCALAR_FUNCTION);
- public static final FunctionDefinition IS_NULL =
- new FunctionDefinition("isNull", SCALAR_FUNCTION);
- public static final FunctionDefinition IS_NOT_NULL =
- new FunctionDefinition("isNotNull", SCALAR_FUNCTION);
- public static final FunctionDefinition IS_TRUE =
- new FunctionDefinition("isTrue", SCALAR_FUNCTION);
- public static final FunctionDefinition IS_FALSE =
- new FunctionDefinition("isFalse", SCALAR_FUNCTION);
- public static final FunctionDefinition IS_NOT_TRUE =
- new FunctionDefinition("isNotTrue", SCALAR_FUNCTION);
- public static final FunctionDefinition IS_NOT_FALSE =
- new FunctionDefinition("isNotFalse", SCALAR_FUNCTION);
- public static final FunctionDefinition BETWEEN =
- new FunctionDefinition("between", SCALAR_FUNCTION);
- public static final FunctionDefinition NOT_BETWEEN =
- new FunctionDefinition("notBetween", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition EQUALS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("equals")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition GREATER_THAN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("greaterThan")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition GREATER_THAN_OR_EQUAL =
+ new BuiltInFunctionDefinition.Builder()
+ .name("greaterThanOrEqual")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LESS_THAN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("lessThan")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LESS_THAN_OR_EQUAL =
+ new BuiltInFunctionDefinition.Builder()
+ .name("lessThanOrEqual")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition NOT_EQUALS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("notEquals")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IS_NULL =
+ new BuiltInFunctionDefinition.Builder()
+ .name("isNull")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IS_NOT_NULL =
+ new BuiltInFunctionDefinition.Builder()
+ .name("isNotNull")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IS_TRUE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("isTrue")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IS_FALSE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("isFalse")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IS_NOT_TRUE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("isNotTrue")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition IS_NOT_FALSE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("isNotFalse")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition BETWEEN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("between")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition NOT_BETWEEN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("notBetween")
+ .kind(SCALAR)
+ .build();
// aggregate functions
- public static final FunctionDefinition AVG =
- new FunctionDefinition("avg", AGGREGATE_FUNCTION);
- public static final FunctionDefinition COUNT =
- new FunctionDefinition("count", AGGREGATE_FUNCTION);
- public static final FunctionDefinition MAX =
- new FunctionDefinition("max", AGGREGATE_FUNCTION);
- public static final FunctionDefinition MIN =
- new FunctionDefinition("min", AGGREGATE_FUNCTION);
- public static final FunctionDefinition SUM =
- new FunctionDefinition("sum", AGGREGATE_FUNCTION);
- public static final FunctionDefinition SUM0 =
- new FunctionDefinition("sum0", AGGREGATE_FUNCTION);
- public static final FunctionDefinition STDDEV_POP =
- new FunctionDefinition("stddevPop", AGGREGATE_FUNCTION);
- public static final FunctionDefinition STDDEV_SAMP =
- new FunctionDefinition("stddevSamp", AGGREGATE_FUNCTION);
- public static final FunctionDefinition VAR_POP =
- new FunctionDefinition("varPop", AGGREGATE_FUNCTION);
- public static final FunctionDefinition VAR_SAMP =
- new FunctionDefinition("varSamp", AGGREGATE_FUNCTION);
- public static final FunctionDefinition COLLECT =
- new FunctionDefinition("collect", AGGREGATE_FUNCTION);
- public static final FunctionDefinition DISTINCT =
- new FunctionDefinition("distinct", AGGREGATE_FUNCTION);
+ public static final BuiltInFunctionDefinition AVG =
+ new BuiltInFunctionDefinition.Builder()
+ .name("avg")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition COUNT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("count")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition MAX =
+ new BuiltInFunctionDefinition.Builder()
+ .name("max")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition MIN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("min")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition SUM =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sum")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition SUM0 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sum0")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition STDDEV_POP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("stddevPop")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition STDDEV_SAMP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("stddevSamp")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition VAR_POP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("varPop")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition VAR_SAMP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("varSamp")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition COLLECT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("collect")
+ .kind(AGGREGATE)
+ .build();
+ public static final BuiltInFunctionDefinition DISTINCT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("distinct")
+ .kind(AGGREGATE)
+ .build();
// string functions
- public static final FunctionDefinition CHAR_LENGTH =
- new FunctionDefinition("charLength", SCALAR_FUNCTION);
- public static final FunctionDefinition INIT_CAP =
- new FunctionDefinition("initCap", SCALAR_FUNCTION);
- public static final FunctionDefinition LIKE =
- new FunctionDefinition("like", SCALAR_FUNCTION);
- public static final FunctionDefinition LOWER =
- new FunctionDefinition("lowerCase", SCALAR_FUNCTION);
- public static final FunctionDefinition SIMILAR =
- new FunctionDefinition("similar", SCALAR_FUNCTION);
- public static final FunctionDefinition SUBSTRING =
- new FunctionDefinition("substring", SCALAR_FUNCTION);
- public static final FunctionDefinition REPLACE =
- new FunctionDefinition("replace", SCALAR_FUNCTION);
- public static final FunctionDefinition TRIM =
- new FunctionDefinition("trim", SCALAR_FUNCTION);
- public static final FunctionDefinition UPPER =
- new FunctionDefinition("upperCase", SCALAR_FUNCTION);
- public static final FunctionDefinition POSITION =
- new FunctionDefinition("position", SCALAR_FUNCTION);
- public static final FunctionDefinition OVERLAY =
- new FunctionDefinition("overlay", SCALAR_FUNCTION);
- public static final FunctionDefinition CONCAT =
- new FunctionDefinition("concat", SCALAR_FUNCTION);
- public static final FunctionDefinition CONCAT_WS =
- new FunctionDefinition("concat_ws", SCALAR_FUNCTION);
- public static final FunctionDefinition LPAD =
- new FunctionDefinition("lpad", SCALAR_FUNCTION);
- public static final FunctionDefinition RPAD =
- new FunctionDefinition("rpad", SCALAR_FUNCTION);
- public static final FunctionDefinition REGEXP_EXTRACT =
- new FunctionDefinition("regexpExtract", SCALAR_FUNCTION);
- public static final FunctionDefinition FROM_BASE64 =
- new FunctionDefinition("fromBase64", SCALAR_FUNCTION);
- public static final FunctionDefinition TO_BASE64 =
- new FunctionDefinition("toBase64", SCALAR_FUNCTION);
- public static final FunctionDefinition UUID =
- new FunctionDefinition("uuid", SCALAR_FUNCTION);
- public static final FunctionDefinition LTRIM =
- new FunctionDefinition("ltrim", SCALAR_FUNCTION);
- public static final FunctionDefinition RTRIM =
- new FunctionDefinition("rtrim", SCALAR_FUNCTION);
- public static final FunctionDefinition REPEAT =
- new FunctionDefinition("repeat", SCALAR_FUNCTION);
- public static final FunctionDefinition REGEXP_REPLACE =
- new FunctionDefinition("regexpReplace", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition CHAR_LENGTH =
+ new BuiltInFunctionDefinition.Builder()
+ .name("charLength")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition INIT_CAP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("initCap")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LIKE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("like")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LOWER =
+ new BuiltInFunctionDefinition.Builder()
+ .name("lowerCase")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SIMILAR =
+ new BuiltInFunctionDefinition.Builder()
+ .name("similar")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SUBSTRING =
+ new BuiltInFunctionDefinition.Builder()
+ .name("substring")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition REPLACE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("replace")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TRIM =
+ new BuiltInFunctionDefinition.Builder()
+ .name("trim")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition UPPER =
+ new BuiltInFunctionDefinition.Builder()
+ .name("upperCase")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition POSITION =
+ new BuiltInFunctionDefinition.Builder()
+ .name("position")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition OVERLAY =
+ new BuiltInFunctionDefinition.Builder()
+ .name("overlay")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CONCAT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("concat")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CONCAT_WS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("concat_ws")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LPAD =
+ new BuiltInFunctionDefinition.Builder()
+ .name("lpad")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition RPAD =
+ new BuiltInFunctionDefinition.Builder()
+ .name("rpad")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition REGEXP_EXTRACT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("regexpExtract")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition FROM_BASE64 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("fromBase64")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TO_BASE64 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("toBase64")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition UUID =
+ new BuiltInFunctionDefinition.Builder()
+ .name("uuid")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LTRIM =
+ new BuiltInFunctionDefinition.Builder()
+ .name("ltrim")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition RTRIM =
+ new BuiltInFunctionDefinition.Builder()
+ .name("rtrim")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition REPEAT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("repeat")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition REGEXP_REPLACE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("regexpReplace")
+ .kind(SCALAR)
+ .build();
// math functions
- public static final FunctionDefinition PLUS =
- new FunctionDefinition("plus", SCALAR_FUNCTION);
- public static final FunctionDefinition MINUS =
- new FunctionDefinition("minus", SCALAR_FUNCTION);
- public static final FunctionDefinition DIVIDE =
- new FunctionDefinition("divide", SCALAR_FUNCTION);
- public static final FunctionDefinition TIMES =
- new FunctionDefinition("times", SCALAR_FUNCTION);
- public static final FunctionDefinition ABS =
- new FunctionDefinition("abs", SCALAR_FUNCTION);
- public static final FunctionDefinition CEIL =
- new FunctionDefinition("ceil", SCALAR_FUNCTION);
- public static final FunctionDefinition EXP =
- new FunctionDefinition("exp", SCALAR_FUNCTION);
- public static final FunctionDefinition FLOOR =
- new FunctionDefinition("floor", SCALAR_FUNCTION);
- public static final FunctionDefinition LOG10 =
- new FunctionDefinition("log10", SCALAR_FUNCTION);
- public static final FunctionDefinition LOG2 =
- new FunctionDefinition("log2", SCALAR_FUNCTION);
- public static final FunctionDefinition LN =
- new FunctionDefinition("ln", SCALAR_FUNCTION);
- public static final FunctionDefinition LOG =
- new FunctionDefinition("log", SCALAR_FUNCTION);
- public static final FunctionDefinition POWER =
- new FunctionDefinition("power", SCALAR_FUNCTION);
- public static final FunctionDefinition MOD =
- new FunctionDefinition("mod", SCALAR_FUNCTION);
- public static final FunctionDefinition SQRT =
- new FunctionDefinition("sqrt", SCALAR_FUNCTION);
- public static final FunctionDefinition MINUS_PREFIX =
- new FunctionDefinition("minusPrefix", SCALAR_FUNCTION);
- public static final FunctionDefinition SIN =
- new FunctionDefinition("sin", SCALAR_FUNCTION);
- public static final FunctionDefinition COS =
- new FunctionDefinition("cos", SCALAR_FUNCTION);
- public static final FunctionDefinition SINH =
- new FunctionDefinition("sinh", SCALAR_FUNCTION);
- public static final FunctionDefinition TAN =
- new FunctionDefinition("tan", SCALAR_FUNCTION);
- public static final FunctionDefinition TANH =
- new FunctionDefinition("tanh", SCALAR_FUNCTION);
- public static final FunctionDefinition COT =
- new FunctionDefinition("cot", SCALAR_FUNCTION);
- public static final FunctionDefinition ASIN =
- new FunctionDefinition("asin", SCALAR_FUNCTION);
- public static final FunctionDefinition ACOS =
- new FunctionDefinition("acos", SCALAR_FUNCTION);
- public static final FunctionDefinition ATAN =
- new FunctionDefinition("atan", SCALAR_FUNCTION);
- public static final FunctionDefinition ATAN2 =
- new FunctionDefinition("atan2", SCALAR_FUNCTION);
- public static final FunctionDefinition COSH =
- new FunctionDefinition("cosh", SCALAR_FUNCTION);
- public static final FunctionDefinition DEGREES =
- new FunctionDefinition("degrees", SCALAR_FUNCTION);
- public static final FunctionDefinition RADIANS =
- new FunctionDefinition("radians", SCALAR_FUNCTION);
- public static final FunctionDefinition SIGN =
- new FunctionDefinition("sign", SCALAR_FUNCTION);
- public static final FunctionDefinition ROUND =
- new FunctionDefinition("round", SCALAR_FUNCTION);
- public static final FunctionDefinition PI =
- new FunctionDefinition("pi", SCALAR_FUNCTION);
- public static final FunctionDefinition E =
- new FunctionDefinition("e", SCALAR_FUNCTION);
- public static final FunctionDefinition RAND =
- new FunctionDefinition("rand", SCALAR_FUNCTION);
- public static final FunctionDefinition RAND_INTEGER =
- new FunctionDefinition("randInteger", SCALAR_FUNCTION);
- public static final FunctionDefinition BIN =
- new FunctionDefinition("bin", SCALAR_FUNCTION);
- public static final FunctionDefinition HEX =
- new FunctionDefinition("hex", SCALAR_FUNCTION);
- public static final FunctionDefinition TRUNCATE =
- new FunctionDefinition("truncate", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition PLUS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("plus")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition MINUS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("minus")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition DIVIDE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("divide")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TIMES =
+ new BuiltInFunctionDefinition.Builder()
+ .name("times")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ABS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("abs")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CEIL =
+ new BuiltInFunctionDefinition.Builder()
+ .name("ceil")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition EXP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("exp")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition FLOOR =
+ new BuiltInFunctionDefinition.Builder()
+ .name("floor")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LOG10 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("log10")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LOG2 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("log2")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("ln")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LOG =
+ new BuiltInFunctionDefinition.Builder()
+ .name("log")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition POWER =
+ new BuiltInFunctionDefinition.Builder()
+ .name("power")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition MOD =
+ new BuiltInFunctionDefinition.Builder()
+ .name("mod")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SQRT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sqrt")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition MINUS_PREFIX =
+ new BuiltInFunctionDefinition.Builder()
+ .name("minusPrefix")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SIN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sin")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition COS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("cos")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SINH =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sinh")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TAN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("tan")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TANH =
+ new BuiltInFunctionDefinition.Builder()
+ .name("tanh")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition COT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("cot")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ASIN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("asin")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ACOS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("acos")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ATAN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("atan")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ATAN2 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("atan2")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition COSH =
+ new BuiltInFunctionDefinition.Builder()
+ .name("cosh")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition DEGREES =
+ new BuiltInFunctionDefinition.Builder()
+ .name("degrees")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition RADIANS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("radians")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SIGN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sign")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ROUND =
+ new BuiltInFunctionDefinition.Builder()
+ .name("round")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition PI =
+ new BuiltInFunctionDefinition.Builder()
+ .name("pi")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition E =
+ new BuiltInFunctionDefinition.Builder()
+ .name("e")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition RAND =
+ new BuiltInFunctionDefinition.Builder()
+ .name("rand")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition RAND_INTEGER =
+ new BuiltInFunctionDefinition.Builder()
+ .name("randInteger")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition BIN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("bin")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition HEX =
+ new BuiltInFunctionDefinition.Builder()
+ .name("hex")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TRUNCATE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("truncate")
+ .kind(SCALAR)
+ .build();
// time functions
- public static final FunctionDefinition EXTRACT =
- new FunctionDefinition("extract", SCALAR_FUNCTION);
- public static final FunctionDefinition CURRENT_DATE =
- new FunctionDefinition("currentDate", SCALAR_FUNCTION);
- public static final FunctionDefinition CURRENT_TIME =
- new FunctionDefinition("currentTime", SCALAR_FUNCTION);
- public static final FunctionDefinition CURRENT_TIMESTAMP =
- new FunctionDefinition("currentTimestamp", SCALAR_FUNCTION);
- public static final FunctionDefinition LOCAL_TIME =
- new FunctionDefinition("localTime", SCALAR_FUNCTION);
- public static final FunctionDefinition LOCAL_TIMESTAMP =
- new FunctionDefinition("localTimestamp", SCALAR_FUNCTION);
- public static final FunctionDefinition TEMPORAL_OVERLAPS =
- new FunctionDefinition("temporalOverlaps", SCALAR_FUNCTION);
- public static final FunctionDefinition DATE_TIME_PLUS =
- new FunctionDefinition("dateTimePlus", SCALAR_FUNCTION);
- public static final FunctionDefinition DATE_FORMAT =
- new FunctionDefinition("dateFormat", SCALAR_FUNCTION);
- public static final FunctionDefinition TIMESTAMP_DIFF =
- new FunctionDefinition("timestampDiff", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition EXTRACT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("extract")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CURRENT_DATE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("currentDate")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CURRENT_TIME =
+ new BuiltInFunctionDefinition.Builder()
+ .name("currentTime")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CURRENT_TIMESTAMP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("currentTimestamp")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LOCAL_TIME =
+ new BuiltInFunctionDefinition.Builder()
+ .name("localTime")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition LOCAL_TIMESTAMP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("localTimestamp")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("temporalOverlaps")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition DATE_TIME_PLUS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("dateTimePlus")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition DATE_FORMAT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("dateFormat")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition TIMESTAMP_DIFF =
+ new BuiltInFunctionDefinition.Builder()
+ .name("timestampDiff")
+ .kind(SCALAR)
+ .build();
// collection
- public static final FunctionDefinition AT =
- new FunctionDefinition("at", SCALAR_FUNCTION);
- public static final FunctionDefinition CARDINALITY =
- new FunctionDefinition("cardinality", SCALAR_FUNCTION);
- public static final FunctionDefinition ARRAY =
- new FunctionDefinition("array", SCALAR_FUNCTION);
- public static final FunctionDefinition ARRAY_ELEMENT =
- new FunctionDefinition("element", SCALAR_FUNCTION);
- public static final FunctionDefinition MAP =
- new FunctionDefinition("map", SCALAR_FUNCTION);
- public static final FunctionDefinition ROW =
- new FunctionDefinition("row", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition AT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("at")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CARDINALITY =
+ new BuiltInFunctionDefinition.Builder()
+ .name("cardinality")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ARRAY =
+ new BuiltInFunctionDefinition.Builder()
+ .name("array")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ARRAY_ELEMENT =
+ new BuiltInFunctionDefinition.Builder()
+ .name("element")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition MAP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("map")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition ROW =
+ new BuiltInFunctionDefinition.Builder()
+ .name("row")
+ .kind(SCALAR)
+ .build();
// composite
- public static final FunctionDefinition FLATTEN =
- new FunctionDefinition("flatten", OTHER_FUNCTION);
- public static final FunctionDefinition GET =
- new FunctionDefinition("get", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition FLATTEN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("flatten")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition GET =
+ new BuiltInFunctionDefinition.Builder()
+ .name("get")
+ .kind(OTHER)
+ .build();
// window properties
- public static final FunctionDefinition WINDOW_START =
- new FunctionDefinition("start", OTHER_FUNCTION);
- public static final FunctionDefinition WINDOW_END =
- new FunctionDefinition("end", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition WINDOW_START =
+ new BuiltInFunctionDefinition.Builder()
+ .name("start")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition WINDOW_END =
+ new BuiltInFunctionDefinition.Builder()
+ .name("end")
+ .kind(OTHER)
+ .build();
// ordering
- public static final FunctionDefinition ORDER_ASC =
- new FunctionDefinition("asc", OTHER_FUNCTION);
- public static final FunctionDefinition ORDER_DESC =
- new FunctionDefinition("desc", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition ORDER_ASC =
+ new BuiltInFunctionDefinition.Builder()
+ .name("asc")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition ORDER_DESC =
+ new BuiltInFunctionDefinition.Builder()
+ .name("desc")
+ .kind(OTHER)
+ .build();
// crypto hash
- public static final FunctionDefinition MD5 =
- new FunctionDefinition("md5", SCALAR_FUNCTION);
- public static final FunctionDefinition SHA1 =
- new FunctionDefinition("sha1", SCALAR_FUNCTION);
- public static final FunctionDefinition SHA224 =
- new FunctionDefinition("sha224", SCALAR_FUNCTION);
- public static final FunctionDefinition SHA256 =
- new FunctionDefinition("sha256", SCALAR_FUNCTION);
- public static final FunctionDefinition SHA384 =
- new FunctionDefinition("sha384", SCALAR_FUNCTION);
- public static final FunctionDefinition SHA512 =
- new FunctionDefinition("sha512", SCALAR_FUNCTION);
- public static final FunctionDefinition SHA2 =
- new FunctionDefinition("sha2", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition MD5 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("md5")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SHA1 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sha1")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SHA224 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sha224")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SHA256 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sha256")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SHA384 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sha384")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SHA512 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sha512")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition SHA2 =
+ new BuiltInFunctionDefinition.Builder()
+ .name("sha2")
+ .kind(SCALAR)
+ .build();
// time attributes
- public static final FunctionDefinition PROCTIME =
- new FunctionDefinition("proctime", OTHER_FUNCTION);
- public static final FunctionDefinition ROWTIME =
- new FunctionDefinition("rowtime", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition PROCTIME =
+ new BuiltInFunctionDefinition.Builder()
+ .name("proctime")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition ROWTIME =
+ new BuiltInFunctionDefinition.Builder()
+ .name("rowtime")
+ .kind(OTHER)
+ .build();
// over window
- public static final FunctionDefinition OVER =
- new FunctionDefinition("over", OTHER_FUNCTION);
- public static final FunctionDefinition UNBOUNDED_RANGE =
- new FunctionDefinition("unboundedRange", OTHER_FUNCTION);
- public static final FunctionDefinition UNBOUNDED_ROW =
- new FunctionDefinition("unboundedRow", OTHER_FUNCTION);
- public static final FunctionDefinition CURRENT_RANGE =
- new FunctionDefinition("currentRange", OTHER_FUNCTION);
- public static final FunctionDefinition CURRENT_ROW =
- new FunctionDefinition("currentRow", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition OVER =
+ new BuiltInFunctionDefinition.Builder()
+ .name("over")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition UNBOUNDED_RANGE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("unboundedRange")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition UNBOUNDED_ROW =
+ new BuiltInFunctionDefinition.Builder()
+ .name("unboundedRow")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition CURRENT_RANGE =
+ new BuiltInFunctionDefinition.Builder()
+ .name("currentRange")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition CURRENT_ROW =
+ new BuiltInFunctionDefinition.Builder()
+ .name("currentRow")
+ .kind(OTHER)
+ .build();
// columns
- public static final FunctionDefinition WITH_COLUMNS =
- new FunctionDefinition("withColumns", OTHER_FUNCTION);
- public static final FunctionDefinition WITHOUT_COLUMNS =
- new FunctionDefinition("withoutColumns", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition WITH_COLUMNS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("withColumns")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition WITHOUT_COLUMNS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("withoutColumns")
+ .kind(OTHER)
+ .build();
// etc
- public static final FunctionDefinition IN =
- new FunctionDefinition("in", SCALAR_FUNCTION);
- public static final FunctionDefinition CAST =
- new FunctionDefinition("cast", SCALAR_FUNCTION);
- public static final FunctionDefinition REINTERPRET_CAST =
- new FunctionDefinition("reinterpretCast",
SCALAR_FUNCTION);
- public static final FunctionDefinition AS =
- new FunctionDefinition("as", OTHER_FUNCTION);
- public static final FunctionDefinition STREAM_RECORD_TIMESTAMP =
- new FunctionDefinition("streamRecordTimestamp", OTHER_FUNCTION);
- public static final FunctionDefinition RANGE_TO =
- new FunctionDefinition("rangeTo", OTHER_FUNCTION);
+ public static final BuiltInFunctionDefinition IN =
+ new BuiltInFunctionDefinition.Builder()
+ .name("in")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition CAST =
+ new BuiltInFunctionDefinition.Builder()
+ .name("cast")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition REINTERPRET_CAST =
+ new BuiltInFunctionDefinition.Builder()
+ .name("reinterpretCast")
+ .kind(SCALAR)
+ .build();
+ public static final BuiltInFunctionDefinition AS =
+ new BuiltInFunctionDefinition.Builder()
+ .name("as")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition STREAM_RECORD_TIMESTAMP =
+ new BuiltInFunctionDefinition.Builder()
+ .name("streamRecordTimestamp")
+ .kind(OTHER)
+ .build();
+ public static final BuiltInFunctionDefinition RANGE_TO =
+ new BuiltInFunctionDefinition.Builder()
+ .name("rangeTo")
+ .kind(OTHER)
+ .build();
public static final Set<FunctionDefinition> WINDOW_PROPERTIES = new
HashSet<>(Arrays.asList(
WINDOW_START, WINDOW_END, PROCTIME, ROWTIME
@@ -349,13 +754,13 @@ public final class BuiltInFunctionDefinitions {
public static final List<FunctionDefinition> ORDERING =
Arrays.asList(ORDER_ASC, ORDER_DESC);
- public static List<FunctionDefinition> getDefinitions() {
+ public static List<BuiltInFunctionDefinition> getDefinitions() {
final Field[] fields =
BuiltInFunctionDefinitions.class.getFields();
- final List<FunctionDefinition> list = new
ArrayList<>(fields.length);
+ final List<BuiltInFunctionDefinition> list = new
ArrayList<>(fields.length);
for (Field field : fields) {
if
(FunctionDefinition.class.isAssignableFrom(field.getType())) {
try {
- final FunctionDefinition funcDef =
(FunctionDefinition) field.get(BuiltInFunctionDefinitions.class);
+ final BuiltInFunctionDefinition funcDef
= (BuiltInFunctionDefinition) field.get(BuiltInFunctionDefinitions.class);
list.add(Preconditions.checkNotNull(funcDef));
} catch (IllegalAccessException e) {
throw new TableException(
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java
index ac5249f..1637928 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java
@@ -19,61 +19,43 @@
package org.apache.flink.table.functions;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
-import java.util.Objects;
+import java.util.Collections;
+import java.util.Set;
/**
- * Definition of a function for unique identification.
+ * Definition of a function. Instances of this class provide all details
necessary to validate a function
+ * call and perform planning.
+ *
+ * <p>A pure function definition must not contain a runtime implementation.
This can be provided by
+ * the planner at later stages.
+ *
+ * @see UserDefinedFunction
*/
@PublicEvolving
-public class FunctionDefinition {
+public interface FunctionDefinition {
/**
- * Classifies the function definition.
+ * Returns the kind of function this definition describes.
*/
- public enum Type {
- AGGREGATE_FUNCTION,
- SCALAR_FUNCTION,
- TABLE_FUNCTION,
- OTHER_FUNCTION
- }
-
- private final Type type;
- private final String name;
-
- public FunctionDefinition(String name, Type type) {
- this.name = Preconditions.checkNotNull(name);
- this.type = Preconditions.checkNotNull(type);
- }
-
- public Type getType() {
- return type;
- }
+ FunctionKind getKind();
- public String getName() {
- return name;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- FunctionDefinition that = (FunctionDefinition) o;
- return Objects.equals(name, that.name);
- }
-
- @Override
- public int hashCode() {
- return name.hashCode();
+ /**
+ * Returns the set of requirements this definition demands.
+ */
+ default Set<FunctionRequirement> getRequirements() {
+ return Collections.emptySet();
}
- @Override
- public String toString() {
- return name;
+ /**
+ * Returns information about the determinism of the function's results.
+ *
+ * <p>It returns <code>true</code> if and only if a call to this
function is guaranteed to
+ * always return the same result given the same parameters.
<code>true</code> is
+ * assumed by default. If the function is not purely functional, like
<code>random(), date(), now(), ...</code>,
+ * this method must return <code>false</code>.
+ */
+ default boolean isDeterministic() {
+ return true;
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionKind.java
similarity index 72%
copy from
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionKind.java
index 80a3fd4..1a9c2cc 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionKind.java
@@ -18,14 +18,23 @@
package org.apache.flink.table.functions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.SCALAR_FUNCTION;
+import org.apache.flink.annotation.PublicEvolving;
/**
- * Dictionary of function definitions for all internal used functions.
+ * Categorizes the semantics of a {@link FunctionDefinition}.
*/
-public class InternalFunctionDefinitions {
+@PublicEvolving
+public enum FunctionKind {
- public static final FunctionDefinition THROW_EXCEPTION =
- new FunctionDefinition("throwException", SCALAR_FUNCTION);
+ SCALAR,
+ TABLE,
+
+ ASYNC_TABLE,
+
+ AGGREGATE,
+
+ TABLE_AGGREGATE,
+
+ OTHER
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionRequirement.java
similarity index 72%
copy from
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionRequirement.java
index 80a3fd4..dc7544b 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionRequirement.java
@@ -18,14 +18,16 @@
package org.apache.flink.table.functions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.SCALAR_FUNCTION;
+import org.apache.flink.annotation.PublicEvolving;
/**
- * Dictionary of function definitions for all internal used functions.
+ * Characteristics that a {@link FunctionDefinition} requires.
*/
-public class InternalFunctionDefinitions {
-
- public static final FunctionDefinition THROW_EXCEPTION =
- new FunctionDefinition("throwException", SCALAR_FUNCTION);
+@PublicEvolving
+public enum FunctionRequirement {
+ /**
+ * Requirement that an aggregate function can only be applied in an
OVER window.
+ */
+ OVER_WINDOW_ONLY
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
index 9f7249c..cfb4d14 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -89,4 +89,9 @@ public abstract class ScalarFunction extends
UserDefinedFunction {
}
return types;
}
+
+ @Override
+ public final FunctionKind getKind() {
+ return FunctionKind.SCALAR;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
index 3cb14cd..fc365eb 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
@@ -21,22 +21,63 @@ package org.apache.flink.table.functions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.SCALAR_FUNCTION;
+import java.util.Objects;
+import java.util.Set;
/**
* The function definition of an user-defined scalar function.
+ *
+ * <p>This class can be dropped once we introduce a new type inference.
*/
@PublicEvolving
-public final class ScalarFunctionDefinition extends FunctionDefinition {
+public final class ScalarFunctionDefinition implements FunctionDefinition {
+ private final String name;
private final ScalarFunction scalarFunction;
public ScalarFunctionDefinition(String name, ScalarFunction
scalarFunction) {
- super(name, SCALAR_FUNCTION);
+ this.name = Preconditions.checkNotNull(name);
this.scalarFunction =
Preconditions.checkNotNull(scalarFunction);
}
public ScalarFunction getScalarFunction() {
return scalarFunction;
}
+
+ @Override
+ public FunctionKind getKind() {
+ return FunctionKind.SCALAR;
+ }
+
+ @Override
+ public Set<FunctionRequirement> getRequirements() {
+ return scalarFunction.getRequirements();
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return scalarFunction.isDeterministic();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ScalarFunctionDefinition that = (ScalarFunctionDefinition) o;
+ return name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
index c8d3aef..9b39efb 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
@@ -124,4 +124,9 @@ public abstract class TableAggregateFunction<T, ACC>
extends UserDefinedAggregat
*/
void retract(T record);
}
+
+ @Override
+ public final FunctionKind getKind() {
+ return FunctionKind.TABLE_AGGREGATE;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
similarity index 56%
copy from
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
index a6440f8..03c4e33 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
@@ -22,30 +22,34 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+import java.util.Objects;
+import java.util.Set;
/**
- * The function definition of an user-defined aggregate function.
+ * The function definition of a user-defined table aggregate function.
+ *
+ * <p>This class can be dropped once we introduce a new type inference.
*/
@PublicEvolving
-public final class AggregateFunctionDefinition extends FunctionDefinition {
+public final class TableAggregateFunctionDefinition implements
FunctionDefinition {
- private final UserDefinedAggregateFunction<?, ?> aggregateFunction;
+ private final String name;
+ private final TableAggregateFunction<?, ?> aggregateFunction;
private final TypeInformation<?> resultTypeInfo;
private final TypeInformation<?> accumulatorTypeInfo;
- public AggregateFunctionDefinition(
+ public TableAggregateFunctionDefinition(
String name,
- UserDefinedAggregateFunction<?, ?> aggregateFunction,
+ TableAggregateFunction<?, ?> aggregateFunction,
TypeInformation<?> resultTypeInfo,
TypeInformation<?> accTypeInfo) {
- super(name, AGGREGATE_FUNCTION);
+ this.name = Preconditions.checkNotNull(name);
this.aggregateFunction =
Preconditions.checkNotNull(aggregateFunction);
this.resultTypeInfo =
Preconditions.checkNotNull(resultTypeInfo);
this.accumulatorTypeInfo =
Preconditions.checkNotNull(accTypeInfo);
}
- public UserDefinedAggregateFunction<?, ?> getAggregateFunction() {
+ public TableAggregateFunction<?, ?> getTableAggregateFunction() {
return aggregateFunction;
}
@@ -56,4 +60,41 @@ public final class AggregateFunctionDefinition extends
FunctionDefinition {
public TypeInformation<?> getAccumulatorTypeInfo() {
return accumulatorTypeInfo;
}
+
+ @Override
+ public FunctionKind getKind() {
+ return FunctionKind.TABLE_AGGREGATE;
+ }
+
+ @Override
+ public Set<FunctionRequirement> getRequirements() {
+ return aggregateFunction.getRequirements();
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return aggregateFunction.isDeterministic();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableAggregateFunctionDefinition that =
(TableAggregateFunctionDefinition) o;
+ return name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
index 9310770..562ee63 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -144,4 +144,9 @@ public abstract class TableFunction<T> extends
UserDefinedFunction {
protected final void collect(T row) {
collector.collect(row);
}
+
+ @Override
+ public final FunctionKind getKind() {
+ return FunctionKind.TABLE;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
index d6b42c0..0dcf569 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
@@ -22,14 +22,18 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.TABLE_FUNCTION;
+import java.util.Objects;
+import java.util.Set;
/**
* The function definition of an user-defined table function.
+ *
+ * <p>This class can be dropped once we introduce a new type inference.
*/
@PublicEvolving
-public final class TableFunctionDefinition extends FunctionDefinition {
+public final class TableFunctionDefinition implements FunctionDefinition {
+ private final String name;
private final TableFunction<?> tableFunction;
private final TypeInformation<?> resultType;
@@ -37,7 +41,7 @@ public final class TableFunctionDefinition extends
FunctionDefinition {
String name,
TableFunction<?> tableFunction,
TypeInformation<?> resultType) {
- super(name, TABLE_FUNCTION);
+ this.name = Preconditions.checkNotNull(name);
this.tableFunction = Preconditions.checkNotNull(tableFunction);
this.resultType = Preconditions.checkNotNull(resultType);
}
@@ -49,4 +53,41 @@ public final class TableFunctionDefinition extends
FunctionDefinition {
public TypeInformation<?> getResultType() {
return resultType;
}
+
+ @Override
+ public FunctionKind getKind() {
+ return FunctionKind.TABLE;
+ }
+
+ @Override
+ public Set<FunctionRequirement> getRequirements() {
+ return tableFunction.getRequirements();
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return tableFunction.isDeterministic();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableFunctionDefinition that = (TableFunctionDefinition) o;
+ return name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
index 0a4e600..3565a77 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -24,11 +24,19 @@ import org.apache.flink.table.utils.EncodingUtils;
import java.io.Serializable;
/**
- * Base class for all user-defined functions such as scalar functions, table
functions,
- * or aggregation functions.
+ * Base class for all user-defined functions.
+ *
+ * <p>User-defined functions combine the logical definition of a function for
validation and planning
+ * and contain a corresponding runtime implementation.
+ *
+ * @see ScalarFunction
+ * @see TableFunction
+ * @see AsyncTableFunction
+ * @see AggregateFunction
+ * @see TableAggregateFunction
*/
@PublicEvolving
-public abstract class UserDefinedFunction implements Serializable {
+public abstract class UserDefinedFunction implements FunctionDefinition,
Serializable {
/**
* Returns a unique, serialized representation for this function.
@@ -55,19 +63,7 @@ public abstract class UserDefinedFunction implements
Serializable {
}
/**
- * Returns information about the determinism of the function's results.
- *
- * @return <code>true</code> if and only if a call to this function is
guaranteed to
- * always return the same result given the same parameters.
<code>true</code> is
- * assumed by default. If the function is not pure functional
like
- * <code>random(), date(), now(), ...</code> this method must
return <code>false</code>.
- */
- public boolean isDeterministic() {
- return true;
- }
-
- /**
- * Returns the name of the UDF that is used for plan explain and
logging.
+ * Returns the name of the UDF that is used for plan explanation and
logging.
*/
@Override
public String toString() {
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index e67c08a..b299ebb 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -80,8 +80,8 @@ public class RexNodeConverter implements
ExpressionVisitor<RexNode> {
@Override
public RexNode visitCall(CallExpression call) {
- switch (call.getFunctionDefinition().getType()) {
- case SCALAR_FUNCTION:
+ switch (call.getFunctionDefinition().getKind()) {
+ case SCALAR:
return visitScalarFunc(call);
default: throw new UnsupportedOperationException();
}
@@ -169,7 +169,7 @@ public class RexNodeConverter implements
ExpressionVisitor<RexNode> {
} else if (BuiltInFunctionDefinitions.MOD.equals(def)) {
return relBuilder.call(FlinkSqlOperatorTable.MOD,
child);
} else {
- throw new UnsupportedOperationException(def.getName());
+ throw new UnsupportedOperationException(def.toString());
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
index 80a3fd4..ef2ee16 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/InternalFunctionDefinitions.java
@@ -18,14 +18,17 @@
package org.apache.flink.table.functions;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.SCALAR_FUNCTION;
+import static org.apache.flink.table.functions.FunctionKind.SCALAR;
/**
* Dictionary of function definitions for all internal used functions.
*/
public class InternalFunctionDefinitions {
- public static final FunctionDefinition THROW_EXCEPTION =
- new FunctionDefinition("throwException", SCALAR_FUNCTION);
+ public static final BuiltInFunctionDefinition THROW_EXCEPTION =
+ new BuiltInFunctionDefinition.Builder()
+ .name("throwException")
+ .kind(SCALAR)
+ .build();
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
index a3c8633..ca158fc 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.functions.aggfunctions;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
@@ -161,4 +162,9 @@ public abstract class DeclarativeAggregateFunction extends
UserDefinedFunction {
}
return ret;
}
+
+ @Override
+ public final FunctionKind getKind() {
+ return FunctionKind.OTHER;
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 4ee8b95..a231d20 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,13 +19,22 @@
package org.apache.flink.table.functions.utils
+import java.lang.reflect.{Method, Modifier}
+import java.lang.{Integer => JInt, Long => JLong}
+import java.sql.{Date, Time, Timestamp}
+
+import com.google.common.primitives.Primitives
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.{SqlFunction, SqlOperatorBinding}
import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils._
import org.apache.flink.table.api.{TableEnvironment, TableException,
ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.dataformat.{BaseRow, BinaryString, Decimal}
-import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction,
TableFunction, UserDefinedFunction}
+import org.apache.flink.table.functions._
import org.apache.flink.table.plan.schema.DeferredTypeFlinkTableFunction
import org.apache.flink.table.types.ClassDataTypeConverter.fromClassToDataType
import
org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType
@@ -37,20 +46,11 @@ import
org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType}
import org.apache.flink.types.Row
import org.apache.flink.util.InstantiationUtil
-import com.google.common.primitives.Primitives
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.{SqlFunction, SqlOperatorBinding}
-
-import java.lang.reflect.{Method, Modifier}
-import java.lang.{Integer => JInt, Long => JLong}
-import java.sql.{Date, Time, Timestamp}
-
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.postfixOps
+
object UserDefinedFunctionUtils {
/**
@@ -505,7 +505,7 @@ object UserDefinedFunctionUtils {
externalResultType,
externalAccType,
typeFactory,
- aggFunction.requiresOver)
+
aggFunction.getRequirements.contains(FunctionRequirement.OVER_WINDOW_ONLY))
}
//
----------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java
index 622c3ab..1f9834c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
@@ -76,10 +77,10 @@ final class ExpandColumnFunctionsRule implements
ResolverRule {
List<Expression> result;
- String definitionName =
call.getFunctionDefinition().getName();
- if (definitionName.equals(WITH_COLUMNS.getName())) {
+ final FunctionDefinition definition =
call.getFunctionDefinition();
+ if (definition == WITH_COLUMNS) {
result =
resolveArgsOfColumns(call.getChildren(), false);
- } else if
(definitionName.equals(WITHOUT_COLUMNS.getName())) {
+ } else if (definition == WITHOUT_COLUMNS) {
result =
resolveArgsOfColumns(call.getChildren(), true);
} else {
List<Expression> args = call.getChildren()
@@ -89,7 +90,7 @@ final class ExpandColumnFunctionsRule implements ResolverRule
{
result = Collections.singletonList(new
CallExpression(call.getFunctionDefinition(), args));
// validate as.
- if (definitionName.equals(AS.getName())) {
+ if (definition == AS) {
for (int i = 1; i < args.size(); ++i) {
if (!(args.get(i) instanceof
ValueLiteralExpression)) {
String errorMessage =
String.join(
@@ -205,7 +206,7 @@ final class ExpandColumnFunctionsRule implements
ResolverRule {
* Whether the expression is a column index range expression,
e.g. withColumns(1 ~ 2).
*/
private boolean isIndexRangeCall(CallExpression expression) {
- return
expression.getFunctionDefinition().getName().equals(RANGE_TO.getName()) &&
+ return expression.getFunctionDefinition() == RANGE_TO &&
expression.getChildren().get(0) instanceof
ValueLiteralExpression &&
expression.getChildren().get(1) instanceof
ValueLiteralExpression;
}
@@ -214,7 +215,7 @@ final class ExpandColumnFunctionsRule implements
ResolverRule {
* Whether the expression is a column name range expression,
e.g. withColumns(a ~ b).
*/
private boolean isNameRangeCall(CallExpression expression) {
- return
expression.getFunctionDefinition().getName().equals(RANGE_TO.getName()) &&
+ return expression.getFunctionDefinition() == RANGE_TO &&
expression.getChildren().get(0) instanceof
UnresolvedReferenceExpression &&
expression.getChildren().get(1) instanceof
UnresolvedReferenceExpression;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
index 89ddbb6..d8f71de 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
@@ -41,11 +41,12 @@ import
org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.PlannerExpression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils;
import
org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
import org.apache.flink.table.types.logical.LogicalType;
@@ -62,9 +63,10 @@ import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
-import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfType;
+import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static org.apache.flink.table.functions.FunctionKind.TABLE_AGGREGATE;
import static
org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
import static
org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow.WindowType.SLIDE;
import static
org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow.WindowType.TUMBLE;
@@ -191,10 +193,10 @@ public class AggregateOperationFactory {
* may return multi output names when the composite return type is
flattened.
*/
private Stream<String> extractAggregateNames(Expression expression) {
- if (isTableAggFunctionCall(expression)) {
- return
Arrays.stream(UserDefinedFunctionUtils.getFieldInfo(
- ((AggregateFunctionDefinition)
((CallExpression) expression).getFunctionDefinition())
- .getResultTypeInfo())._1());
+ if (isFunctionOfKind(expression, FunctionKind.TABLE_AGGREGATE))
{
+ final TableAggregateFunctionDefinition definition =
+ (TableAggregateFunctionDefinition)
((CallExpression) expression).getFunctionDefinition();
+ return
Arrays.stream(UserDefinedFunctionUtils.getFieldInfo(definition.getResultTypeInfo())._1());
} else {
return
Stream.of(extractName(expression).orElseGet(expression::toString));
}
@@ -417,21 +419,19 @@ public class AggregateOperationFactory {
@Override
public Void visitCall(CallExpression call) {
FunctionDefinition functionDefinition =
call.getFunctionDefinition();
- if (isFunctionOfType(call, AGGREGATE_FUNCTION)) {
+ if (isFunctionOfKind(call, AGGREGATE) ||
isFunctionOfKind(call, TABLE_AGGREGATE)) {
if (functionDefinition ==
BuiltInFunctionDefinitions.DISTINCT) {
call.getChildren().forEach(expr ->
expr.accept(validateDistinct));
} else {
- if (functionDefinition instanceof
AggregateFunctionDefinition) {
- if
(requiresOver(functionDefinition)) {
- throw new
ValidationException(format(
- "OVER clause is
necessary for window functions: [%s].",
- call));
- }
+ if (requiresOver(functionDefinition)) {
+ throw new
ValidationException(format(
+ "OVER clause is
necessary for window functions: [%s].",
+ call));
}
call.getChildren().forEach(child ->
child.accept(noNestedAggregates));
}
- } else if (functionDefinition ==
BuiltInFunctionDefinitions.AS) {
+ } else if (functionDefinition == AS) {
// skip alias
call.getChildren().get(0).accept(this);
} else {
@@ -441,10 +441,8 @@ public class AggregateOperationFactory {
}
private boolean requiresOver(FunctionDefinition
functionDefinition) {
- return ((AggregateFunctionDefinition)
functionDefinition).getAggregateFunction()
- instanceof AggregateFunction &&
- ((AggregateFunction)
((AggregateFunctionDefinition) functionDefinition)
- .getAggregateFunction()).requiresOver();
+ return functionDefinition.getRequirements()
+ .contains(FunctionRequirement.OVER_WINDOW_ONLY);
}
@Override
@@ -466,7 +464,7 @@ public class AggregateOperationFactory {
if (call.getFunctionDefinition() ==
BuiltInFunctionDefinitions.DISTINCT) {
throw new ValidationException("It's not allowed
to use an aggregate function as " +
"input of another aggregate function");
- } else if (call.getFunctionDefinition().getType() !=
AGGREGATE_FUNCTION) {
+ } else if (!isFunctionOfKind(call, AGGREGATE) &&
!isFunctionOfKind(call, TABLE_AGGREGATE)) {
throw new ValidationException("Distinct
operator can only be applied to aggregation expressions!");
} else {
call.getChildren().forEach(child ->
child.accept(noNestedAggregates));
@@ -484,7 +482,7 @@ public class AggregateOperationFactory {
@Override
public Void visitCall(CallExpression call) {
- if (call.getFunctionDefinition().getType() ==
AGGREGATE_FUNCTION) {
+ if (isFunctionOfKind(call, AGGREGATE) ||
isFunctionOfKind(call, TABLE_AGGREGATE)) {
throw new ValidationException("It's not allowed
to use an aggregate function as " +
"input of another aggregate function");
}
@@ -530,12 +528,9 @@ public class AggregateOperationFactory {
@Override
public Expression visitCall(CallExpression call) {
FunctionDefinition definition =
call.getFunctionDefinition();
- if (definition.equals(AS)) {
+ if (definition == BuiltInFunctionDefinitions.AS) {
return unwrapFromAlias(call);
- } else if (definition instanceof
AggregateFunctionDefinition) {
- if (!isTableAggFunctionCall(call)) {
- throw fail();
- }
+ } else if (isFunctionOfKind(call, TABLE_AGGREGATE)) {
return call;
} else {
return defaultMethod(call);
@@ -550,18 +545,20 @@ public class AggregateOperationFactory {
.orElseThrow(() -> new
ValidationException("Unexpected alias: " + alias)))
.collect(toList());
- if (!isTableAggFunctionCall(children.get(0))) {
+ if (!isFunctionOfKind(children.get(0),
TABLE_AGGREGATE)) {
throw fail();
}
- validateAlias(aliases, (AggregateFunctionDefinition)
((CallExpression) children.get(0)).getFunctionDefinition());
+ validateAlias(
+ aliases,
+ (TableAggregateFunctionDefinition)
((CallExpression) children.get(0)).getFunctionDefinition());
alias = aliases;
return children.get(0);
}
private void validateAlias(
List<String> aliases,
- AggregateFunctionDefinition aggFunctionDefinition) {
+ TableAggregateFunctionDefinition aggFunctionDefinition)
{
TypeInformation<?> resultType =
aggFunctionDefinition.getResultTypeInfo();
@@ -573,7 +570,7 @@ public class AggregateOperationFactory {
"List of column aliases must have same
degree as table; " +
"the returned table of function
'%s' has " +
"%d columns, whereas alias list
has %d columns",
- aggFunctionDefinition.getName(),
+ aggFunctionDefinition,
callArity,
aliasesSize));
}
@@ -590,16 +587,4 @@ public class AggregateOperationFactory {
"function that might be followed by
some alias.");
}
}
-
- /**
- * Return true if the input {@link Expression} is a {@link
CallExpression} of table aggregate function.
- */
- public static boolean isTableAggFunctionCall(Expression expression) {
- return Stream.of(expression)
- .filter(p -> p instanceof CallExpression)
- .map(p -> (CallExpression) p)
- .filter(p -> p.getFunctionDefinition() instanceof
AggregateFunctionDefinition)
- .map(p -> (AggregateFunctionDefinition)
p.getFunctionDefinition())
- .anyMatch(p -> p.getAggregateFunction() instanceof
TableAggregateFunction);
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
index 76274e7..0847ab2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
@@ -35,9 +35,9 @@ import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
-import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfType;
+import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.TABLE_FUNCTION;
+import static org.apache.flink.table.functions.FunctionKind.TABLE;
/**
* Utility class for creating a valid {@link CalculatedQueryOperation}
operation.
@@ -87,7 +87,7 @@ public class CalculatedTableFactory {
.orElseThrow(() -> new
ValidationException("Unexpected alias: " + alias)))
.collect(toList());
- if (!isFunctionOfType(children.get(0), TABLE_FUNCTION))
{
+ if (!isFunctionOfKind(children.get(0), TABLE)) {
throw fail();
}
@@ -114,7 +114,7 @@ public class CalculatedTableFactory {
"List of column aliases must have same
degree as table; " +
"the returned table of function
'%s' has " +
"%d columns, whereas alias list
has %d columns",
- tableFunctionDefinition.getName(),
+ tableFunctionDefinition.toString(),
callArity,
aliasesSize));
} else {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 94eb829..aa735e3 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -36,7 +36,6 @@ import
org.apache.flink.table.expressions.RexPlannerExpression;
import org.apache.flink.table.expressions.WindowReference;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.utils.TableSqlFunction;
-import org.apache.flink.table.operations.AggregateOperationFactory;
import org.apache.flink.table.operations.AggregateQueryOperation;
import org.apache.flink.table.operations.CalculatedQueryOperation;
import org.apache.flink.table.operations.CatalogQueryOperation;
@@ -92,9 +91,10 @@ import scala.Some;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
-import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfType;
+import static
org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
-import static
org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static org.apache.flink.table.functions.FunctionKind.TABLE_AGGREGATE;
import static
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
/**
@@ -166,7 +166,7 @@ public class QueryOperationConverter extends
QueryOperationDefaultVisitor<RelNod
* Get the {@link AggCall} correspond to the aggregate
expression.
*/
private AggCall getAggCall(Expression aggregateExpression) {
- if
(AggregateOperationFactory.isTableAggFunctionCall(aggregateExpression)) {
+ if (isFunctionOfKind(aggregateExpression,
TABLE_AGGREGATE)) {
return
aggregateExpression.accept(tableAggregateVisitor);
} else {
return
aggregateExpression.accept(aggregateVisitor);
@@ -416,7 +416,7 @@ public class QueryOperationConverter extends
QueryOperationDefaultVisitor<RelNod
.orElseThrow(() -> new
TableException("Unexpected name."));
Expression aggregate =
call.getChildren().get(0);
- if (isFunctionOfType(aggregate,
AGGREGATE_FUNCTION)) {
+ if (isFunctionOfKind(aggregate, AGGREGATE)) {
return ((Aggregation)
expressionBridge.bridge(aggregate))
.toAggCall(aggregateName,
false, relBuilder);
}
@@ -433,7 +433,7 @@ public class QueryOperationConverter extends
QueryOperationDefaultVisitor<RelNod
private class TableAggregateVisitor extends AggregateVisitor {
@Override
public AggCall visitCall(CallExpression call) {
- if (isFunctionOfType(call, AGGREGATE_FUNCTION)) {
+ if (isFunctionOfKind(call, TABLE_AGGREGATE)) {
AggFunctionCall aggFunctionCall =
(AggFunctionCall) expressionBridge.bridge(call);
return
aggFunctionCall.toAggCall(aggFunctionCall.toString(), false, relBuilder);
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index bf3d8a3..0e4bd9a 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1112,14 +1112,21 @@ trait ImplicitExpressionConversions {
implicit class UserDefinedAggregateFunctionCall[T: TypeInformation, ACC:
TypeInformation]
(val a: UserDefinedAggregateFunction[T, ACC]) {
- private def createFunctionDefinition(): AggregateFunctionDefinition = {
+ private def createFunctionDefinition(): FunctionDefinition = {
val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper
.getReturnTypeOfAggregateFunction(a, implicitly[TypeInformation[T]])
val accTypeInfo: TypeInformation[ACC] = UserFunctionsTypeHelper.
getAccumulatorTypeOfAggregateFunction(a,
implicitly[TypeInformation[ACC]])
- new AggregateFunctionDefinition(a.getClass.getName, a, resultTypeInfo,
accTypeInfo)
+ a match {
+ case af: AggregateFunction[_, _] =>
+ new AggregateFunctionDefinition(
+ af.getClass.getName, af, resultTypeInfo, accTypeInfo)
+ case taf: TableAggregateFunction[_, _] =>
+ new TableAggregateFunctionDefinition(
+ taf.getClass.getName, taf, resultTypeInfo, accTypeInfo)
+ }
}
/**
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 7c046f7..b607473 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -83,7 +83,7 @@ class PlannerExpressionConverter private extends
ApiExpressionVisitor[PlannerExp
case tfd: TableFunctionDefinition =>
PlannerTableFunctionCall(
- tfd.getName,
+ tfd.toString,
tfd.getTableFunction,
args,
tfd.getResultType)
@@ -95,6 +95,13 @@ class PlannerExpressionConverter private extends
ApiExpressionVisitor[PlannerExp
afd.getAccumulatorTypeInfo,
args)
+ case tafd: TableAggregateFunctionDefinition =>
+ AggFunctionCall(
+ tafd.getTableAggregateFunction,
+ tafd.getResultTypeInfo,
+ tafd.getAccumulatorTypeInfo,
+ args)
+
case fd: FunctionDefinition =>
fd match {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 8508a01..3aa021d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.util.Optionality
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.functions.{AggregateFunction,
TableAggregateFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.functions.{AggregateFunction,
FunctionRequirement, TableAggregateFunction, UserDefinedAggregateFunction}
import
org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker,
createOperandTypeInference, createReturnTypeInference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
@@ -87,7 +87,8 @@ object AggSqlFunction {
typeFactory: FlinkTypeFactory): AggSqlFunction = {
val requiresOver = aggregateFunction match {
- case a: AggregateFunction[_, _] => a.requiresOver()
+ case a: AggregateFunction[_, _] =>
+ a.getRequirements.contains(FunctionRequirement.OVER_WINDOW_ONLY)
case _ => false
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
index e8b7e0d..79a77b2 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
@@ -24,10 +24,10 @@ import org.apache.flink.table.api._
import org.apache.flink.table.catalog.FunctionLookup
import org.apache.flink.table.expressions.ApiExpressionUtils.{call,
valueLiteral}
import org.apache.flink.table.expressions.ExpressionResolver.resolverFor
-import org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfType
+import org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfKind
import org.apache.flink.table.expressions._
import org.apache.flink.table.expressions.lookups.TableReferenceLookup
-import
org.apache.flink.table.functions.FunctionDefinition.Type.{SCALAR_FUNCTION,
TABLE_FUNCTION}
+import org.apache.flink.table.functions.FunctionKind.{SCALAR, TABLE}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.functions.{AggregateFunctionDefinition,
BuiltInFunctionDefinitions, TableFunctionDefinition}
import org.apache.flink.table.operations.AliasOperationUtils.createAliasList
@@ -201,8 +201,7 @@ class OperationTreeBuilder(private val tableEnv:
TableEnvImpl) {
// extract alias and aggregate function
var alias: Seq[String] = Seq()
val aggWithoutAlias = resolvedAggregate match {
- case c: CallExpression
- if c.getFunctionDefinition.getName ==
BuiltInFunctionDefinitions.AS.getName =>
+ case c: CallExpression if c.getFunctionDefinition ==
BuiltInFunctionDefinitions.AS =>
alias = c.getChildren
.drop(1)
.map(e => ExpressionUtils.extractValue(e, classOf[String]).get())
@@ -473,7 +472,7 @@ class OperationTreeBuilder(private val tableEnv:
TableEnvImpl) {
val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
val resolvedMapFunction = resolveSingleExpression(mapFunction, resolver)
- if (!isFunctionOfType(resolvedMapFunction, SCALAR_FUNCTION)) {
+ if (!isFunctionOfKind(resolvedMapFunction, SCALAR)) {
throw new ValidationException("Only ScalarFunction can be used in the
map operator.")
}
@@ -487,7 +486,7 @@ class OperationTreeBuilder(private val tableEnv:
TableEnvImpl) {
val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
val resolvedTableFunction = resolveSingleExpression(tableFunction,
resolver)
- if (!isFunctionOfType(resolvedTableFunction, TABLE_FUNCTION)) {
+ if (!isFunctionOfKind(resolvedTableFunction, TABLE)) {
throw new ValidationException("Only TableFunction can be used in the
flatMap operator.")
}
@@ -539,8 +538,7 @@ class OperationTreeBuilder(private val tableEnv:
TableEnvImpl) {
var attrNameCntr: Int = 0
val usedFieldNames = inputFieldNames.toBuffer
groupingExpressions.map {
- case c: CallExpression
- if
!c.getFunctionDefinition.getName.equals(BuiltInFunctionDefinitions.AS.getName)
=> {
+ case c: CallExpression if c.getFunctionDefinition !=
BuiltInFunctionDefinitions.AS =>
val tempName = getUniqueName("TMP_" + attrNameCntr, usedFieldNames)
usedFieldNames.append(tempName)
attrNameCntr += 1
@@ -548,7 +546,6 @@ class OperationTreeBuilder(private val tableEnv:
TableEnvImpl) {
BuiltInFunctionDefinitions.AS,
Seq(c, new ValueLiteralExpression(tempName))
)
- }
case e => e
}
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/ColumnFunctionsTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/ColumnFunctionsTest.scala
index 5c61269..fa7b8bf 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/ColumnFunctionsTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/ColumnFunctionsTest.scala
@@ -166,8 +166,7 @@ class ColumnFunctionsTest extends TableTestBase {
unaryNode(
"DataStreamCorrelate",
streamTableNode(t),
- term("invocation",
-
"org$apache$flink$table$utils$TableFunc0$497a630d2a145bca99673bcd05a53d2b($2)"),
+ term("invocation", func0.functionIdentifier() + "($2)"),
term("correlate", "table(TableFunc0(string))"),
term("select", "int", "long", "string", "name", "age"),
term("rowType",