This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07e70a927ee36cd5eda85eece6b221086fe335ff
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Jul 17 20:18:56 2019 +0800

    [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
---
 .../java/org/apache/flink/table/expressions/RexNodeConverter.java   | 2 ++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala | 4 ++++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala | 4 ++++
 3 files changed, 10 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index 2c6ef4d..5528571 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -289,6 +289,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
         conversionsOfBuiltInFunc
                 .put(BuiltInFunctionDefinitions.SHA512, exprs -> convert(FlinkSqlOperatorTable.SHA512, exprs));
         conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
+        conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
+                exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
     }
 
     @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index d52d6e6a..208cad9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -683,6 +683,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(args.isEmpty)
         CurrentRow()
 
+      case STREAM_RECORD_TIMESTAMP =>
+        assert(args.isEmpty)
+        StreamRecordTimestamp()
+
       case _ =>
         throw new TableException(s"Unsupported function definition: $fd")
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 999fa56..5684594 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -682,6 +682,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(args.isEmpty)
         CurrentRow()
 
+      case STREAM_RECORD_TIMESTAMP =>
+        assert(args.isEmpty)
+        StreamRecordTimestamp()
+
       case _ =>
         throw new TableException(s"Unsupported function definition: $fd")
     }