[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655158#comment-16655158
 ] 

ASF GitHub Bot commented on FLINK-7062:
---------------------------------------

dawidwys commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r226282704
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
 ##########
 @@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.compiler.NFACompiler
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
+import org.apache.flink.cep.pattern.conditions.BooleanConditions
+import org.apache.flink.cep.{CEP, EventComparator, PatternStream}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.MatchRecognize
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.`match`._
+import org.apache.flink.table.runtime.aggregate.SortUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{RowKeySelector, RowtimeProcessFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.MathUtils
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with LogicalMatch.
+  *
+  * Translates a MATCH_RECOGNIZE clause into a CEP [[PatternStream]] that is
+  * applied on top of the translated input relation.
+  *
+  * @param cluster      cluster handed to the [[SingleRel]] super class
+  * @param traitSet     trait set handed to the [[SingleRel]] super class
+  * @param input        input relation of the match
+  * @param logicalMatch logical node providing the pattern, order keys,
+  *                     partition keys, measures and interval
+  * @param schema       schema of the rows produced by this node
+  * @param inputSchema  schema of the rows consumed by this node
+  */
+class DataStreamMatch(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  input: RelNode,
+  logicalMatch: FlinkLogicalMatch,
+  schema: RowSchema,
+  inputSchema: RowSchema)
+  extends SingleRel(cluster, traitSet, input)
+    with MatchRecognize
+    with DataStreamRel {
+
+  /** Returns the logical match node this physical node was created from. */
+  private[flink] def getLogicalMatch: FlinkLogicalMatch = logicalMatch
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  /** Creates a copy of this node on top of the first of the given inputs. */
+  override def copy(
+    traitSet: RelTraitSet,
+    inputs: util.List[RelNode]): RelNode = {
+    new DataStreamMatch(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      logicalMatch,
+      schema,
+      inputSchema)
+  }
+
+  override def toString: String = {
+    matchToString(logicalMatch, inputSchema, getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    explainMatch(
+      super.explainTerms(pw),
+      logicalMatch,
+      inputSchema,
+      getExpressionString)
+  }
+
+  /**
+    * Validates the ORDER BY clause and prepares the input accordingly.
+    *
+    * The first sort field must be a time indicator sorted in ascending
+    * order. Any remaining sort fields are turned into a row comparator.
+    *
+    * @param tableEnv  environment providing the execution config
+    * @param crowInput translated input stream
+    * @param orderKeys collation of the ORDER BY clause
+    * @return the input stream (passed through a [[RowtimeProcessFunction]]
+    *         when ordering on rowtime) and an optional comparator for the
+    *         secondary sort fields
+    * @throws TableException if the ORDER BY clause is empty, does not start
+    *                        with a time indicator, or is not ascending on
+    *                        time
+    */
+  private def translateOrder(
+    tableEnv: StreamTableEnvironment,
+    crowInput: DataStream[CRow],
+    orderKeys: RelCollation) = {
+
+    val fieldCollations = orderKeys.getFieldCollations
+
+    if (fieldCollations.isEmpty) {
+      throw new TableException(
+        "You must specify either rowtime or proctime for order by.")
+    }
+
+    // need to identify time between others order fields.
+    // Time needs to be first sort element
+    val timeOrderField =
+      SortUtil.getFirstSortField(orderKeys, inputSchema.relDataType)
+    val timeType = timeOrderField.getType
+
+    if (!FlinkTypeFactory.isTimeIndicatorType(timeType)) {
+      throw new TableException(
+        "You must specify either rowtime or proctime for order by " +
+          "as the first one.")
+    }
+
+    // time ordering needs to be ascending
+    if (SortUtil.getFirstSortDirection(orderKeys) != Direction.ASCENDING) {
+      throw new TableException(
+        "Primary sort order of a streaming table must be ascending " +
+          "on time.")
+    }
+
+    // only the fields after the leading time field need a comparator
+    val rowComparator = if (fieldCollations.size() > 1) {
+      Some(SortUtil.createRowComparator(
+        inputSchema.relDataType,
+        fieldCollations.asScala.tail,
+        tableEnv.execEnv.getConfig))
+    } else {
+      None
+    }
+
+    val timestampedInput: DataStream[CRow] =
+      if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+        crowInput
+          .process(new RowtimeProcessFunction(
+            timeOrderField.getIndex,
+            CRowTypeInfo(inputSchema.typeInfo)))
+          .setParallelism(crowInput.getParallelism)
+      } else {
+        crowInput
+      }
+
+    (timestampedInput, rowComparator)
+  }
+
+  /**
+    * Keys the stream by the PARTITION BY fields, if there are any.
+    *
+    * @param partitionKeys expressions of the PARTITION BY clause
+    * @param inputDs       stream to partition
+    * @return the keyed stream, or `inputDs` itself when there are no
+    *         partition keys
+    * @throws TableException if a partition key is not a plain input field
+    *                        reference
+    */
+  private def applyPartitioning(
+    partitionKeys: util.List[RexNode],
+    inputDs: DataStream[Row]): DataStream[Row] = {
+    if (partitionKeys.isEmpty) {
+      inputDs
+    } else {
+      val keys = partitionKeys.asScala.map {
+        case ref: RexInputRef => ref.getIndex
+        // fail with a descriptive error instead of a raw MatchError
+        case other =>
+          throw new TableException(
+            "Only field references are supported as partition keys of " +
+              s"MATCH_RECOGNIZE, but found: $other")
+      }.toArray
+      val keySelector =
+        new RowKeySelector(keys, inputSchema.projectedTypeInfo(keys))
+      inputDs.keyBy(keySelector)
+    }
+  }
+
+  /**
+    * Translates this node into a [[DataStream]] program.
+    *
+    * Translates the input, validates the ORDER BY clause, converts the
+    * pattern with a [[PatternVisitor]], applies the partitioning and emits
+    * one row per match through a generated select function.
+    *
+    * @param tableEnv    environment of the translation
+    * @param queryConfig configuration forwarded to the input translation
+    * @return stream of the rows produced for the matches
+    * @throws TableException if the query uses a feature that is not
+    *                        supported (see the individual checks below)
+    */
+  override def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val config = tableEnv.config
+    val inputTypeInfo = inputSchema.typeInfo
+
+    val crowInput: DataStream[CRow] = getInput
+      .asInstanceOf[DataStreamRel]
+      .translateToPlan(tableEnv, queryConfig)
+
+    val orderKeys = logicalMatch.getOrderKeys.getFieldCollations
+    val (timestampedInput, rowComparator) = translateOrder(
+      tableEnv,
+      crowInput,
+      logicalMatch.getOrderKeys)
+
+    val cepPattern = logicalMatch.getPattern
+      .accept(new PatternVisitor(config, inputTypeInfo, logicalMatch))
+
+    //TODO remove this once it is supported in CEP library
+    if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+      throw new TableException(
+        "Patterns that can produce empty matches are not supported. " +
+          "There must be at least one non-optional state.")
+    }
+
+    //TODO remove this once it is supported in CEP library
+    if (cepPattern.getQuantifier.hasProperty(QuantifierProperty.GREEDY)) {
+      throw new TableException(
+        "Greedy quantifiers are not allowed as the last element of a " +
+          "Pattern yet. Finish your pattern with either a simple variable " +
+          "or reluctant quantifier.")
+    }
+
+    if (logicalMatch.getInterval != null) {
+      throw new TableException(
+        "WITHIN clause is not part of the SQL Standard, thus it is not " +
+          "supported.")
+    }
+
+    val inputDS: DataStream[Row] = timestampedInput
+      .map(new ConvertToRow)
+      .setParallelism(timestampedInput.getParallelism)
+      .name("ConvertToRow")
+      .returns(inputTypeInfo)
+
+    val partitionKeys = logicalMatch.getPartitionKeys
+    val partitionedStream = applyPartitioning(partitionKeys, inputDS)
+
+    // secondary sort fields are handed to CEP as an event comparator
+    val patternStream: PatternStream[Row] = rowComparator match {
+      case Some(comparator) =>
+        CEP.pattern[Row](
+          partitionedStream,
+          cepPattern,
+          new EventRowComparator(comparator))
+      case None =>
+        CEP.pattern[Row](partitionedStream, cepPattern)
+    }
+
+    if (logicalMatch.isAllRows) {
+      throw new TableException(
+        "All rows per match mode is not supported yet.")
+    }
+
+    val measures = logicalMatch.getMeasures
+    val outTypeInfo = CRowTypeInfo(schema.typeInfo)
+    val patternSelectFunction =
+      MatchUtil.generateOneRowPerMatchExpression(
+        config,
+        schema,
+        partitionKeys,
+        orderKeys,
+        measures,
+        inputTypeInfo)
+    patternStream.flatSelect[CRow](patternSelectFunction, outTypeInfo)
+  }
+}
+
+private[flink] class PatternVisitor(
+  config: TableConfig,
+  inputTypeInfo: TypeInformation[Row],
+  logicalMatch: FlinkLogicalMatch
+) extends RexDefaultVisitor[Pattern[Row, Row]] {
+
+  private var pattern: Pattern[Row, Row] = _
 
 Review comment:
   In general I agree, I never check against null explicitly (I go through 
Option), but having Option as the field makes the code much more complex, as I 
would need to change all method return types in this class. I used null here 
intentionally to simplify it a little bit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support the basic functionality of MATCH_RECOGNIZE
> --------------------------------------------------
>
>                 Key: FLINK-7062
>                 URL: https://issues.apache.org/jira/browse/FLINK-7062
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} 
> in the Flink SQL API, which includes support for the {{MEASURES}}, 
> {{PATTERN}} and {{DEFINE}} clauses. This would allow users to write basic CEP 
> use cases with SQL, as in the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
>   MEASURES
>     A.id AS aid,
>     B.id AS bid,
>     C.id AS cid
>   PATTERN (A B C)
>   DEFINE
>     A AS A.name = 'a',
>     B AS B.name = 'b',
>     C AS C.name = 'c'
> ) AS T
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to