[ https://issues.apache.org/jira/browse/GEARPUMP-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046692#comment-16046692 ]
ASF GitHub Bot commented on GEARPUMP-316: ----------------------------------------- Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/186#discussion_r121446246 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala --- @@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl import java.time.Instant -import akka.actor.ActorSystem import com.gs.collections.api.block.predicate.Predicate -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import com.gs.collections.api.block.procedure.{Procedure, Procedure2} +import com.gs.collections.api.block.procedure.Procedure import com.gs.collections.impl.list.mutable.FastList -import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap -import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.window.api.Discarding -import org.apache.gearpump.streaming.task.TaskContext -import org.apache.gearpump.util.LogUtil -import org.slf4j.Logger +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context +import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows} +import scala.collection.mutable.ArrayBuffer -trait WindowRunner { +trait WindowRunner[IN, OUT] extends java.io.Serializable { - def process(message: Message): Unit + def process(in: IN, time: Instant): Unit - def trigger(time: Instant): Unit + def trigger(time: Instant): TraversableOnce[(OUT, Instant)] } -object DefaultWindowRunner { +case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], + right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { - private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]]) + def process(in: IN, time: Instant): Unit = { + left.process(in, time) + } + + def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = { + left.trigger(time).foreach(result => right.process(result._1, result._2)) + right.trigger(time) + } } -class DefaultWindowRunner[IN, GROUP, OUT]( - taskContext: TaskContext, userConfig: UserConfig, - groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem) - extends WindowRunner { - - private val windowFn = groupBy.window.windowFn - private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]] - private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]] - private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean] - - override def process(message: Message): Unit = { - val input = message.value.asInstanceOf[IN] - val (group, windows) = groupBy.groupBy(message) - if (!groupedWindowInputs.containsKey(group)) { - groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]()) - } - val windowInputs = groupedWindowInputs.get(group) - windows.foreach { win => +class DefaultWindowRunner[IN, OUT]( + windows: Windows, + fnRunner: FunctionRunner[IN, OUT]) + extends WindowRunner[IN, OUT] { + + private val windowFn = windows.windowFn + private val windowInputs = new TreeSortedMap[Window, FastList[(IN, Instant)]] + private var setup = false + + override def process(in: IN, time: Instant): Unit = { + val wins = windowFn(new Context[IN] { + override def element: IN = in + + override def timestamp: Instant = time + }) + wins.foreach { win => if (windowFn.isNonMerging) { if (!windowInputs.containsKey(win)) { - val inputs = new FastList[IN](1) + val inputs = new FastList[(IN, Instant)] windowInputs.put(win, inputs) } - windowInputs.get(win).add(input) + windowInputs.get(win).add(in -> time) } else { - merge(windowInputs, win, input) + merge(windowInputs, win, in, time) } } - if (!groupedFnRunners.containsKey(group)) { - val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - groupedFnRunners.put(group, runner) - groupedRunnerSetups.put(group, false) - } - - def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = { - val intersected = windowInputs.keySet.select(new Predicate[Window] { + def merge( + winIns: TreeSortedMap[Window, FastList[(IN, Instant)]], + win: Window, in: IN, time: Instant): Unit = { + val intersected = winIns.keySet.select(new Predicate[Window] { override def accept(each: Window): Boolean = { win.intersects(each) } }) var mergedWin = win - val mergedInputs = FastList.newListWith(input) + val mergedInputs = FastList.newListWith(in -> time) intersected.forEach(new Procedure[Window] { override def value(each: Window): Unit = { mergedWin = mergedWin.span(each) - mergedInputs.addAll(windowInputs.remove(each)) + mergedInputs.addAll(winIns.remove(each)) } }) - windowInputs.put(mergedWin, mergedInputs) + winIns.put(mergedWin, mergedInputs) } - } - override def trigger(time: Instant): Unit = { - groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] { - override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = { - onTrigger(group, windowInputs) - } - }) - + override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = { @annotation.tailrec - def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = { + def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]): TraversableOnce[(OUT, Instant)] = { if (windowInputs.notEmpty()) { val firstWin = windowInputs.firstKey if (!time.isBefore(firstWin.endTime)) { val inputs = windowInputs.remove(firstWin) - if (groupedFnRunners.containsKey(group)) { - val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group), - (output: OUT) => { - taskContext.output(Message(output, time)) - }) - val setup = groupedRunnerSetups.get(group) - if (!setup) { - runner.setup() - groupedRunnerSetups.put(group, true) - } - inputs.forEach(new Procedure[IN] { - override def value(t: IN): Unit = { - // .toList forces eager evaluation - runner.process(t).toList + if (!setup) { + fnRunner.setup() + setup = true + } + inputs.forEach(new Procedure[(IN, Instant)] { + override def value(v: (IN, Instant)): Unit = { + fnRunner.process(v._1).foreach { + out: OUT => outputs += (out -> v._2) } - }) - // .toList forces eager evaluation - runner.finish().toList - if (groupBy.window.accumulationMode == Discarding) { - runner.teardown() - groupedRunnerSetups.put(group, false) - // dicarding, setup need to be called for each window - onTrigger(group, windowInputs) - } else { - // accumulating, setup is only called for the first window - onTrigger(group, windowInputs) } + }) + fnRunner.finish().foreach { + out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1)) --- End diff -- I'm wondering, does the message time order in `outputs` matter? If so, is it guaranteed? > Don't enforce groupBy after window > ---------------------------------- > > Key: GEARPUMP-316 > URL: https://issues.apache.org/jira/browse/GEARPUMP-316 > Project: Apache Gearpump > Issue Type: Sub-task > Components: streaming > Reporter: Manu Zhang > Assignee: Manu Zhang > > Return a normal Stream instead of WindowStream on window function. Window > function defines a boundary (window) for elements and the following > operations should fall in corresponding boundaries. The boundary should not > change until a new window function is defined. The default boundary is the > {{GlobalWindows}} if not defined. > This means there will be a window context for each underlying task. Elements > are emitted in terms of the trigger semantics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)