snuyanzin commented on code in PR #28113:
URL: https://github.com/apache/flink/pull/28113#discussion_r3196292196
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala:
##########
@@ -181,26 +195,58 @@ object CalcCodeGenerator {
val filterInputCode = ctx.reuseInputUnboxingCode()
val filterInputSet = Set(ctx.reusableInputUnboxingExprs.keySet.toSeq:
_*)
+ val filterLocalRefSet: Set[Int] =
ctx.reusableLocalRefExprs.keySet.toSet
+
// if any filter conditions, projection code will enter an new scope
val projectionCode = produceProjectionCode
val projectionInputCode = ctx.reusableInputUnboxingExprs
- .filter(entry => !filterInputSet.contains(entry._1))
+ .filter { case (k, _) => !filterInputSet.contains(k) }
+ .values
+ .map(_.code)
+ .mkString("\n")
+
+ val filterLocalRefCode = ctx.reusableLocalRefExprs
+ .filter { case (k, _) => filterLocalRefSet.contains(k) }
.values
.map(_.code)
.mkString("\n")
+ val projectionLocalRefCode = ctx.reusableLocalRefExprs
+ .filter { case (k, _) => !filterLocalRefSet.contains(k) }
+ .values
+ .map(_.code)
+ .mkString("\n")
+
s"""
|${if (eagerInputUnboxingCode) filterInputCode else ""}
+ |$filterLocalRefCode
|${filterCondition.code}
|if (${filterCondition.resultTerm}) {
- | ${if (eagerInputUnboxingCode) projectionInputCode else ""}
+ | ${if (eagerInputUnboxingCode) projectionInputCode else ""}
+ | $projectionLocalRefCode
| $projectionCode
|}
|""".stripMargin
}
}
}
+ private def buildRexProgram(
+ classLoader: ClassLoader,
+ inputType: RowType,
+ projection: Seq[RexNode],
+ condition: Option[RexNode]): RexProgram = {
+ val typeFactory = new FlinkTypeFactory(classLoader,
FlinkTypeSystem.INSTANCE)
Review Comment:
reused existing typeFactory
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]