asfgit closed pull request #7187: [Flink-7603][cep, table] Support for WITHIN clause in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 90e20065726..55c144a8ed2 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -211,6 +211,7 @@ matchRecognize:
             | SKIP TO variable )
       ]
       PATTERN '(' pattern ')'
+      [ WITHIN intervalLiteral ]
       DEFINE variable AS condition [, variable AS condition ]*
       ')'
 
diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md
index b12cbe5e0d9..465aa6bc83d 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -93,7 +93,7 @@ Every `MATCH_RECOGNIZE` query consists of the following clauses:
 * [MEASURES](#define--measures) - defines output of the clause; similar to a `SELECT` clause.
 * [ONE ROW PER MATCH](#output-mode) - output mode which defines how many rows per match should be produced.
 * [AFTER MATCH SKIP](#after-match-strategy) - specifies where the next match should start; this is also a way to control how many distinct matches a single event can belong to.
-* [PATTERN](#defining-pattern) - allows constructing patterns that will be searched for using a _regular expression_-like syntax.
+* [PATTERN](#defining-a-pattern) - allows constructing patterns that will be searched for using a _regular expression_-like syntax.
 * [DEFINE](#define--measures) - this section defines the conditions that the pattern variables must satisfy.
 
 <span class="label label-danger">Attention</span> Currently, the `MATCH_RECOGNIZE` clause can only be applied to an [append table](dynamic_tables.html#update-and-append-queries). Furthermore, it always produces
@@ -206,7 +206,7 @@ The `DEFINE` and `MEASURES` keywords have similar meanings to the `WHERE` and `S
 The `MEASURES` clause defines what will be included in the output of a matching pattern. It can project columns and define expressions for evaluation.
 The number of produced rows depends on the [output mode](#output-mode) setting.
 
-The `DEFINE` clause specifies conditions that rows have to fulfill in order to be classified to a corresponding [pattern variable](#defining-pattern).
+The `DEFINE` clause specifies conditions that rows have to fulfill in order to be classified to a corresponding [pattern variable](#defining-a-pattern).
 If a condition is not defined for a pattern variable, a default condition will be used which evaluates to `true` for every row.
 
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
@@ -311,6 +311,65 @@ DEFINE
 
 <span class="label label-danger">Attention</span> The optional reluctant quantifier (`A??` or `A{0,1}?`) is not supported right now.
 
+### Time constraint
+
+It is possible to specify a pattern that has to finish within a given period of time. If the time between the first and last event of a potential match is longer than
+the given value, the match is not emitted downstream.
+
+The use of the `WITHIN` clause can be illustrated with a query that looks for a price drop of more than 10 that happens within 1 hour. This task can be expressed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+    MATCH_RECOGNIZE(
+        PARTITION BY symbol
+        ORDER BY rowtime
+        MEASURES
+            C.rowtime AS dropTime,
+            A.price - C.price AS dropValue
+        PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
+        ONE ROW PER MATCH
+        AFTER MATCH SKIP PAST LAST ROW
+        DEFINE
+            B AS B.price > A.price - 10,
+            C AS C.price < A.price - 10
+    )
+{% endhighlight %}
+
+If that query is used to analyze the following ticker data:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  ======= =======
+'ACME'  '01-Apr-11 10:00:00'   20      1
+'ACME'  '01-Apr-11 10:20:00'   17      2
+'ACME'  '01-Apr-11 10:40:00'   18      1
+'ACME'  '01-Apr-11 11:00:00'   11      3
+'ACME'  '01-Apr-11 11:20:00'   14      2
+'ACME'  '01-Apr-11 11:40:00'   9       1
+'ACME'  '01-Apr-11 12:00:00'   15      1
+'ACME'  '01-Apr-11 12:20:00'   14      2
+'ACME'  '01-Apr-11 12:40:00'   24      2
+'ACME'  '01-Apr-11 13:00:00'   1       2
+'ACME'  '01-Apr-11 13:20:00'   19      1
+{% endhighlight %}
+
+It will produce the following result:
+
+{% highlight text %}
+symbol         dropTime         dropValue
+======  ====================  =============
+'ACME'  '01-Apr-11 13:00:00'      14
+{% endhighlight %}
+
+Notice that although the price dropped by 11 between `01-Apr-11 10:00:00` and `01-Apr-11 11:40:00`, the time difference between those
+two events is larger than 1 hour, so they do not form a match.
+
+<span class="label label-info">Note</span> It is generally encouraged to use the `WITHIN` clause because it also helps with memory management: the underlying state of timed-out partial matches will be
+pruned once the threshold is reached.
+
+<span class="label label-danger">Attention</span> The `WITHIN` clause is not part of the SQL standard. The recommended way of dealing with time constraints might change in the future.
+
 Output Mode
 -----------
 
@@ -781,8 +840,8 @@ One has to keep in mind that in case of the `SKIP TO FIRST/LAST variable`strateg
 variable (e.g. for pattern `A*`). In such cases, a runtime exception will be thrown as the standard requires a valid row to continue the
 matching.
 
-
-### Controlling Memory Consumption
+Controlling Memory Consumption
+------------------------------
 
 Memory consumption is an important consideration when writing `MATCH_RECOGNIZE` queries, as the space of potential matches is built in a breadth-first-like manner.
 Having that in mind, one must make sure that the pattern can finish. Preferably with a reasonable number of rows mapped to the match as they have to fit into memory.
@@ -815,8 +874,7 @@ DEFINE
   C as C.price > 20
 {% endhighlight %}
 
-<span class="label label-danger">Attention</span> Please note that the `MATCH_RECOGNIZE` clause does not use a configured [state retention time](query_configuration.html#idle-state-retention-time). As of now, there is also no possibility to define a time restriction on the pattern to finish because there is no such possibility in the SQL standard. The community is in the process of designing a proper syntax for that
-feature right now.
+<span class="label label-danger">Attention</span> Please note that the `MATCH_RECOGNIZE` clause does not use a configured [state retention time](query_configuration.html#idle-state-retention-time). One may want to use the `WITHIN` [clause](#time-constraint) instead to limit how long partial matches are kept in state.
 
 Known Limitations
 -----------------
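
The pruning mentioned in the `WITHIN` note can be pictured with a small, hypothetical model in plain Scala. This is not Flink's CEP implementation: `PartialMatch` and `prune` are illustrative names, and keeping a match that lasts exactly the window length is an assumption of this sketch. With the ticker example above, a partial match that started at 10:00 is dropped once the 11:40 event arrives, because 100 minutes exceed the 1 hour bound.

```scala
object WithinPruningSketch {
  // Hypothetical partial match, identified only by the timestamp (in ms)
  // of its first event; real CEP state also keeps the matched rows.
  final case class PartialMatch(startTimestamp: Long)

  // Keeps only the partial matches that can still finish within
  // `windowMillis` after an event with timestamp `now` has been seen.
  // Assumption: a span of exactly `windowMillis` is still considered valid.
  def prune(partials: List[PartialMatch], now: Long, windowMillis: Long): List[PartialMatch] =
    partials.filter(p => now - p.startTimestamp <= windowMillis)
}
```

Because every discarded partial match can never be completed, dropping it frees state without changing the set of emitted matches.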
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
index 19b298cbf98..d73fe2dbfff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.`type`.{SqlTypeFamily, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -39,6 +40,7 @@ import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
 import org.apache.flink.cep.pattern.conditions.BooleanConditions
 import org.apache.flink.cep.{CEP, PatternStream}
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.windowing.time.Time
 
 import scala.collection.JavaConverters._
 import org.apache.flink.table.api._
@@ -95,6 +97,32 @@ class DataStreamMatch(
     explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString)
   }
 
+  private def translateTimeBound(interval: RexNode): Time = {
+    interval match {
+      case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
+        Time.milliseconds(x.getValueAs(classOf[JLong]))
+      case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_YEAR_MONTH =>
+        throw new ValidationException("Time restrictions can only be used with DAY_TIME " +
+          "resolution.")
+      case _ => throw new TableException("Could not translate time bound.")
+    }
+  }
+
+  @VisibleForTesting
+  private[flink] def translatePattern(
+    config: TableConfig,
+    inputTypeInfo: TypeInformation[Row]
+  ): (Pattern[Row, Row], Iterable[String]) = {
+    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
+    val cepPattern = if (logicalMatch.interval != null) {
+      val interval = translateTimeBound(logicalMatch.interval)
+      logicalMatch.pattern.accept(patternVisitor).within(interval)
+    } else {
+      logicalMatch.pattern.accept(patternVisitor)
+    }
+    (cepPattern, patternVisitor.names)
+  }
+
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig)
@@ -120,9 +148,7 @@ class DataStreamMatch(
       crowInput,
       logicalMatch.orderKeys)
 
-    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
-    val cepPattern = logicalMatch.pattern.accept(patternVisitor)
-    val patternNames = patternVisitor.names
+    val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo)
 
     //TODO remove this once it is supported in CEP library
     if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
@@ -138,11 +164,6 @@ class DataStreamMatch(
           "pattern with either a simple variable or reluctant quantifier.")
     }
 
-    if (logicalMatch.interval != null) {
-      throw new TableException(
-        "WITHIN clause is not part of the SQL Standard, thus it is not supported.")
-    }
-
     val inputDS: DataStream[Row] = timestampedInput
       .map(new CRowToRowMapFunction)
       .setParallelism(timestampedInput.getParallelism)
@@ -232,12 +253,9 @@ class DataStreamMatch(
       inputDs
     }
   }
-
-  @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
 }
 
-@VisibleForTesting
-private[flink] class PatternVisitor(
+private class PatternVisitor(
     config: TableConfig,
     inputTypeInfo: TypeInformation[Row],
     logicalMatch: MatchRecognize)
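
The decision made by `translateTimeBound` can be reproduced without Calcite or Flink on the classpath. The sketch below is a hypothetical model: `IntervalLiteral`, `DayTime`, `YearMonth` and `dayToSecond` are illustrative names, and it assumes (as the `PatternTranslatorTest` expectations suggest) that a DAY-TIME interval literal carries its value in milliseconds while a YEAR-MONTH one carries a month count. Only the former has a fixed length, which is why it alone can bound a pattern.

```scala
object TimeBoundSketch {
  // Hypothetical stand-ins for the two interval families of a literal.
  sealed trait IntervalLiteral
  final case class DayTime(millis: Long) extends IntervalLiteral
  final case class YearMonth(months: Long) extends IntervalLiteral

  // Mirrors the branching of translateTimeBound: a DAY-TIME interval becomes
  // a window length in milliseconds, a YEAR-MONTH interval is rejected
  // because months and years have no fixed duration.
  def translateTimeBound(interval: IntervalLiteral): Either[String, Long] =
    interval match {
      case DayTime(millis) => Right(millis)
      case YearMonth(_) =>
        Left("Time restrictions can only be used with DAY_TIME resolution.")
    }

  // Millisecond value of a DAY TO SECOND literal,
  // e.g. INTERVAL '10 00:00:00.004' DAY TO SECOND.
  def dayToSecond(days: Long, hours: Long, minutes: Long, seconds: Long, millis: Long): Long =
    (((days * 24 + hours) * 60 + minutes) * 60 + seconds) * 1000 + millis
}
```

For instance, `dayToSecond(10, 0, 0, 0, 4)` yields `864000004`, the same value as the `10 * 24 * 60 * 60 * 1000 + 4` expected in `testWithinClause`.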
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 62eecbb0532..4611018d6ed 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -68,7 +68,8 @@ class DataStreamLogicalWindowAggregateRule
 
     def getOperandAsLong(call: RexCall, idx: Int): Long =
       call.getOperands.get(idx) match {
-        case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
+        case v: RexLiteral =>
+          v.getValue.asInstanceOf[JBigDecimal].longValue()
         case _ => throw new TableException("Only constant window descriptors are supported.")
       }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
index 9bf651fc0b8..4ac77fbd36c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
@@ -20,9 +20,11 @@ package org.apache.flink.table.`match`
 
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
 import org.apache.flink.cep.pattern.Pattern
-import org.apache.flink.table.api.TableException
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.junit.Test
 
+
 class PatternTranslatorTest extends PatternTranslatorTestBase {
   @Test
   def simplePattern(): Unit = {
@@ -212,6 +214,63 @@ class PatternTranslatorTest extends PatternTranslatorTestBase {
       Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
   }
 
+  @Test
+  def testWithinClause(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4)))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000)))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10' MINUTE
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 60 * 1000)))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWithinClauseWithYearMonthResolution(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      null /* don't care */)
+  }
+
   @Test(expected = classOf[TableException])
   def testReluctantOptionalNotSupported(): Unit = {
     verifyPattern(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
index 883ce0adf81..34b528c4a55 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableConfig, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan}
 import org.apache.flink.types.Row
 import org.apache.flink.util.TestLogger
 import org.junit.Assert._
@@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{
       fail("Expression is converted into more than a Match operation. Use a different test method.")
     }
 
-    val dataMatch = optimized
-      .asInstanceOf[DataStreamMatch]
-
-    val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch)
-    val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
+    val dataMatch = optimized.asInstanceOf[DataStreamMatch]
+    val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1
 
     compare(expected, p)
   }
@@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends TestLogger{
       val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
         currentRight.getAfterMatchSkipStrategy
 
+      val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight.getWindowTime != null) {
+        currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds
+      } else {
+        currentLeft.getWindowTime == null && currentRight.getWindowTime == null
+      }
+
       currentLeft = currentLeft.getPrevious
       currentRight = currentRight.getPrevious
 
-      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
+      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy || !sameTimeWindow) {
         throw new ComparisonFailure("Compiled different pattern.",
           expected.toString,
           actual.toString)
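
The `sameTimeWindow` check in the test base compares two window lengths that may each be absent (`null`). A null-safe way to express that comparison in Scala is to wrap both sides in `Option`, so that a missing window on either side can never be dereferenced. The sketch uses a hypothetical `WindowTime` stand-in for Flink's `Time`, keeping only the `toMilliseconds` value the comparison needs.

```scala
object WindowComparisonSketch {
  // Hypothetical stand-in for Flink's Time: only the length in ms matters here.
  final case class WindowTime(toMilliseconds: Long)

  // Two patterns have the same time window if both are unbounded (null)
  // or both are bounded by the same number of milliseconds.
  def sameTimeWindow(left: WindowTime, right: WindowTime): Boolean =
    Option(left).map(_.toMilliseconds) == Option(right).map(_.toMilliseconds)
}
```

The `Option` form is symmetric by construction, which makes it harder to guard only one side of the comparison by mistake.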


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
