hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333854883
##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
##########

```diff
@@ -120,59 +124,64 @@ class ExpressionReducer(
     var reducedIdx = 0
     while (i < constExprs.size()) {
       val unreduced = constExprs.get(i)
-      unreduced.getType.getSqlTypeName match {
-        // we insert the original expression for object literals
-        case SqlTypeName.ANY |
-             SqlTypeName.ROW |
-             SqlTypeName.ARRAY |
-             SqlTypeName.MAP |
-             SqlTypeName.MULTISET =>
-          reducedValues.add(unreduced)
-        case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
-          val escapeVarchar = StringEscapeUtils
-            .escapeJava(safeToString(reduced.getField(reducedIdx).asInstanceOf[BinaryString]))
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, escapeVarchar, unreduced))
-          reducedIdx += 1
-        case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
-          val reducedValue = reduced.getField(reducedIdx)
-          val value = if (null != reducedValue) {
-            new ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
-          } else {
-            reducedValue
-          }
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-          reducedIdx += 1
-        case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-          val value = if (!reduced.isNullAt(reducedIdx)) {
-            val mills = reduced.getField(reducedIdx).asInstanceOf[Long]
-            Long.box(SqlDateTimeUtils.timestampWithLocalZoneToTimestamp(
-              mills, TimeZone.getTimeZone(config.getLocalTimeZone)))
-          } else {
-            null
-          }
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-          reducedIdx += 1
-        case SqlTypeName.DECIMAL =>
-          val reducedValue = reduced.getField(reducedIdx)
-          val value = if (reducedValue != null) {
-            reducedValue.asInstanceOf[Decimal].toBigDecimal
-          } else {
-            reducedValue
-          }
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-          reducedIdx += 1
-        case _ =>
-          val reducedValue = reduced.getField(reducedIdx)
-          // RexBuilder handles double literals incorrectly, convert into BigDecimal manually
-          val value = if (reducedValue != null &&
-              unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
-            new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
-          } else {
-            reducedValue
-          }
-
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-          reducedIdx += 1
+      if (PythonUtil.containsFunctionOf(unreduced, FunctionLanguage.PYTHON)) {
```

Review comment:

Traversing the Rex tree is expensive, and we now traverse it twice in this `reduce()` method. How about reusing the result of a single traversal, e.g.:

```scala
val jvmUdfIndexes = constExprs.asScala.zipWithIndex
  .filter(p => PythonUtil.containsFunctionOf(p._1, FunctionLanguage.JVM))
  ...
if (!jvmUdfIndexes.contains(i)) {
  // if it contains a Python function, just insert the original expression
  reducedValues.add(unreduced)
}
```

In this way, we can reduce some optimization time. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
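[Editor's note] The reviewer's suggestion above is to pay the traversal cost once, remember which expression indexes contain a Python UDF, and replace the per-iteration tree traversal with a cheap set lookup. A minimal, self-contained sketch of that pattern follows; `Expr` and `containsPythonFunction` are hypothetical stand-ins for Calcite's `RexNode` and Flink's `PythonUtil.containsFunctionOf`, which are not reproduced here.

```scala
// Hypothetical stand-in for a RexNode: a name plus whether it uses a Python UDF.
case class Expr(name: String, usesPython: Boolean)

// Stand-in for PythonUtil.containsFunctionOf(node, FunctionLanguage.PYTHON),
// i.e. the expensive tree traversal we want to run only once per expression.
def containsPythonFunction(e: Expr): Boolean = e.usesPython

val constExprs = List(
  Expr("upper(x)", usesPython = false),
  Expr("pyUdf()", usesPython = true),
  Expr("1 + 1", usesPython = false))

// Single pass: collect the indexes of expressions containing Python UDFs.
val pythonUdfIndexes: Set[Int] = constExprs.zipWithIndex
  .collect { case (e, i) if containsPythonFunction(e) => i }
  .toSet

// Inside the reduce loop the per-expression check is now an O(1) lookup
// instead of a second traversal of the expression tree.
val reducedValues = constExprs.zipWithIndex.map { case (e, i) =>
  if (pythonUdfIndexes.contains(i)) s"keep(${e.name})"   // insert original expression
  else s"reduce(${e.name})"                              // constant-fold as before
}

println(reducedValues.mkString(", "))
```

This mirrors the suggested `jvmUdfIndexes` idea with the polarity flipped (indexing the Python-bearing expressions directly), which avoids the question of how mixed JVM/Python expressions are classified.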