JingsongLi commented on a change in pull request #8977:
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300213763
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
##########
@@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
+ private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+ /**
+ * The mapping only keeps part of FunctionDefinitions, which could be
converted to SqlOperator in a very simple
+ * way.
+ */
+ private static final Map<FunctionDefinition, SqlOperator>
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+ static {
+ // logic functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND,
FlinkSqlOperatorTable.AND);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR,
FlinkSqlOperatorTable.OR);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT,
FlinkSqlOperatorTable.NOT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF,
FlinkSqlOperatorTable.CASE);
+
+ // comparison functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS,
FlinkSqlOperatorTable.EQUALS);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.GREATER_THAN,
FlinkSqlOperatorTable.GREATER_THAN);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN,
FlinkSqlOperatorTable.LESS_THAN);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS,
FlinkSqlOperatorTable.NOT_EQUALS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL,
FlinkSqlOperatorTable.IS_NULL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL,
FlinkSqlOperatorTable.IS_NOT_NULL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE,
FlinkSqlOperatorTable.IS_TRUE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE,
FlinkSqlOperatorTable.IS_FALSE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE,
FlinkSqlOperatorTable.IS_NOT_TRUE);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.IS_NOT_FALSE,
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+ // string functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH,
FlinkSqlOperatorTable.CHAR_LENGTH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP,
FlinkSqlOperatorTable.INITCAP);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE,
FlinkSqlOperatorTable.LIKE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER,
FlinkSqlOperatorTable.LOWER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR,
FlinkSqlOperatorTable.SIMILAR_TO);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING,
FlinkSqlOperatorTable.SUBSTRING);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER,
FlinkSqlOperatorTable.UPPER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION,
FlinkSqlOperatorTable.POSITION);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY,
FlinkSqlOperatorTable.OVERLAY);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT,
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS,
FlinkSqlOperatorTable.CONCAT_WS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD,
FlinkSqlOperatorTable.LPAD);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD,
FlinkSqlOperatorTable.RPAD);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT,
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
FlinkSqlOperatorTable.FROM_BASE64);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TO_BASE64,
FlinkSqlOperatorTable.TO_BASE64);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UUID,
FlinkSqlOperatorTable.UUID);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LTRIM,
FlinkSqlOperatorTable.LTRIM);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RTRIM,
FlinkSqlOperatorTable.RTRIM);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.REPEAT,
FlinkSqlOperatorTable.REPEAT);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.REGEXP_REPLACE,
FlinkSqlOperatorTable.REGEXP_REPLACE);
+
+ // math functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MINUS,
FlinkSqlOperatorTable.MINUS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DIVIDE,
FlinkSqlOperatorTable.DIVIDE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TIMES,
FlinkSqlOperatorTable.MULTIPLY);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ABS,
FlinkSqlOperatorTable.ABS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EXP,
FlinkSqlOperatorTable.EXP);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG10,
FlinkSqlOperatorTable.LOG10);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG2,
FlinkSqlOperatorTable.LOG2);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LN,
FlinkSqlOperatorTable.LN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG,
FlinkSqlOperatorTable.LOG);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POWER,
FlinkSqlOperatorTable.POWER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MOD,
FlinkSqlOperatorTable.MOD);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SQRT,
FlinkSqlOperatorTable.SQRT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MINUS_PREFIX,
FlinkSqlOperatorTable.UNARY_MINUS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIN,
FlinkSqlOperatorTable.SIN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COS,
FlinkSqlOperatorTable.COS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SINH,
FlinkSqlOperatorTable.SINH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TAN,
FlinkSqlOperatorTable.TAN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TANH,
FlinkSqlOperatorTable.TANH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COT,
FlinkSqlOperatorTable.COT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ASIN,
FlinkSqlOperatorTable.ASIN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ACOS,
FlinkSqlOperatorTable.ACOS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ATAN,
FlinkSqlOperatorTable.ATAN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ATAN2,
FlinkSqlOperatorTable.ATAN2);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COSH,
FlinkSqlOperatorTable.COSH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DEGREES,
FlinkSqlOperatorTable.DEGREES);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RADIANS,
FlinkSqlOperatorTable.RADIANS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIGN,
FlinkSqlOperatorTable.SIGN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ROUND,
FlinkSqlOperatorTable.ROUND);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.PI,
FlinkSqlOperatorTable.PI);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.E,
FlinkSqlOperatorTable.E);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RAND,
FlinkSqlOperatorTable.RAND);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.RAND_INTEGER,
FlinkSqlOperatorTable.RAND_INTEGER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.BIN,
FlinkSqlOperatorTable.BIN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.HEX,
FlinkSqlOperatorTable.HEX);
+
+// TODO
+//
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TRUNCATE,
FlinkSqlOperatorTable.TRUNCATE);
+
+ // time functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EXTRACT,
FlinkSqlOperatorTable.EXTRACT);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.CURRENT_DATE,
FlinkSqlOperatorTable.CURRENT_DATE);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.CURRENT_TIME,
FlinkSqlOperatorTable.CURRENT_TIME);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+
.put(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP,
FlinkSqlOperatorTable.CURRENT_TIMESTAMP);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOCAL_TIME,
FlinkSqlOperatorTable.LOCALTIME);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+
.put(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP,
FlinkSqlOperatorTable.LOCALTIMESTAMP);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DATE_FORMAT,
FlinkSqlOperatorTable.DATE_FORMAT);
+
+ // collection
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AT,
FlinkSqlOperatorTable.ITEM);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CARDINALITY,
FlinkSqlOperatorTable.CARDINALITY);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ORDER_DESC,
FlinkSqlOperatorTable.DESC);
+
+ // crypto hash
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MD5,
FlinkSqlOperatorTable.MD5);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA2,
FlinkSqlOperatorTable.SHA2);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA224,
FlinkSqlOperatorTable.SHA224);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA256,
FlinkSqlOperatorTable.SHA256);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA384,
FlinkSqlOperatorTable.SHA384);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA512,
FlinkSqlOperatorTable.SHA512);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA1,
FlinkSqlOperatorTable.SHA1);
+
+ // etc
+ //
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(InternalFunctionDefinitions.THROW_EXCEPTION,
FlinkSqlOperatorTable.THROW_EXCEPTION);
+
+ }
+
public RexNodeConverter(RelBuilder relBuilder) {
this.relBuilder = relBuilder;
this.typeFactory = (FlinkTypeFactory)
relBuilder.getRexBuilder().getTypeFactory();
}
+ // TODO removed later after PlanExpression is merged
public RexNode visit(UnresolvedCallExpression call) {
- switch (call.getFunctionDefinition().getKind()) {
+ FunctionDefinition func = call.getFunctionDefinition();
+ switch (func.getKind()) {
case SCALAR:
- return
translateScalarCall(call.getFunctionDefinition(), call.getChildren());
- default: throw new UnsupportedOperationException();
+ if (func instanceof ScalarFunctionDefinition) {
+ ScalarFunction scalaFunc =
((ScalarFunctionDefinition) func).getScalarFunction();
+ List<RexNode> child =
convertCallChildren(call.getChildren());
+ SqlFunction sqlFunction =
UserDefinedFunctionUtils.createScalarSqlFunction(
+
scalaFunc.functionIdentifier(),
+ scalaFunc.toString(),
+ scalaFunc,
+ typeFactory);
+ return relBuilder.call(sqlFunction,
child);
+ } else {
+ return
visitBuiltInFunc(call.getFunctionDefinition(), call.getChildren());
+ }
+
+ default:
+ throw new UnsupportedOperationException();
}
}
@Override
public RexNode visit(CallExpression call) {
- switch (call.getFunctionDefinition().getKind()) {
- case SCALAR:
- return
translateScalarCall(call.getFunctionDefinition(), call.getChildren());
- default: throw new UnsupportedOperationException();
+ FunctionDefinition func = call.getFunctionDefinition();
+ if (func instanceof ScalarFunctionDefinition) {
+ ScalarFunction scalaFunc = ((ScalarFunctionDefinition)
func).getScalarFunction();
+ List<RexNode> child =
convertCallChildren(call.getChildren());
+ SqlFunction sqlFunction =
UserDefinedFunctionUtils.createScalarSqlFunction(
+ scalaFunc.functionIdentifier(),
+ scalaFunc.toString(),
+ scalaFunc,
+ typeFactory);
+ return relBuilder.call(sqlFunction, child);
+ } else if (func instanceof TableFunctionDefinition) {
+ throw new
UnsupportedOperationException(func.toString());
Review comment:
Why not?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services