[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655165#comment-16655165
 ] 

ASF GitHub Bot commented on FLINK-7062:
---------------------------------------

dawidwys commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r226283576
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
 ##########
 @@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.compiler.NFACompiler
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
+import org.apache.flink.cep.pattern.conditions.BooleanConditions
+import org.apache.flink.cep.{CEP, EventComparator, PatternStream}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.MatchRecognize
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.`match`._
+import org.apache.flink.table.runtime.aggregate.SortUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{RowKeySelector, RowtimeProcessFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.MathUtils
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with LogicalMatch.
+  */
+class DataStreamMatch(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  input: RelNode,
+  logicalMatch: FlinkLogicalMatch,
+  schema: RowSchema,
+  inputSchema: RowSchema)
+  extends SingleRel(cluster, traitSet, input)
+    with MatchRecognize
+    with DataStreamRel {
+
+  private[flink] def getLogicalMatch = logicalMatch
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+    new DataStreamMatch(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      logicalMatch,
+      schema,
+      inputSchema)
+  }
+
+  override def toString: String = {
+    matchToString(logicalMatch, inputSchema, getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    explainMatch(super.explainTerms(pw), logicalMatch, inputSchema, 
getExpressionString)
+  }
+
+  private def translateOrder(
+    tableEnv: StreamTableEnvironment,
+    crowInput: DataStream[CRow],
+    orderKeys: RelCollation) = {
+
+    if (orderKeys.getFieldCollations.size() == 0) {
+      throw new TableException("You must specify either rowtime or proctime 
for order by.")
+    }
+
+    // need to identify time between others order fields. Time needs to be 
first sort element
+    val timeOrderField = SortUtil.getFirstSortField(orderKeys, 
inputSchema.relDataType)
+
+    if (!FlinkTypeFactory.isTimeIndicatorType(timeOrderField.getType)) {
+      throw new TableException(
+        "You must specify either rowtime or proctime for order by as the first 
one.")
+    }
+
+    // time ordering needs to be ascending
+    if (SortUtil.getFirstSortDirection(orderKeys) != Direction.ASCENDING) {
+      throw new TableException("Primary sort order of a streaming table must 
be ascending on time.")
+    }
+
+    val rowComparator = if (orderKeys.getFieldCollations.size() > 1) {
+      Some(SortUtil
+        .createRowComparator(inputSchema.relDataType,
+          orderKeys.getFieldCollations.asScala.tail,
+          tableEnv.execEnv.getConfig))
+    } else {
+      None
+    }
+
+    timeOrderField.getType match {
+      case _ if 
FlinkTypeFactory.isRowtimeIndicatorType(timeOrderField.getType) =>
+        (crowInput.process(
+          new RowtimeProcessFunction(timeOrderField.getIndex, 
CRowTypeInfo(inputSchema.typeInfo))
+        ).setParallelism(crowInput.getParallelism),
+          rowComparator)
+      case _ =>
+        (crowInput, rowComparator)
+    }
+  }
+
+  private def applyPartitioning(partitionKeys: util.List[RexNode], inputDs: 
DataStream[Row]) = {
+    if (partitionKeys.size() > 0) {
+      val keys = partitionKeys.asScala.map {
+        case ref: RexInputRef => ref.getIndex
+      }.toArray
+      val keySelector = new RowKeySelector(keys, 
inputSchema.projectedTypeInfo(keys))
+      inputDs.keyBy(keySelector)
+    } else {
+      inputDs
+    }
+  }
+
+  override def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val config = tableEnv.config
+    val inputTypeInfo = inputSchema.typeInfo
+
+    val crowInput: DataStream[CRow] = getInput
+      .asInstanceOf[DataStreamRel]
+      .translateToPlan(tableEnv, queryConfig)
+
+    val orderKeys = logicalMatch.getOrderKeys.getFieldCollations
+    val (timestampedInput, rowComparator) = translateOrder(tableEnv,
+      crowInput,
+      logicalMatch.getOrderKeys)
+
+    val cepPattern = logicalMatch.getPattern
+      .accept(new PatternVisitor(config, inputTypeInfo, logicalMatch))
+
+    //TODO remove this once it is supported in CEP library
+    if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+      throw new TableException(
+        "Patterns that can produce empty matches are not supported. There must 
be at least one " +
+          "non-optional state.")
+    }
+
+    //TODO remove this once it is supported in CEP library
+    if (cepPattern.getQuantifier.hasProperty(QuantifierProperty.GREEDY)) {
+      throw new TableException(
+        "Greedy quantifiers are not allowed as the last element of a Pattern 
yet. Finish your " +
+          "pattern with either a simple variable or reluctant quantifier.")
+    }
+
+    if (logicalMatch.getInterval != null) {
+      throw new TableException(
+        "WITHIN clause is not part of the SQL Standard, thus it is not 
supported.")
+    }
+
+    val inputDS: DataStream[Row] = timestampedInput
+      .map(new ConvertToRow)
+      .setParallelism(timestampedInput.getParallelism)
+      .name("ConvertToRow")
+      .returns(inputTypeInfo)
+
+    val partitionKeys = logicalMatch.getPartitionKeys
+    val partitionedStream = applyPartitioning(partitionKeys, inputDS)
+
+    val patternStream: PatternStream[Row] = if (rowComparator.isDefined) {
+      CEP.pattern[Row](partitionedStream, cepPattern, new 
EventRowComparator(rowComparator.get))
+    } else {
+      CEP.pattern[Row](partitionedStream, cepPattern)
+    }
+
+    val measures = logicalMatch.getMeasures
+    val outTypeInfo = CRowTypeInfo(schema.typeInfo)
+    if (logicalMatch.isAllRows) {
+      throw new TableException("All rows per match mode is not supported yet.")
+    } else {
+      val patternSelectFunction =
+        MatchUtil.generateOneRowPerMatchExpression(
+          config,
+          schema,
+          partitionKeys,
+          orderKeys,
+          measures,
+          inputTypeInfo)
+      patternStream.flatSelect[CRow](patternSelectFunction, outTypeInfo)
+    }
+  }
+}
+
+private[flink] class PatternVisitor(
+  config: TableConfig,
+  inputTypeInfo: TypeInformation[Row],
+  logicalMatch: FlinkLogicalMatch
+) extends RexDefaultVisitor[Pattern[Row, Row]] {
+
+  private var pattern: Pattern[Row, Row] = _
+  private val names = new util.HashSet[String]()
+
+  private def translateSkipStrategy = {
+    val getPatternTarget = () => 
logicalMatch.getAfter.asInstanceOf[RexCall].getOperands.get(0)
+      .asInstanceOf[RexLiteral].getValueAs(classOf[String])
+
+    logicalMatch.getAfter.getKind match {
+      case SqlKind.LITERAL =>
+        
logicalMatch.getAfter.asInstanceOf[RexLiteral].getValueAs(classOf[AfterOption]) 
match {
+          case AfterOption.SKIP_PAST_LAST_ROW => 
AfterMatchSkipStrategy.skipPastLastEvent()
+          case AfterOption.SKIP_TO_NEXT_ROW => 
AfterMatchSkipStrategy.skipToNext()
+        }
+      case SqlKind.SKIP_TO_FIRST =>
+        
AfterMatchSkipStrategy.skipToFirst(getPatternTarget()).throwExceptionOnMiss()
+      case SqlKind.SKIP_TO_LAST =>
+        
AfterMatchSkipStrategy.skipToLast(getPatternTarget()).throwExceptionOnMiss()
+    }
+  }
+
+  private def translateSingleVariable(
+    previousPattern: Option[Pattern[Row, Row]],
+    patternName: String
+  ): Pattern[Row, Row] = {
+    if (names.contains(patternName)) {
+      throw new TableException("Pattern variables must be unique. That might 
change in the future.")
+    } else {
+      names.add(patternName)
+    }
+
+    previousPattern match {
+      case Some(p) => p.next(patternName)
+      case None =>
+        Pattern.begin(patternName, translateSkipStrategy)
+    }
+  }
+
+  private def applyQuantifier(
+    pattern: Pattern[Row, Row],
+    startNum: Int,
+    endNum: Int,
+    greedy: Boolean
+  ): Pattern[Row, Row] = {
+
+    val isOptional = (startNum == 0) && endNum == 1
+
+    val newPattern = if (startNum == 0 && endNum == -1) { // zero or more
+      pattern.oneOrMore().optional().consecutive()
+    } else if (startNum == 1 && endNum == -1) { // one or more
+      pattern.oneOrMore().consecutive()
+    } else if (isOptional) { // optional
+      pattern.optional()
+    } else if (endNum != -1) { // times
+      pattern.times(startNum, endNum).consecutive()
+    } else { // times or more
+      pattern.timesOrMore(startNum).consecutive()
+    }
+
+    if (greedy && isOptional) {
+      newPattern
+    } else if (greedy) {
+      newPattern.greedy()
+    } else if (isOptional) {
+      throw new TableException("Reluctant optional variables are not supported 
yet.")
+    } else {
+      newPattern
+    }
+  }
+
+  override def visitLiteral(literal: RexLiteral): Pattern[Row, Row] = {
+    val patternName = literal.getValue3.toString
+    pattern = translateSingleVariable(Option.apply(pattern), patternName)
+
+    val patternDefinition = logicalMatch.getPatternDefinitions.get(patternName)
+    if (patternDefinition != null) {
+      val condition = MatchUtil.generateIterativeCondition(
+        config,
+        patternName,
+        patternDefinition,
+        inputTypeInfo)
+
+      pattern.where(condition)
+    } else {
+      pattern.where(BooleanConditions.trueFunction())
+    }
+  }
+
+  override def visitCall(call: RexCall): Pattern[Row, Row] = {
+    call.getOperator match {
+      case PATTERN_CONCAT =>
+        val left = call.operands.get(0)
+        val right = call.operands.get(1)
+
+        pattern = left.accept(this)
+        pattern = right.accept(this)
+        pattern
+
+      case PATTERN_QUANTIFIER =>
+        val name = call.operands.get(0) match {
+          case c: RexLiteral => c
+          case _ => throw new TableException("Group patterns are not supported 
yet.")
+        }
+        pattern = name.accept(this)
+        val startNum = 
MathUtils.checkedDownCast(call.operands.get(1).asInstanceOf[RexLiteral]
+          .getValueAs(classOf[_root_.java.lang.Long]))
+        val endNum = 
MathUtils.checkedDownCast(call.operands.get(2).asInstanceOf[RexLiteral]
+          .getValueAs(classOf[_root_.java.lang.Long]))
+        val isGreedy = !call.operands.get(3).asInstanceOf[RexLiteral]
+          .getValueAs(classOf[_root_.java.lang.Boolean])
+
+        applyQuantifier(pattern, startNum, endNum, isGreedy)
+
+      case PATTERN_ALTER =>
+        throw TableException("Currently, CEP doesn't support branching 
patterns.")
+
+      case PATTERN_PERMUTE =>
+        throw TableException("Currently, CEP doesn't support PERMUTE 
patterns.")
+
+      case PATTERN_EXCLUDE =>
+        throw TableException("Currently, CEP doesn't support '{-' '-}' 
patterns.")
+    }
+  }
+
+  override def visitNode(rexNode: RexNode): Pattern[Row, Row] = throw new 
TableException(
+    s"Unsupported expression within Pattern: [$rexNode]")
+}
+
+/**
+  * Wrapper for Row TypeComparator to a Java Comparator object
+  */
+class EventRowComparator(
 
 Review comment:
   I've moved it to the `org.apache.flink.table.runtime.`match`` specific 
package as it uses CEP specific interface.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support the basic functionality of MATCH_RECOGNIZE
> --------------------------------------------------
>
>                 Key: FLINK-7062
>                 URL: https://issues.apache.org/jira/browse/FLINK-7062
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} 
> in Flink SQL API which includes the support of syntax {{MEASURES}}, 
> {{PATTERN}} and {{DEFINE}}. This would allow users write basic cep use cases 
> with SQL like the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
>   MEASURES
>     A.id AS aid,
>     B.id AS bid,
>     C.id AS cid
>   PATTERN (A B C)
>   DEFINE
>     A AS A.name = 'a',
>     B AS B.name = 'b',
>     C AS C.name = 'c'
> ) AS T
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to