Repository: incubator-gearpump Updated Branches: refs/heads/master e1228a314 -> c1801595d
[GEARPUMP-339] Add ScalaDoc to window api and impl Author: manuzhang <[email protected]> Closes #213 from manuzhang/add_window_doc. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c1801595 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c1801595 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c1801595 Branch: refs/heads/master Commit: c1801595df62eff8bd8d1b3e51e94aa130fb686f Parents: e1228a3 Author: manuzhang <[email protected]> Authored: Mon Aug 14 12:07:06 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Aug 14 12:07:17 2017 +0800 ---------------------------------------------------------------------- .../dsl/window/api/AccumulationMode.scala | 10 +++++++ .../streaming/dsl/window/api/Trigger.scala | 9 ++++++ .../dsl/window/api/WindowFunction.scala | 21 ++++++++++++++ .../streaming/dsl/window/api/Windows.scala | 18 ++++++++---- .../dsl/window/impl/ReduceFnRunner.scala | 29 -------------------- .../streaming/dsl/window/impl/Window.scala | 2 +- .../dsl/window/impl/WindowRunner.scala | 25 ++++++++++++++++- 7 files changed, 78 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala index a4524a8..46b8e92 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala @@ -17,8 +17,18 @@ */ package 
org.apache.gearpump.streaming.dsl.window.api + +/** + * Determines relationship between multiple results for the same window. + */ sealed trait AccumulationMode +/** + * Window results are accumulated. + */ case object Accumulating extends AccumulationMode +/** + * Window results are independent. + */ case object Discarding extends AccumulationMode http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala index 02d52a0..b9a8695 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala @@ -17,7 +17,16 @@ */ package org.apache.gearpump.streaming.dsl.window.api +/** + * Determines when window results are emitted. + * For now, [[EventTimeTrigger]] is used for all applications. + */ +// TODO: Make this a public API sealed trait Trigger +/** + * Triggers emitting when the watermark passes the end of the window on event time. 
+ */ +// FIXME: This is no more than a tag now and the logic is hard coded in WindowRunner case object EventTimeTrigger extends Trigger http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala index 85ca969..4db02e7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala @@ -33,8 +33,14 @@ object WindowFunction { } } +/** + * Determines how elements are assigned to windows for calculation. + */ trait WindowFunction { + /** + * Assigns elements into windows. + */ def apply[T](context: WindowFunction.Context[T]): Array[Window] def isNonMerging: Boolean @@ -51,6 +57,9 @@ object GlobalWindowFunction { Instant.ofEpochMilli(Time.MAX_TIME_MILLIS))) } +/** + * All elements are assigned to the same global window for calculation. + */ case class GlobalWindowFunction() extends NonMergingWindowFunction { override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { @@ -58,6 +67,12 @@ case class GlobalWindowFunction() extends NonMergingWindowFunction { } } +/** + * Elements are assigned to non-merging sliding windows for calculation. + * + * @param size window size + * @param step window step to slide forward + */ case class SlidingWindowFunction(size: Duration, step: Duration) extends NonMergingWindowFunction { @@ -86,6 +101,12 @@ case class SlidingWindowFunction(size: Duration, step: Duration) } } +/** + * Elements are assigned to merging windows for calculation. Windows are merged + * if their distance is within the defined gap. 
+ * + * @param gap session gap + */ case class SessionWindowFunction(gap: Duration) extends WindowFunction { override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala index d53bc96..e15b5c4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala @@ -20,11 +20,14 @@ package org.apache.gearpump.streaming.dsl.window.api import java.time.Duration /** - * Defines how to apply window functions. + * User facing Window DSL. + * Defines how to apply [[WindowFunction]], [[Trigger]] + * and [[AccumulationMode]]. * * @param windowFn how to divide windows * @param trigger when to trigger window result - * @param accumulationMode whether to accumulate results across windows + * @param accumulationMode whether to accumulate window results + * @param description window description */ case class Windows( windowFn: WindowFunction, @@ -47,6 +50,11 @@ case class Windows( object GlobalWindows { + /** + * Defines a [[GlobalWindowFunction]]. + * + * @return a Window definition + */ def apply(): Windows = { Windows(GlobalWindowFunction(), description = "globalWindows") } @@ -55,7 +63,7 @@ object GlobalWindows { object FixedWindows { /** - * Defines a FixedWindow. + * Defines a non-overlapping [[SlidingWindowFunction]]. * * @param size window size * @return a Window definition @@ -68,7 +76,7 @@ object FixedWindows { object SlidingWindows { /** - * Defines a SlidingWindow. + * Defines an overlapping [[SlidingWindowFunction]]. 
* * @param size window size * @param step window step to slide forward @@ -82,7 +90,7 @@ object SlidingWindows { object SessionWindows { /** - * Defines a SessionWindow. + * Defines a [[SessionWindowFunction]]. * * @param gap session gap * @return a Window definition http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala deleted file mode 100644 index e978983..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.streaming.dsl.window.impl - -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.dsl.window.api.Trigger - -trait ReduceFnRunner { - - def process(message: Message): Unit - - def onTrigger(trigger: Trigger): Unit - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 7536473..d6d08c9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -28,7 +28,7 @@ object Window { } /** - * A window unit including startTime and excluding endTime. + * A window unit from startTime(including) to endTime(excluding). */ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 17a9525..ee3c067 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -31,11 +31,23 @@ import org.apache.gearpump.streaming.task.TaskUtil import scala.collection.mutable.ArrayBuffer +/** + * Inputs for [[WindowRunner]]. 
+ */ case class TimestampedValue[T](value: T, timestamp: Instant) +/** + * Outputs triggered by [[WindowRunner]] + */ case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], watermark: Instant) +/** + * This is responsible for executing window calculation. + * 1. Groups elements into windows as defined by window function + * 2. Applies window calculation to each group + * 3. Emits results on triggering + */ trait WindowRunner[IN, OUT] extends java.io.Serializable { def process(timestampedValue: TimestampedValue[IN]): Unit @@ -43,6 +55,10 @@ trait WindowRunner[IN, OUT] extends java.io.Serializable { def trigger(time: Instant): TriggeredOutputs[OUT] } +/** + * A composite WindowRunner that first executes its left child and feeds results + * into its right child. + */ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { @@ -57,6 +73,9 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], } } +/** + * Default implementation for [[WindowRunner]]. + */ class DefaultWindowRunner[IN, OUT]( windows: Windows, fnRunner: FunctionRunner[IN, OUT]) @@ -137,11 +156,15 @@ class DefaultWindowRunner[IN, OUT]( } onTrigger(outputs, newWmk) } else { - // minimum of end of last triggered window and start of first un-triggered window + // The output watermark is the minimum of end of last triggered window + // and start of first un-triggered window TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime)) } } else { + // All windows have been triggered. if (time == Watermark.MAX) { + // This means there will be no more inputs + // so it's safe to advance to the maximum watermark. TriggeredOutputs(outputs, Watermark.MAX) } else { TriggeredOutputs(outputs, wmk)
