[
https://issues.apache.org/jira/browse/FLINK-13658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903350#comment-16903350
]
Sergei Winitzki commented on FLINK-13658:
-----------------------------------------
{code:java}
package org.apache.flink.streaming.api.windowing.triggers
import org.apache.flink.streaming.api.windowing.triggers.{Trigger,
TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window
object TriggerOf {
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult._
/** Combine two [[TriggerResult]] values. This is a monoidal operation.
*
* @param r1 The first [[TriggerResult]] value.
* @param r2 The second [[TriggerResult]] value.
* @return A new [[TriggerResult]] value that combines the two values.
*/
def or(r1: TriggerResult, r2: TriggerResult): TriggerResult = (r1, r2) match {
case (CONTINUE, x) ⇒ x
case (x, CONTINUE) ⇒ x
case (_, FIRE_AND_PURGE) | (FIRE_AND_PURGE, _) ⇒ FIRE_AND_PURGE
case (PURGE, FIRE) | (FIRE, PURGE) ⇒ FIRE_AND_PURGE // This could also be
defined as `FIRE` or as `PURGE` without violating the monoid associativity law.
case (PURGE, PURGE) ⇒ PURGE
case (FIRE, FIRE) ⇒ FIRE
}
// Syntax extension for the combining operation for [[TriggerResult]] values.
implicit class OrOps(val r1: TriggerResult) extends AnyVal {
def \/(r2: TriggerResult): TriggerResult = or(r1, r2)
}
}
/** Combine two triggers into one. The new trigger fires whenever one of the
two previously defined triggers fire.
* Both triggers must have the same type of window and the same type of stream
elements.
*
* The firing events (`CONTINUE`, `FIRE`, `PURGE`, `FIRE_AND_PURGE`) are
combined according to the monoid operation `\/` defined above.
*
* @param t1 A first trigger.
* @param t2 A second trigger.
* @tparam T The type of stream elements.
* @tparam W The type of window.
*/
final case class TriggerOf[T, W <: Window](t1: Trigger[T, W], t2: Trigger[T,
W]) extends Trigger[T, W] {
import TriggerOf._
override def onElement(element: T, timestamp: Long, window: W, ctx:
Trigger.TriggerContext): TriggerResult = t1.onElement(element, timestamp,
window, ctx) \/ t2.onElement(element, timestamp, window, ctx)
override def onProcessingTime(time: Long, window: W, ctx:
Trigger.TriggerContext): TriggerResult = t1.onProcessingTime(time, window, ctx)
\/ t2.onProcessingTime(time, window, ctx)
override def onEventTime(time: Long, window: W, ctx: Trigger.TriggerContext):
TriggerResult = t1.onEventTime(time, window, ctx) \/ t2.onEventTime(time,
window, ctx)
override def canMerge: Boolean = t1.canMerge && t2.canMerge
override def onMerge(window: W, ctx: Trigger.OnMergeContext): Unit = {
t1.onMerge(window, ctx)
t2.onMerge(window, ctx)
}
override def clear(window: W, ctx: Trigger.TriggerContext): Unit = {
t1.clear(window, ctx)
t2.clear(window, ctx)
}
}{code}
> Combine two triggers into one (for streaming windows)
> -----------------------------------------------------
>
> Key: FLINK-13658
> URL: https://issues.apache.org/jira/browse/FLINK-13658
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Reporter: Sergei Winitzki
> Priority: Major
>
> Combine two `Trigger`s into one. This allows users to write in one line a
> windowed stream whose windows are defined by max element count together with
> a max time delay between windows.
>
> Presently, Flink documentation and Stack Overflow discussions tell users to
> implement such triggers manually as custom triggers. However, the
> `TriggerResult` enumeration type can be defined as a monoid, and so two
> results can be naturally combined into one. This allows users to combine two
> or more triggers automatically.
>
> This implementation is a Scala-only prototype. I am new to Flink and may not
> be able to contribute a fully compliant PR.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)