twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table]
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225590279
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##########
@@ -172,71 +204,112 @@ class MatchCodeGenerator(
if (count != 0) {
throw new TableException("Flink does not support physical offsets
within partition.")
} else {
- val patternFieldVisitor = new RexPatternFieldRefVisitor(config,
- nullableInput,
+ val patternFieldVisitor = new RexPatternFieldRefVisitor(
+ config,
input,
0,
- false)
- call.getOperands.get(0).accept(patternFieldVisitor)
+ false,
+ reusablePatternListNames)
+ applyPatternVisitorToRef(call.getOperands.get(0),
patternFieldVisitor)
}
- case MATCH_NUMBER =>
- throw new TableException(s"Unsupported call: $call")
-
case FIRST | LAST =>
val countLiteral = call.operands.get(1).asInstanceOf[RexLiteral]
val count = checkedDownCast(countLiteral.getValueAs(classOf[JLong]))
val patternRef = call.operands.get(0)
- val patternFieldVisitor = new RexPatternFieldRefVisitor(config,
- nullableInput,
+ val patternFieldVisitor = new RexPatternFieldRefVisitor(
+ config,
input,
count,
- call.getOperator == FIRST)
- patternRef.accept(patternFieldVisitor)
+ call.getOperator == FIRST,
+ reusablePatternListNames)
+ applyPatternVisitorToRef(patternRef, patternFieldVisitor)
- case CLASSIFIER | RUNNING =>
- throw new TableException(s"${call.getOperator} is not supported yet.")
case FINAL =>
call.getOperands.get(0).accept(this)
case _ => super.visitCall(call)
}
}
+ private def applyPatternVisitorToRef(
+ node: RexNode,
+ visitor: RexPatternFieldRefVisitor)
+ : GeneratedExpression = {
+ val code = node.accept(visitor)
+ reusablePerRecordStatements.add(visitor.reusePerRecordCode())
+ reusableMemberStatements.add(visitor.reuseMemberCode())
+ reusableInitStatements.add(visitor.reuseInitCode())
+
+ code
+ }
+
+ private def newNameForPatternList(patternName: String) = {
+ reusablePatternListNames.getOrElseUpdate(patternName,
newName("patternEvents"))
+ }
+
+ override private[flink] def generateProctimeTimestamp() = {
+ val resultTerm = newName("result")
+
+ //TODO use timerService once it is available in PatternFlatSelectFunction
+ val resultCode =
+ s"""
+ |long $resultTerm = System.currentTimeMillis();
+ |""".stripMargin
+ GeneratedExpression(resultTerm, NEVER_NULL, resultCode,
SqlTimeTypeInfo.TIMESTAMP)
+ }
+
override def visitPatternFieldRef(fieldRef: RexPatternFieldRef):
GeneratedExpression = {
- val patternFieldRefVisitor = new RexPatternFieldRefVisitor(config,
- nullableInput,
+ val patternFieldRefVisitor = new RexPatternFieldRefVisitor(
+ config,
input,
0,
- false)
- fieldRef.accept(patternFieldRefVisitor)
+ false,
+ reusablePatternListNames)
+ applyPatternVisitorToRef(fieldRef, patternFieldRefVisitor)
}
- class RexPatternFieldRefVisitor(
+ private class RexPatternFieldRefVisitor(
config: TableConfig,
- nullableInput: Boolean,
input: TypeInformation[_ <: Any],
offset: Int,
- first: Boolean) extends CodeGenerator(config, nullableInput, input) {
+ first: Boolean,
+ patternListNames: mutable.Map[String, String]) extends
CodeGenerator(config, false, input) {
+
+ private case class GeneratedCode(resultTerm: String, code: String)
Review comment:
None of the usages of `GeneratedCode` actually carry any code, so we could
simply return a `String` in those cases.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services