pnowojski commented on a change in pull request #7440: [FLINK-10591][table]
Introduced functions to return time attributes from MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7440#discussion_r247512002
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternProcessFunctionRunner.scala
##########
@@ -33,41 +34,42 @@ import org.apache.flink.util.Collector
/**
* PatternSelectFunctionRunner with [[Row]] input and [[CRow]] output.
*/
-class PatternSelectFunctionRunner(
+class PatternProcessFunctionRunner(
name: String,
code: String)
- extends RichPatternFlatSelectFunction[Row, CRow]
- with Compiler[RichPatternSelectFunction[Row, Row]]
+ extends PatternProcessFunction[Row, CRow]
+ with Compiler[PatternProcessFunction[Row, Row]]
with Logging {
- @transient private var outCRow: CRow = _
+ @transient private var cRowWrappingCollector : CRowWrappingCollector = _
- @transient private var function: RichPatternSelectFunction[Row, Row] = _
+ @transient private var function: PatternProcessFunction[Row, Row] = _
override def open(parameters: Configuration): Unit = {
- if (outCRow == null) {
- outCRow = new CRow(null, true)
- }
+ this.cRowWrappingCollector = new CRowWrappingCollector
- LOG.debug(s"Compiling PatternSelectFunction: $name \n\n Code:\n$code")
+ LOG.debug(s"Compiling PatternProcessFunction: $name \n\n Code:\n$code")
val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating PatternSelectFunction.")
+ LOG.debug("Instantiating PatternProcessFunction.")
function = clazz.newInstance()
FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
FunctionUtils.openFunction(function, parameters)
}
- override def flatSelect(
- pattern: util.Map[String, util.List[Row]],
- out: Collector[CRow])
- : Unit = {
- outCRow.row = function.select(pattern)
+ override def processMatch(
+ `match`: util.Map[String, util.List[Row]],
+ ctx: PatternProcessFunction.Context,
+ out: Collector[CRow]): Unit = {
+
out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
- out.collect(outCRow)
+ cRowWrappingCollector.out = out
+ function.processMatch(`match`, ctx, cRowWrappingCollector)
}
override def close(): Unit = {
FunctionUtils.closeFunction(function)
}
+
Review comment:
Remove the blank lines?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services