[ 
https://issues.apache.org/jira/browse/FLINK-13658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903350#comment-16903350
 ] 

Sergei Winitzki commented on FLINK-13658:
-----------------------------------------

{code:java}
package org.apache.flink.streaming.api.windowing.triggers

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

object TriggerOf {

import org.apache.flink.streaming.api.windowing.triggers.TriggerResult._

/** Combine two [[TriggerResult]] values. This is a monoidal operation.
*
* @param r1 The first [[TriggerResult]] value.
* @param r2 The second [[TriggerResult]] value.
* @return A new [[TriggerResult]] value that combines the two values.
*/
def or(r1: TriggerResult, r2: TriggerResult): TriggerResult = (r1, r2) match {
case (CONTINUE, x) ⇒ x
case (x, CONTINUE) ⇒ x
case (_, FIRE_AND_PURGE) | (FIRE_AND_PURGE, _) ⇒ FIRE_AND_PURGE
case (PURGE, FIRE) | (FIRE, PURGE) ⇒ FIRE_AND_PURGE // This could also be 
defined as `FIRE` or as `PURGE` without violating the monoid associativity law.
case (PURGE, PURGE) ⇒ PURGE
case (FIRE, FIRE) ⇒ FIRE
}

// Syntax extension for the combining operation for [[TriggerResult]] values.
implicit class OrOps(val r1: TriggerResult) extends AnyVal {
def \/(r2: TriggerResult): TriggerResult = or(r1, r2)
}

}

/** Combine two triggers into one. The new trigger fires whenever one of the 
two previously defined triggers fire.
* Both triggers must have the same type of window and the same type of stream 
elements.
*
* The firing events (`CONTINUE`, `FIRE`, `PURGE`, `FIRE_AND_PURGE`) are 
combined according to the monoid operation `\/` defined above.
*
* @param t1 A first trigger.
* @param t2 A second trigger.
* @tparam T The type of stream elements.
* @tparam W The type of window.
*/
final case class TriggerOf[T, W <: Window](t1: Trigger[T, W], t2: Trigger[T, 
W]) extends Trigger[T, W] {

import TriggerOf._

override def onElement(element: T, timestamp: Long, window: W, ctx: 
Trigger.TriggerContext): TriggerResult = t1.onElement(element, timestamp, 
window, ctx) \/ t2.onElement(element, timestamp, window, ctx)

override def onProcessingTime(time: Long, window: W, ctx: 
Trigger.TriggerContext): TriggerResult = t1.onProcessingTime(time, window, ctx) 
\/ t2.onProcessingTime(time, window, ctx)

override def onEventTime(time: Long, window: W, ctx: Trigger.TriggerContext): 
TriggerResult = t1.onEventTime(time, window, ctx) \/ t2.onEventTime(time, 
window, ctx)

override def canMerge: Boolean = t1.canMerge && t2.canMerge

override def onMerge(window: W, ctx: Trigger.OnMergeContext): Unit = {
t1.onMerge(window, ctx)
t2.onMerge(window, ctx)
}

override def clear(window: W, ctx: Trigger.TriggerContext): Unit = {
t1.clear(window, ctx)
t2.clear(window, ctx)
}
}{code}

> Combine two triggers into one (for streaming windows)
> -----------------------------------------------------
>
>                 Key: FLINK-13658
>                 URL: https://issues.apache.org/jira/browse/FLINK-13658
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>            Reporter: Sergei Winitzki
>            Priority: Major
>
> Combine two `Trigger`s into one. This allows users to write in one line a 
> windowed stream whose windows are defined by max element count together with 
> a max time delay between windows.
>  
> Presently, Flink documentation and Stack Overflow discussions tell users to 
> implement such triggers manually as custom triggers. However, the 
> `TriggerResult` enumeration type can be defined as a monoid, and so two 
> results can be naturally combined into one. This allows users to combine two 
> or more triggers automatically.
>  
> This implementation is a Scala-only prototype. I am new to Flink and may not 
> be able to contribute a fully compliant PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to