[
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653378#comment-16653378
]
ASF GitHub Bot commented on FLINK-7062:
---------------------------------------
twalthr commented on a change in pull request #6815: [FLINK-7062][cep][table]
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225590279
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
##########
@@ -172,71 +204,112 @@ class MatchCodeGenerator(
if (count != 0) {
throw new TableException("Flink does not support physical offsets
within partition.")
} else {
- val patternFieldVisitor = new RexPatternFieldRefVisitor(config,
- nullableInput,
+ val patternFieldVisitor = new RexPatternFieldRefVisitor(
+ config,
input,
0,
- false)
- call.getOperands.get(0).accept(patternFieldVisitor)
+ false,
+ reusablePatternListNames)
+ applyPatternVisitorToRef(call.getOperands.get(0),
patternFieldVisitor)
}
- case MATCH_NUMBER =>
- throw new TableException(s"Unsupported call: $call")
-
case FIRST | LAST =>
val countLiteral = call.operands.get(1).asInstanceOf[RexLiteral]
val count = checkedDownCast(countLiteral.getValueAs(classOf[JLong]))
val patternRef = call.operands.get(0)
- val patternFieldVisitor = new RexPatternFieldRefVisitor(config,
- nullableInput,
+ val patternFieldVisitor = new RexPatternFieldRefVisitor(
+ config,
input,
count,
- call.getOperator == FIRST)
- patternRef.accept(patternFieldVisitor)
+ call.getOperator == FIRST,
+ reusablePatternListNames)
+ applyPatternVisitorToRef(patternRef, patternFieldVisitor)
- case CLASSIFIER | RUNNING =>
- throw new TableException(s"${call.getOperator} is not supported yet.")
case FINAL =>
call.getOperands.get(0).accept(this)
case _ => super.visitCall(call)
}
}
+ private def applyPatternVisitorToRef(
+ node: RexNode,
+ visitor: RexPatternFieldRefVisitor)
+ : GeneratedExpression = {
+ val code = node.accept(visitor)
+ reusablePerRecordStatements.add(visitor.reusePerRecordCode())
+ reusableMemberStatements.add(visitor.reuseMemberCode())
+ reusableInitStatements.add(visitor.reuseInitCode())
+
+ code
+ }
+
+ private def newNameForPatternList(patternName: String) = {
+ reusablePatternListNames.getOrElseUpdate(patternName,
newName("patternEvents"))
+ }
+
+ override private[flink] def generateProctimeTimestamp() = {
+ val resultTerm = newName("result")
+
+ //TODO use timerService once it is available in PatternFlatSelectFunction
+ val resultCode =
+ s"""
+ |long $resultTerm = System.currentTimeMillis();
+ |""".stripMargin
+ GeneratedExpression(resultTerm, NEVER_NULL, resultCode,
SqlTimeTypeInfo.TIMESTAMP)
+ }
+
override def visitPatternFieldRef(fieldRef: RexPatternFieldRef):
GeneratedExpression = {
- val patternFieldRefVisitor = new RexPatternFieldRefVisitor(config,
- nullableInput,
+ val patternFieldRefVisitor = new RexPatternFieldRefVisitor(
+ config,
input,
0,
- false)
- fieldRef.accept(patternFieldRefVisitor)
+ false,
+ reusablePatternListNames)
+ applyPatternVisitorToRef(fieldRef, patternFieldRefVisitor)
}
- class RexPatternFieldRefVisitor(
+ private class RexPatternFieldRefVisitor(
config: TableConfig,
- nullableInput: Boolean,
input: TypeInformation[_ <: Any],
offset: Int,
- first: Boolean) extends CodeGenerator(config, nullableInput, input) {
+ first: Boolean,
+ patternListNames: mutable.Map[String, String]) extends
CodeGenerator(config, false, input) {
+
+ private case class GeneratedCode(resultTerm: String, code: String)
Review comment:
None of the usages of `GeneratedCode` actually carry any code, so we could
simply return a `String` in those cases.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Support the basic functionality of MATCH_RECOGNIZE
> --------------------------------------------------
>
> Key: FLINK-7062
> URL: https://issues.apache.org/jira/browse/FLINK-7062
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API & SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}}
> in the Flink SQL API, which includes support for the {{MEASURES}},
> {{PATTERN}} and {{DEFINE}} clauses. This would allow users to write basic CEP
> use cases with SQL, as in the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
> MEASURES
> A.id AS aid,
> B.id AS bid,
> C.id AS cid
> PATTERN (A B C)
> DEFINE
> A AS A.name = 'a',
> B AS B.name = 'b',
> C AS C.name = 'c'
> ) AS T
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)