asfgit closed pull request #7187: [Flink-7603][cep, table] Support for WITHIN
clause in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7187
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 90e20065726..55c144a8ed2 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -211,6 +211,7 @@ matchRecognize:
| SKIP TO variable )
]
PATTERN '(' pattern ')'
+ [ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
diff --git a/docs/dev/table/streaming/match_recognize.md
b/docs/dev/table/streaming/match_recognize.md
index b12cbe5e0d9..465aa6bc83d 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -93,7 +93,7 @@ Every `MATCH_RECOGNIZE` query consists of the following
clauses:
* [MEASURES](#define--measures) - defines output of the clause; similar to a
`SELECT` clause.
* [ONE ROW PER MATCH](#output-mode) - output mode which defines how many rows
per match should be produced.
* [AFTER MATCH SKIP](#after-match-strategy) - specifies where the next match
should start; this is also a way to control how many distinct matches a single
event can belong to.
-* [PATTERN](#defining-pattern) - allows constructing patterns that will be
searched for using a _regular expression_-like syntax.
+* [PATTERN](#defining-a-pattern) - allows constructing patterns that will be
searched for using a _regular expression_-like syntax.
* [DEFINE](#define--measures) - this section defines the conditions that the
pattern variables must satisfy.
<span class="label label-danger">Attention</span> Currently, the
`MATCH_RECOGNIZE` clause can only be applied to an [append
table](dynamic_tables.html#update-and-append-queries). Furthermore, it always
produces
@@ -206,7 +206,7 @@ The `DEFINE` and `MEASURES` keywords have similar meanings
to the `WHERE` and `S
The `MEASURES` clause defines what will be included in the output of a
matching pattern. It can project columns and define expressions for evaluation.
The number of produced rows depends on the [output mode](#output-mode) setting.
-The `DEFINE` clause specifies conditions that rows have to fulfill in order to
be classified to a corresponding [pattern variable](#defining-pattern).
+The `DEFINE` clause specifies conditions that rows have to fulfill in order to
be classified to a corresponding [pattern variable](#defining-a-pattern).
If a condition is not defined for a pattern variable, a default condition will
be used which evaluates to `true` for every row.
For a more detailed explanation about expressions that can be used in those
clauses, please have a look at the [event stream
navigation](#pattern-navigation) section.
@@ -311,6 +311,65 @@ DEFINE
<span class="label label-danger">Attention</span> The optional reluctant
quantifier (`A??` or `A{0,1}?`) is not supported right now.
+### Time constraint
+
+A pattern can be required to complete within a given period of
time. If the time between the first and last event of a potential match is longer than
+the given value, the match will not be emitted downstream.
+
+The `WITHIN` clause can be illustrated with a query that looks for
a price drop of more than 10 that happens within 1 hour. This task can be expressed
with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+ MATCH_RECOGNIZE(
+ PARTITION BY symbol
+ ORDER BY rowtime
+ MEASURES
+ C.rowtime AS dropTime,
+ A.price - C.price AS dropValue
+ PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
+ ONE ROW PER MATCH
+ AFTER MATCH SKIP PAST LAST ROW
+ DEFINE
+ B AS B.price > A.price - 10,
+ C AS C.price < A.price - 10
+ )
+{% endhighlight %}
+
+If that query is used to analyze the following ticker data:
+
+{% highlight text %}
+symbol rowtime price tax
+====== ==================== ======= =======
+'ACME' '01-Apr-11 10:00:00' 20 1
+'ACME' '01-Apr-11 10:20:00' 17 2
+'ACME' '01-Apr-11 10:40:00' 18 1
+'ACME' '01-Apr-11 11:00:00' 11 3
+'ACME' '01-Apr-11 11:20:00' 14 2
+'ACME' '01-Apr-11 11:40:00' 9 1
+'ACME' '01-Apr-11 12:00:00' 15 1
+'ACME' '01-Apr-11 12:20:00' 14 2
+'ACME' '01-Apr-11 12:40:00' 24 2
+'ACME' '01-Apr-11 13:00:00' 1 2
+'ACME' '01-Apr-11 13:20:00' 19 1
+{% endhighlight %}
+
+It will produce the following results:
+
+{% highlight text %}
+symbol dropTime dropValue
+====== ==================== =============
+'ACME' '01-Apr-11 13:00:00' 14
+{% endhighlight %}
+
+Notice that the price also dropped by 11 between, e.g., `01-Apr-11 10:00:00` and `01-Apr-11
11:40:00`, but the time difference between those
+two events is larger than 1 hour, so this is not a match.
+
+<span class="label label-info">Note</span> Using the `WITHIN` clause is
generally encouraged because it also helps with memory management: the underlying
state of a timed-out partial match is
+pruned once the threshold is reached.
+
+<span class="label label-danger">Attention</span> The `WITHIN` clause is not
part of the SQL standard. The recommended way of dealing with time constraints
might change in the future.
+
Output Mode
-----------
@@ -781,8 +840,8 @@ One has to keep in mind that in case of the `SKIP TO
FIRST/LAST variable`strateg
variable (e.g. for pattern `A*`). In such cases, a runtime exception will be
thrown as the standard requires a valid row to continue the
matching.
-
-### Controlling Memory Consumption
+Controlling Memory Consumption
+------------------------------
Memory consumption is an important consideration when writing
`MATCH_RECOGNIZE` queries, as the space of potential matches is built in a
breadth-first-like manner.
Having that in mind, one must make sure that the pattern can finish.
Preferably with a reasonable number of rows mapped to the match as they have to
fit into memory.
@@ -815,8 +874,7 @@ DEFINE
C as C.price > 20
{% endhighlight %}
-<span class="label label-danger">Attention</span> Please note that the
`MATCH_RECOGNIZE` clause does not use a configured [state retention
time](query_configuration.html#idle-state-retention-time). As of now, there is
also no possibility to define a time restriction on the pattern to finish
because there is no such possibility in the SQL standard. The community is in
the process of designing a proper syntax for that
-feature right now.
+<span class="label label-danger">Attention</span> Please note that the
`MATCH_RECOGNIZE` clause does not use a configured [state retention
time](query_configuration.html#idle-state-retention-time). One may want to use
the `WITHIN` [clause](#time-constraint) for this purpose.
Known Limitations
-----------------
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
index 19b298cbf98..d73fe2dbfff 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.`type`.{SqlTypeFamily, SqlTypeName}
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -39,6 +40,7 @@ import
org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
import org.apache.flink.cep.pattern.conditions.BooleanConditions
import org.apache.flink.cep.{CEP, PatternStream}
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.JavaConverters._
import org.apache.flink.table.api._
@@ -95,6 +97,32 @@ class DataStreamMatch(
explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames,
getExpressionString)
}
+ private def translateTimeBound(interval: RexNode): Time = {
+ interval match {
+ case x: RexLiteral if x.getTypeName.getFamily ==
SqlTypeFamily.INTERVAL_DAY_TIME =>
+ Time.milliseconds(x.getValueAs(classOf[JLong]))
+ case x: RexLiteral if x.getTypeName.getFamily ==
SqlTypeFamily.INTERVAL_YEAR_MONTH =>
+ throw new ValidationException("Time restrictions can only be used with
DAY_TIME " +
+ "resolution.")
+ case _ => throw new TableException("Could not translate time bound.")
+ }
+ }
+
+ @VisibleForTesting
+ private[flink] def translatePattern(
+ config: TableConfig,
+ inputTypeInfo: TypeInformation[Row]
+ ): (Pattern[Row, Row], Iterable[String]) = {
+ val patternVisitor = new PatternVisitor(config, inputTypeInfo,
logicalMatch)
+ val cepPattern = if (logicalMatch.interval != null) {
+ val interval = translateTimeBound(logicalMatch.interval)
+ logicalMatch.pattern.accept(patternVisitor).within(interval)
+ } else {
+ logicalMatch.pattern.accept(patternVisitor)
+ }
+ (cepPattern, patternVisitor.names)
+ }
+
override def translateToPlan(
tableEnv: StreamTableEnvironment,
queryConfig: StreamQueryConfig)
@@ -120,9 +148,7 @@ class DataStreamMatch(
crowInput,
logicalMatch.orderKeys)
- val patternVisitor = new PatternVisitor(config, inputTypeInfo,
logicalMatch)
- val cepPattern = logicalMatch.pattern.accept(patternVisitor)
- val patternNames = patternVisitor.names
+ val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo)
//TODO remove this once it is supported in CEP library
if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
@@ -138,11 +164,6 @@ class DataStreamMatch(
"pattern with either a simple variable or reluctant quantifier.")
}
- if (logicalMatch.interval != null) {
- throw new TableException(
- "WITHIN clause is not part of the SQL Standard, thus it is not
supported.")
- }
-
val inputDS: DataStream[Row] = timestampedInput
.map(new CRowToRowMapFunction)
.setParallelism(timestampedInput.getParallelism)
@@ -232,12 +253,9 @@ class DataStreamMatch(
inputDs
}
}
-
- @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
}
-@VisibleForTesting
-private[flink] class PatternVisitor(
+private class PatternVisitor(
config: TableConfig,
inputTypeInfo: TypeInformation[Row],
logicalMatch: MatchRecognize)
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 62eecbb0532..4611018d6ed 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -68,7 +68,8 @@ class DataStreamLogicalWindowAggregateRule
def getOperandAsLong(call: RexCall, idx: Int): Long =
call.getOperands.get(idx) match {
- case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
+ case v: RexLiteral =>
+ v.getValue.asInstanceOf[JBigDecimal].longValue()
case _ => throw new TableException("Only constant window descriptors
are supported.")
}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
index 9bf651fc0b8..4ac77fbd36c 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
@@ -20,9 +20,11 @@ package org.apache.flink.table.`match`
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
import org.apache.flink.cep.pattern.Pattern
-import org.apache.flink.table.api.TableException
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.junit.Test
+
class PatternTranslatorTest extends PatternTranslatorTestBase {
@Test
def simplePattern(): Unit = {
@@ -212,6 +214,63 @@ class PatternTranslatorTest extends
PatternTranslatorTestBase {
Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
}
+ @Test
+ def testWithinClause(): Unit = {
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ Pattern.begin("A", skipToNext()).next("B")
+ .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4)))
+
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ Pattern.begin("A", skipToNext()).next("B")
+ .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000)))
+
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '10' MINUTE
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ Pattern.begin("A", skipToNext()).next("B")
+ .within(Time.milliseconds(10 * 60 * 1000)))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWithinClauseWithYearMonthResolution(): Unit = {
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ null /* don't care */)
+ }
+
@Test(expected = classOf[TableException])
def testReluctantOptionalNotSupported(): Unit = {
verifyPattern(
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
index 883ce0adf81..34b528c4a55 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironm
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableConfig, TableEnvironment}
import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch,
DataStreamScan, PatternVisitor}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch,
DataStreamScan}
import org.apache.flink.types.Row
import org.apache.flink.util.TestLogger
import org.junit.Assert._
@@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{
fail("Expression is converted into more than a Match operation. Use a
different test method.")
}
- val dataMatch = optimized
- .asInstanceOf[DataStreamMatch]
-
- val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo,
dataMatch.getLogicalMatch)
- val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
+ val dataMatch = optimized.asInstanceOf[DataStreamMatch]
+ val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1
compare(expected, p)
}
@@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends
TestLogger{
val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
currentRight.getAfterMatchSkipStrategy
+ val sameTimeWindow = if (currentLeft.getWindowTime != null &&
currentRight.getWindowTime != null) {
+ currentLeft.getWindowTime.toMilliseconds ==
currentRight.getWindowTime.toMilliseconds
+ } else {
+ currentLeft.getWindowTime == null && currentRight.getWindowTime == null
+ }
+
currentLeft = currentLeft.getPrevious
currentRight = currentRight.getPrevious
- if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
+ if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy ||
!sameTimeWindow) {
throw new ComparisonFailure("Compiled different pattern.",
expected.toString,
actual.toString)
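
For illustration only, the following sketch mirrors the shape of `translateTimeBound` from the diff above without depending on Calcite or Flink. The types `IntervalLiteral`, `DayTimeInterval`, and `YearMonthInterval` and the object `TimeBoundSketch` are hypothetical stand-ins introduced here, not part of either library. It assumes a day-time interval can be represented as a `java.time.Duration`, and it returns an `Either` instead of throwing, which differs from the PR's `ValidationException`. A year-month interval has no fixed length in milliseconds, which is presumably why the PR rejects it.

```scala
import java.time.Duration

// Hypothetical stand-ins for the two interval families that the PR
// distinguishes via SqlTypeFamily; they are not Flink or Calcite types.
sealed trait IntervalLiteral
final case class DayTimeInterval(duration: Duration) extends IntervalLiteral
final case class YearMonthInterval(months: Int) extends IntervalLiteral

object TimeBoundSketch {
  // Day-time intervals become a millisecond bound; year-month intervals
  // are rejected with the same message the PR uses.
  def toMillis(interval: IntervalLiteral): Either[String, Long] = interval match {
    case DayTimeInterval(d) =>
      Right(d.toMillis)
    case YearMonthInterval(_) =>
      Left("Time restrictions can only be used with DAY_TIME resolution.")
  }

  def main(args: Array[String]): Unit = {
    // Corresponds to INTERVAL '10 00:00:00.004' DAY TO SECOND
    println(toMillis(DayTimeInterval(Duration.ofDays(10).plusMillis(4))))
    // Corresponds to INTERVAL '10' MINUTE
    println(toMillis(DayTimeInterval(Duration.ofMinutes(10))))
    // Corresponds to INTERVAL '2-10' YEAR TO MONTH (2 years and 10 months)
    println(toMillis(YearMonthInterval(2 * 12 + 10)))
  }
}
```

The first two cases correspond to the bounds asserted in `testWithinClause` (`10 * 24 * 60 * 60 * 1000 + 4` and `10 * 60 * 1000` milliseconds), and the third to the literal rejected in `testWithinClauseWithYearMonthResolution`.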