[
https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676708#comment-15676708
]
Jark Wu commented on FLINK-4679:
--------------------------------
Hi [~fhueske] [~twalthr], if I understand correctly, a row-window evaluates
its aggregates every time a row enters the window. I think this is very
similar to window early firing, which is controlled by a Trigger. Could we
implement a specific Trigger that fires on every element, so that no custom
stream operator is needed? Have I missed anything?
The row-count row-window trigger could look like this:
{code}
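import java.lang.{Long => JLong}

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.Window

class RowWindowCountTrigger[W <: Window](maxCount: Long) extends Trigger[Any, W] {

  // Per-key, per-window counter of the rows seen so far.
  val stateDesc = new ReducingStateDescriptor[JLong]("count", Sum, LongSerializer.INSTANCE)

  override def onElement(
      element: Any, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
    count.add(1L)
    if (count.get >= maxCount) {
      // The window is full: emit the result and purge the window contents.
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      // Early fire: emit the current aggregate for every incoming row.
      TriggerResult.FIRE
    }
  }

  // Time plays no role in a count-based row-window.
  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: W, ctx: TriggerContext): Unit =
    ctx.getPartitionedState(stateDesc).clear()

  @SerialVersionUID(1L)
  object Sum extends ReduceFunction[JLong] {
    @throws[Exception]
    def reduce(value1: JLong, value2: JLong): JLong = value1 + value2
  }
}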
{code}
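To make the intended firing behaviour concrete, here is a minimal plain-Scala
sketch that simulates the trigger on a single key without any Flink
dependency. The names FiringSim and simulate are hypothetical and exist only
for this illustration, and a running sum stands in for whatever aggregate the
row-window would actually compute.

```scala
// Simulation of the proposed trigger semantics; not Flink code.
// FiringSim, simulate, Fire and FireAndPurge are illustrative names only.
object FiringSim {
  sealed trait Result
  case object Fire extends Result
  case object FireAndPurge extends Result

  /** Feeds rows into one count-based row-window and returns, for every row,
    * the aggregate that would be emitted together with the trigger result. */
  def simulate(rows: Seq[Long], maxCount: Long): List[(Long, Result)] = {
    var count = 0L // mirrors the trigger's "count" state
    var sum = 0L   // stands in for the window contents (running sum)
    rows.iterator.map { row =>
      count += 1
      sum += row
      if (count >= maxCount) {
        // Window is full: emit, then reset both the counter and the contents.
        val out = (sum, FireAndPurge: Result)
        count = 0L
        sum = 0L
        out
      } else {
        // Early fire: emit the running aggregate and keep the contents.
        (sum, Fire: Result)
      }
    }.toList
  }
}
```

With maxCount = 3 and the rows 1, 2, 3, 4, 5, the simulation emits 1, 3 and 6
for the first window (the last one with a purge) and then 4 and 9 for the
next window, i.e. one result per incoming row.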
> Add TumbleRow row-windows for streaming tables
> ----------------------------------------------
>
> Key: FLINK-4679
> URL: https://issues.apache.org/jira/browse/FLINK-4679
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> This task requires implementing a custom stream operator and integrating it
> with checkpointing and timestamp / watermark logic.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)