beyond1920 commented on a change in pull request #8977:
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300219868
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
##########
@@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
+ private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+ /**
+ * The mapping only keeps part of FunctionDefinitions, which could be
converted to SqlOperator in a very simple
+ * way.
+ */
+ private static final Map<FunctionDefinition, SqlOperator>
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+ static {
+ // logic functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND,
FlinkSqlOperatorTable.AND);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR,
FlinkSqlOperatorTable.OR);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT,
FlinkSqlOperatorTable.NOT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF,
FlinkSqlOperatorTable.CASE);
+
+ // comparison functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS,
FlinkSqlOperatorTable.EQUALS);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.GREATER_THAN,
FlinkSqlOperatorTable.GREATER_THAN);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN,
FlinkSqlOperatorTable.LESS_THAN);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS,
FlinkSqlOperatorTable.NOT_EQUALS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL,
FlinkSqlOperatorTable.IS_NULL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL,
FlinkSqlOperatorTable.IS_NOT_NULL);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE,
FlinkSqlOperatorTable.IS_TRUE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE,
FlinkSqlOperatorTable.IS_FALSE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE,
FlinkSqlOperatorTable.IS_NOT_TRUE);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.IS_NOT_FALSE,
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+ // string functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH,
FlinkSqlOperatorTable.CHAR_LENGTH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP,
FlinkSqlOperatorTable.INITCAP);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE,
FlinkSqlOperatorTable.LIKE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER,
FlinkSqlOperatorTable.LOWER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR,
FlinkSqlOperatorTable.SIMILAR_TO);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING,
FlinkSqlOperatorTable.SUBSTRING);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER,
FlinkSqlOperatorTable.UPPER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION,
FlinkSqlOperatorTable.POSITION);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY,
FlinkSqlOperatorTable.OVERLAY);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT,
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS,
FlinkSqlOperatorTable.CONCAT_WS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD,
FlinkSqlOperatorTable.LPAD);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD,
FlinkSqlOperatorTable.RPAD);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT,
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
FlinkSqlOperatorTable.FROM_BASE64);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TO_BASE64,
FlinkSqlOperatorTable.TO_BASE64);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UUID,
FlinkSqlOperatorTable.UUID);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LTRIM,
FlinkSqlOperatorTable.LTRIM);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RTRIM,
FlinkSqlOperatorTable.RTRIM);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.REPEAT,
FlinkSqlOperatorTable.REPEAT);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.REGEXP_REPLACE,
FlinkSqlOperatorTable.REGEXP_REPLACE);
+
+ // math functions
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MINUS,
FlinkSqlOperatorTable.MINUS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DIVIDE,
FlinkSqlOperatorTable.DIVIDE);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TIMES,
FlinkSqlOperatorTable.MULTIPLY);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ABS,
FlinkSqlOperatorTable.ABS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EXP,
FlinkSqlOperatorTable.EXP);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG10,
FlinkSqlOperatorTable.LOG10);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG2,
FlinkSqlOperatorTable.LOG2);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LN,
FlinkSqlOperatorTable.LN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG,
FlinkSqlOperatorTable.LOG);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POWER,
FlinkSqlOperatorTable.POWER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MOD,
FlinkSqlOperatorTable.MOD);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SQRT,
FlinkSqlOperatorTable.SQRT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MINUS_PREFIX,
FlinkSqlOperatorTable.UNARY_MINUS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIN,
FlinkSqlOperatorTable.SIN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COS,
FlinkSqlOperatorTable.COS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SINH,
FlinkSqlOperatorTable.SINH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TAN,
FlinkSqlOperatorTable.TAN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TANH,
FlinkSqlOperatorTable.TANH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COT,
FlinkSqlOperatorTable.COT);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ASIN,
FlinkSqlOperatorTable.ASIN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ACOS,
FlinkSqlOperatorTable.ACOS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ATAN,
FlinkSqlOperatorTable.ATAN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ATAN2,
FlinkSqlOperatorTable.ATAN2);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COSH,
FlinkSqlOperatorTable.COSH);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DEGREES,
FlinkSqlOperatorTable.DEGREES);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RADIANS,
FlinkSqlOperatorTable.RADIANS);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIGN,
FlinkSqlOperatorTable.SIGN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ROUND,
FlinkSqlOperatorTable.ROUND);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.PI,
FlinkSqlOperatorTable.PI);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.E,
FlinkSqlOperatorTable.E);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RAND,
FlinkSqlOperatorTable.RAND);
+ SIMPLE_DEF_SQL_OPERATOR_MAPPING
+ .put(BuiltInFunctionDefinitions.RAND_INTEGER,
FlinkSqlOperatorTable.RAND_INTEGER);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.BIN,
FlinkSqlOperatorTable.BIN);
+
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.HEX,
FlinkSqlOperatorTable.HEX);
+
+// TODO
+//
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TRUNCATE,
FlinkSqlOperatorTable.TRUNCATE);
Review comment:
TRUNCATE is not supported in Blink yet.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services