hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] 
Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333854883
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
 ##########
 @@ -120,59 +124,64 @@ class ExpressionReducer(
     var reducedIdx = 0
     while (i < constExprs.size()) {
       val unreduced = constExprs.get(i)
-      unreduced.getType.getSqlTypeName match {
-        // we insert the original expression for object literals
-        case SqlTypeName.ANY |
-             SqlTypeName.ROW |
-             SqlTypeName.ARRAY |
-             SqlTypeName.MAP |
-             SqlTypeName.MULTISET =>
-          reducedValues.add(unreduced)
-        case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
-          val escapeVarchar = StringEscapeUtils
-            
.escapeJava(safeToString(reduced.getField(reducedIdx).asInstanceOf[BinaryString]))
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, 
escapeVarchar, unreduced))
-          reducedIdx += 1
-        case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
-          val reducedValue = reduced.getField(reducedIdx)
-          val value = if (null != reducedValue) {
-            new 
ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
-          } else {
-            reducedValue
-          }
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-          reducedIdx += 1
-        case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-          val value = if (!reduced.isNullAt(reducedIdx)) {
-            val mills = reduced.getField(reducedIdx).asInstanceOf[Long]
-            Long.box(SqlDateTimeUtils.timestampWithLocalZoneToTimestamp(
-              mills, TimeZone.getTimeZone(config.getLocalTimeZone)))
-          } else {
-            null
-          }
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-          reducedIdx += 1
-        case SqlTypeName.DECIMAL =>
-          val reducedValue = reduced.getField(reducedIdx)
-          val value = if (reducedValue != null) {
-            reducedValue.asInstanceOf[Decimal].toBigDecimal
-          } else {
-            reducedValue
-          }
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-          reducedIdx += 1
-        case _ =>
-          val reducedValue = reduced.getField(reducedIdx)
-          // RexBuilder handle double literal incorrectly, convert it into 
BigDecimal manually
-          val value = if (reducedValue != null &&
-            unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
-            new 
java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
-          } else {
-            reducedValue
-          }
-
-          reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-          reducedIdx += 1
+      if (PythonUtil.containsFunctionOf(unreduced, FunctionLanguage.PYTHON)) {
 
 Review comment:
  Rex tree traversal is expensive. We traverse the tree twice in this reduce() method. 
How about doing some reuse here, e.g., 
   ```
       val jvmUdfs = constExprs.asScala.zipWithIndex
         .filter(p => PythonUtil.containsFunctionOf(p._1, FunctionLanguage.JVM))
   ...
   
        if (!jvmUdfIndexes.contains(i)) {
          // if it contains a Python function, then just insert the original 
expression.
           reducedValues.add(unreduced)
         }
   ```
   In this way, we can reduce some of the optimization time. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to