JingsongLi commented on a change in pull request #8782: [FLINK-12888]
[table-planner-blink] Introduce planner rule to push filter into TableSource
URL: https://github.com/apache/flink/pull/8782#discussion_r296078539
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
##########
@@ -137,3 +191,158 @@ class RefFieldAccessorVisitor(usedFields: Array[Int])
extends RexVisitorImpl[Uni
override def visitCall(call: RexCall): Unit =
call.operands.foreach(operand => operand.accept(this))
}
+
+/**
+ * An RexVisitor to convert RexNode to Expression.
+ *
+ * @param inputNames The input names of the relation node
+ * @param functionCatalog The function catalog
+ */
+class RexNodeToExpressionConverter(
+ inputNames: Array[String],
+ functionCatalog: FunctionCatalog)
+ extends RexVisitor[Option[Expression]] {
+
+ override def visitInputRef(inputRef: RexInputRef): Option[Expression] = {
+ Preconditions.checkArgument(inputRef.getIndex < inputNames.length)
+ Some(new FieldReferenceExpression(
+ inputNames(inputRef.getIndex),
+
TypeConversions.fromLogicalToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)),
+ 0,
+ inputRef.getIndex
+ ))
+ }
+
+ override def visitTableInputRef(rexTableInputRef: RexTableInputRef):
Option[Expression] =
+ visitInputRef(rexTableInputRef)
+
+ override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = {
+ throw new TableException("Bug: RexLocalRef should have been expanded")
+ }
+
+ override def visitLiteral(literal: RexLiteral): Option[Expression] = {
+ // TODO support SqlTrimFunction.Flag
+ literal.getValue match {
+ case _: SqlTrimFunction.Flag => return None
+ case _ => // do nothing
+ }
+
+ val literalType = FlinkTypeFactory.toLogicalType(literal.getType)
+
+ val literalValue = literalType.getTypeRoot match {
+
+ case DATE =>
+ val v = literal.getValueAs(classOf[DateString])
+ new Date(DateTimeUtils.dateStringToUnixDate(v.toString) *
DateTimeUtils.MILLIS_PER_DAY)
+
+ case TIME_WITHOUT_TIME_ZONE =>
+ val v = literal.getValueAs(classOf[TimeString])
+ new Time(DateTimeUtils.timeStringToUnixDate(v.toString(0)).longValue())
+
+ case TIMESTAMP_WITHOUT_TIME_ZONE =>
+ val v = literal.getValueAs(classOf[TimestampString])
+ new Timestamp(DateTimeUtils.timestampStringToUnixDate(v.toString(3)))
+
+ case TINYINT =>
+ // convert from BigDecimal to Byte
+ literal.getValueAs(classOf[java.lang.Byte])
+
+ case SMALLINT =>
+ // convert from BigDecimal to Short
+ literal.getValueAs(classOf[java.lang.Short])
+
+ case INTEGER =>
+ // convert from BigDecimal to Integer
+ literal.getValueAs(classOf[java.lang.Integer])
+
+ case BIGINT =>
+ // convert from BigDecimal to Long
+ literal.getValueAs(classOf[java.lang.Long])
+
+ case FLOAT =>
+ // convert from BigDecimal to Float
+ literal.getValueAs(classOf[java.lang.Float])
+
+ case DOUBLE =>
+ // convert from BigDecimal to Double
+ literal.getValueAs(classOf[java.lang.Double])
+
+ case VARCHAR =>
Review comment:
Add CHAR
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services