Repository: incubator-gearpump Updated Branches: refs/heads/akka-streams 4fe5458f4 -> bc3940352
package org.apache.gearpump.streaming.dsl.task

import java.time.Instant

import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
import org.apache.gearpump.streaming.task.{Task, TaskContext}

/**
 * Task that emits window results when the event-time watermark advances.
 *
 * Incoming messages are buffered by the [[WindowRunner]]; output is produced
 * only from `onWatermarkProgress`, which triggers every window covered by the
 * new watermark.
 *
 * @param groupBy      groups messages by payload and window
 * @param windowRunner buffers inputs and fires completed windows
 */
class EventTimeTriggerTask[IN, GROUP](
    groupBy: GroupAlsoByWindow[IN, GROUP],
    windowRunner: WindowRunner,
    taskContext: TaskContext,
    userConfig: UserConfig)
  extends Task(taskContext, userConfig) {

  /** Creates the task with the default [[DefaultWindowRunner]] for `groupBy`. */
  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
      taskContext: TaskContext, userConfig: UserConfig) = {
    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
      taskContext, userConfig)
  }

  /** Restores the group-by function from [[UserConfig]] (set during planning). */
  def this(taskContext: TaskContext, userConfig: UserConfig) = {
    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
      taskContext, userConfig)
  }

  // Buffer only; nothing is emitted until the watermark advances.
  override def onNext(message: Message): Unit = {
    windowRunner.process(message)
  }

  // Fire all windows whose end time is covered by the watermark.
  override def onWatermarkProgress(watermark: Instant): Unit = {
    windowRunner.trigger(watermark)
  }
}
package org.apache.gearpump.streaming.dsl.task

import java.time.Instant
import java.util.concurrent.TimeUnit

import akka.actor.Actor.Receive
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn
import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
import org.apache.gearpump.streaming.task.{Task, TaskContext}

import scala.concurrent.duration.FiniteDuration

object ProcessingTimeTriggerTask {
  /** Self-message that schedules a window evaluation. */
  case object Triggering
}

/**
 * Task that emits window results on a wall-clock (processing-time) schedule.
 *
 * The first trigger is aligned to the next window-size boundary; afterwards
 * the task re-schedules itself every window step.
 *
 * @param groupBy      groups messages by payload and window; must carry a
 *                     [[SlidingWindowFn]] (enforced by the cast below)
 * @param windowRunner buffers inputs and fires completed windows
 */
class ProcessingTimeTriggerTask[IN, GROUP](
    groupBy: GroupAlsoByWindow[IN, GROUP],
    windowRunner: WindowRunner,
    taskContext: TaskContext,
    userConfig: UserConfig)
  extends Task(taskContext, userConfig) {

  /** Creates the task with the default [[DefaultWindowRunner]] for `groupBy`. */
  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
      taskContext: TaskContext, userConfig: UserConfig) = {
    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
      taskContext, userConfig)
  }

  /** Restores the group-by function from [[UserConfig]] (set during planning). */
  def this(taskContext: TaskContext, userConfig: UserConfig) = {
    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
      taskContext, userConfig)
  }

  private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn]
  private val windowSizeMs = windowFn.size.toMillis
  private val windowStepMs = windowFn.step.toMillis

  override def onStart(startTime: Instant): Unit = {
    // Delay until the next multiple of the window size so firings line up
    // with window boundaries.
    val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs
    taskContext.scheduleOnce(
      new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering)
  }

  // Buffer only; output happens on the Triggering schedule.
  override def onNext(message: Message): Unit = {
    windowRunner.process(message)
  }

  override def receiveUnManagedMessage: Receive = {
    case Triggering =>
      windowRunner.trigger(Instant.now)
      // Re-arm the timer one window step ahead.
      taskContext.scheduleOnce(
        new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering)
  }
}
package org.apache.gearpump.streaming.dsl.task

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.task.{Task, TaskContext}

/**
 * Task that applies an optional per-message transformation.
 *
 * With an operator present, each input payload is run through it and every
 * produced value is emitted with the input's timestamp; without one, the
 * message payload is forwarded unchanged.
 *
 * @param operator optional transformation over the message payload
 */
class TransformTask[IN, OUT](
    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
    userConf: UserConfig) extends Task(taskContext, userConf) {

  /** Restores the operator from [[UserConfig]] (set during planning). */
  def this(taskContext: TaskContext, userConf: UserConfig) = {
    this(userConf.getValue[SingleInputFunction[IN, OUT]](
      GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
  }

  override def onNext(msg: Message): Unit = {
    // Preserve the input timestamp on every emitted message.
    val inputTime = msg.timestamp

    operator match {
      case Some(op) =>
        // `out` (not `msg`) to avoid shadowing the method parameter.
        op.process(msg.msg.asInstanceOf[IN]).foreach { out =>
          taskContext.output(new Message(out, inputTime))
        }
      case None =>
        taskContext.output(new Message(msg.msg, inputTime))
    }
  }
}
package org.apache.gearpump.streaming.dsl.window.api

/** Controls what happens to a window's buffered state when it is triggered. */
sealed trait AccumulationMode

/** State is kept across triggers; later firings see earlier inputs as well. */
case object Accumulating extends AccumulationMode

/** State is cleared after each trigger; every firing covers only new inputs. */
case object Discarding extends AccumulationMode
package org.apache.gearpump.streaming.dsl.window.api

import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.task.Task

/**
 * Divides messages into groups according to their payload and timestamp.
 * See [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]]
 * for the default implementation.
 *
 * @tparam T     payload type of grouped messages
 * @tparam GROUP key type produced for each message
 */
trait GroupByFn[T, GROUP] {

  /**
   * Derives the group key for a message. Used by:
   *   1. GroupByPartitioner, to shuffle messages
   *   2. WindowRunner, to group messages for time-based aggregation
   */
  def groupBy(message: Message): GROUP

  /** Returns the Processor matching the window trigger, during planning. */
  def getProcessor(parallelism: Int, description: String,
      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task]
}
package org.apache.gearpump.streaming.dsl.window.api

/** Determines when window results are emitted. */
sealed trait Trigger

/** Fire when the event-time watermark passes the window end. */
case object EventTimeTrigger extends Trigger

/** Fire on a wall-clock (processing-time) schedule. */
case object ProcessingTimeTrigger extends Trigger

/** Fire when the buffered element count reaches the window size. */
case object CountTrigger extends Trigger
package org.apache.gearpump.streaming.dsl.window.api

import java.time.Duration

/**
 * Immutable window definition combining a window function, a trigger and an
 * accumulation mode.
 *
 * @param windowFn         assigns each timestamp to one or more buckets
 * @param trigger          when results are emitted (default: event time)
 * @param accumulationMode whether window state survives a trigger
 *                         (default: discarding)
 */
case class Window(
    windowFn: WindowFn,
    trigger: Trigger = EventTimeTrigger,
    accumulationMode: AccumulationMode = Discarding) {

  /**
   * Returns a copy with the given trigger.
   *
   * Uses `copy` so the current accumulationMode is preserved; rebuilding with
   * `Window(windowFn, trigger)` would silently reset it to the default.
   */
  def triggering(trigger: Trigger): Window = {
    copy(trigger = trigger)
  }

  /** Returns a copy that accumulates state across triggers. */
  def accumulating: Window = {
    copy(accumulationMode = Accumulating)
  }

  /** Returns a copy that discards state after each trigger. */
  def discarding: Window = {
    copy(accumulationMode = Discarding)
  }
}

object CountWindow {

  /**
   * Defines a count-based window.
   * @param size number of elements per window
   * @return a Window definition with a count trigger
   */
  def apply(size: Int): Window = {
    Window(CountWindowFn(size), CountTrigger)
  }
}

object FixedWindow {

  /**
   * Defines a FixedWindow (a sliding window whose step equals its size).
   * @param size window size
   * @return a Window definition
   */
  def apply(size: Duration): Window = {
    Window(SlidingWindowFn(size, size))
  }
}

object SlidingWindow {

  /**
   * Defines a SlidingWindow.
   * @param size window size
   * @param step window step to slide forward
   * @return a Window definition
   */
  def apply(size: Duration, step: Duration): Window = {
    Window(SlidingWindowFn(size, step))
  }
}
package org.apache.gearpump.streaming.dsl.window.api

import java.time.{Duration, Instant}

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.streaming.dsl.window.impl.Bucket

import scala.collection.mutable.ArrayBuffer

/** Assigns a timestamp to the list of window buckets it belongs to. */
sealed trait WindowFn {
  def apply(timestamp: Instant): List[Bucket]
}

/**
 * Time-based windows of length `size`, advancing by `step`.
 * With step == size this degenerates to fixed (tumbling) windows.
 */
case class SlidingWindowFn(size: Duration, step: Duration)
  extends WindowFn {

  /** Tumbling-window convenience constructor (step == size). */
  def this(size: Duration) = {
    this(size, size)
  }

  override def apply(timestamp: Instant): List[Bucket] = {
    val sizeMillis = size.toMillis
    val stepMillis = step.toMillis
    val timeMillis = timestamp.toEpochMilli
    val windows = ArrayBuffer.empty[Bucket]
    // Latest window start at or before the timestamp.
    var start = lastStartFor(timeMillis, stepMillis)
    windows += Bucket.ofEpochMilli(start, start + sizeMillis)
    start -= stepMillis
    // BUG FIX: the original condition `start >= timeMillis` could never hold
    // (start only decreases below the timestamp), so overlapping windows were
    // never emitted and sliding behaved like tumbling. A window [start,
    // start + size) contains the timestamp while start + size > timestamp.
    while (start + sizeMillis > timeMillis) {
      windows += Bucket.ofEpochMilli(start, start + sizeMillis)
      start -= stepMillis
    }
    windows.toList
  }

  // Largest multiple of windowStep that is <= timestamp.
  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
    timestamp - (timestamp + windowStep) % windowStep
  }
}

/** Count-based windows; every timestamp maps to the single bucket [0, size). */
case class CountWindowFn(size: Int) extends WindowFn {

  override def apply(timestamp: Instant): List[Bucket] = {
    List(Bucket.ofEpochMilli(0, size))
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala new file mode 100644 index 0000000..e978983 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.dsl.window.impl

import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.window.api.Trigger

/** Runs a reduce function over windowed inputs. */
trait ReduceFnRunner {

  /** Feeds one message into the runner. */
  def process(message: Message): Unit

  /** Notifies the runner that the given trigger has fired. */
  def onTrigger(trigger: Trigger): Unit
}
package org.apache.gearpump.streaming.dsl.window.impl

import java.time.Instant

import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.{Message, TimeStamp}
import org.apache.gearpump.streaming.dsl.window.api._
import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
import org.apache.gearpump.streaming.task.Task

object Bucket {
  /** Builds a Bucket from epoch-millisecond boundaries. */
  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = {
    Bucket(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
  }
}

/**
 * A window unit including startTime and excluding endTime.
 * Ordered by startTime first, then endTime.
 */
case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] {
  override def compareTo(o: Bucket): Int = {
    val byStart = startTime.compareTo(o.startTime)
    if (byStart == 0) endTime.compareTo(o.endTime) else byStart
  }
}

/**
 * Default [[GroupByFn]] that pairs a user group key with the window buckets
 * a message's timestamp falls into.
 *
 * @param groupByFn user function extracting the group key from a payload
 * @param window    window definition supplying windowFn and trigger
 */
case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
  extends GroupByFn[T, (GROUP, List[Bucket])] {

  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
    val key = groupByFn(message.msg.asInstanceOf[T])
    val buckets = window.windowFn(Instant.ofEpochMilli(message.timestamp))
    (key, buckets)
  }

  /** Picks the trigger task matching the window's trigger during planning. */
  override def getProcessor(parallelism: Int, description: String,
      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
    // The group-by function travels to the task through the user config.
    val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
    window.trigger match {
      case CountTrigger =>
        Processor[CountTriggerTask[T, GROUP]](parallelism, description, config)
      case ProcessingTimeTrigger =>
        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config)
      case EventTimeTrigger =>
        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala new file mode 100644 index 0000000..9af5e61 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.dsl.window.impl

import java.time.Instant

import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
import org.apache.gearpump.streaming.dsl.window.api.Discarding
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger

/** Buffers windowed inputs and emits aggregation results when triggered. */
trait WindowRunner {

  /** Feeds one message into the runner's window state. */
  def process(message: Message): Unit

  /** Fires every window that ends at or before `time`. */
  def trigger(time: Instant): Unit
}

object DefaultWindowRunner {

  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])

  /** Sort key combining a window bucket with its group. */
  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
    extends Comparable[WindowGroup[GROUP]] {
    // NOTE(review): returning -1 for any unequal groups is not antisymmetric
    // (compare(a, b) and compare(b, a) can both be -1), which violates the
    // Comparable contract TreeSortedMap relies on — confirm whether this can
    // corrupt ordering for multiple groups in the same bucket.
    override def compareTo(o: WindowGroup[GROUP]): Int = {
      val byBucket = bucket.compareTo(o.bucket)
      if (byBucket != 0) {
        byBucket
      } else if (group.equals(o.group)) {
        0
      } else {
        -1
      }
    }
  }
}

/**
 * Default [[WindowRunner]] keeping per-(window, group) input buffers sorted
 * by window end so triggers can drain completed windows from the front.
 */
class DefaultWindowRunner[IN, GROUP, OUT](
    taskContext: TaskContext, userConfig: UserConfig,
    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
  extends WindowRunner {
  import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._

  // Inputs buffered per (bucket, group), ordered by bucket.
  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
  // One reduce function instance per group (deserialized once from config).
  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]

  override def process(message: Message): Unit = {
    val (group, buckets) = groupBy.groupBy(message)
    buckets.foreach { bucket =>
      val key = WindowGroup(bucket, group)
      val buffer = windowGroups.getOrDefault(key, new FastList[IN](1))
      buffer.add(message.msg.asInstanceOf[IN])
      windowGroups.put(key, buffer)
    }
    groupFns.putIfAbsent(group,
      userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
  }

  override def trigger(time: Instant): Unit = {
    // Drain completed windows from the front of the sorted map
    // (iterative form of the original tail-recursive loop).
    var draining = windowGroups.notEmpty()
    while (draining) {
      val first = windowGroups.firstKey
      if (time.isBefore(first.bucket.endTime)) {
        draining = false
      } else {
        val inputs = windowGroups.remove(first)
        val reduceFn = groupFns.get(first.group)
          .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
        inputs.forEach(new Procedure[IN] {
          override def value(t: IN): Unit = {
            reduceFn.process(t)
          }
        })
        reduceFn.finish()
        // Discarding mode drops window state after each firing.
        if (groupBy.window.accumulationMode == Discarding) {
          reduceFn.clearState()
        }
        draining = windowGroups.notEmpty()
      }
    }
  }

  // Stamp outputs with the trigger time.
  private def emitResult(result: OUT, time: Instant): Unit = {
    taskContext.output(Message(result, time.toEpochMilli))
  }
}
TaskContext} /** @@ -57,15 +57,10 @@ class DataSourceTask[IN, OUT] private[source]( private val processMessage: Message => Unit = operator match { case Some(op) => - op match { - case bad: DummyInputFunction[IN] => - (message: Message) => context.output(message) - case _ => - (message: Message) => { - op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => - context.output(Message(m, message.timestamp)) - } - } + (message: Message) => { + op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => + context.output(Message(m, message.timestamp)) + } } case None => (message: Message) => context.output(message) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index eb52700..f72e5b8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -60,7 +60,7 @@ class TaskActor( val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) // Metrics - private val metricName = s"app$appId.processor${taskId.processorId}.task${taskId.index}" + private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}" private val receiveLatency = Metrics(context.system).histogram( s"$metricName:receiveLatency", sampleRate = 1) private val processTime = Metrics(context.system).histogram(s"$metricName:processTime") @@ -307,9 +307,9 @@ class TaskActor( private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = { if (upstreamClock > this.upstreamMinClock) { + this.upstreamMinClock = upstreamClock task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock)) } - 
this.upstreamMinClock = upstreamClock val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => val subMin = sub._2.minClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala index e919a34..e0407ec 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala @@ -21,7 +21,10 @@ package org.apache.gearpump.streaming.dsl import akka.actor.ActorSystem import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.util.Graph import org.mockito.Mockito.when import org.scalatest._ import org.scalatest.mock.MockitoSugar @@ -30,7 +33,7 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = null + implicit var system: ActorSystem = _ override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) @@ -45,49 +48,25 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) - app.source(List("A"), 1, "") - app.source(List("B"), 1, "") + val dsl = StreamApp("dsl", context) + dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]] + 
dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]] - assert(app.graph.vertices.size == 2) - } - - it should "plan the dsl to Processsor(TaskDescription) DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val app = StreamApp("dsl", context) - val parallism = 3 - app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - assert(task.taskClass == classOf[DataSourceTask[_, _]].getName) - assert(task.parallelism == parallism) - } - - it should "produce 3 messages" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) - val list = List[String]( - "0", - "1", - "2" - ) - val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - /* - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - val sum = producer.flatMap(msg => { - LOG.info("in flatMap") - assert(msg.msg.isInstanceOf[String]) - val num = msg.msg.asInstanceOf[String].toInt - Array(num) - }).reduce(_+_) - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - */ + val application = dsl.plan() + application shouldBe a [StreamApplication] + application.name shouldBe "dsl" + val dag = application.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + dag.vertices.size shouldBe 2 + dag.vertices.foreach { processor => + processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName + if (processor.description == "A") { + processor.parallelism shouldBe 2 + } else if (processor.description == "B") { + processor.parallelism shouldBe 3 + } else { + fail(s"undefined source ${processor.description}") + } + } } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala index 816feef..fdc721b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala @@ -22,10 +22,11 @@ import akka.actor._ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} +import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -40,7 +41,6 @@ import scala.util.{Either, Left, Right} class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = _ override def beforeAll(): Unit = { @@ -56,7 +56,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) + val dsl = StreamApp("dsl", context) val data = """ @@ -66,30 +66,32 @@ class 
StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock five four five """ - val stream = app.source(data.lines.toList, 1, ""). + val stream = dsl.source(data.lines.toList, 1, ""). flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). map(word => (word, 1)). groupBy(_._1, parallelism = 2). reduce((left, right) => (left._1, left._2 + right._2)). map[Either[(String, Int), String]](Left(_)) - val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) + val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) stream.merge(query).process[(String, Int)](classOf[Join], 1) - val appDescription = app.plan() + val app: StreamApplication = dsl.plan() + val dag = app.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => + val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => edge.partitionerFactory.partitioner.getClass.getName } val expectedDagTopology = getExpectedDagTopology - assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet)) - assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet)) + dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet + dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet } private def getExpectedDagTopology: Graph[String, String] = { val source = classOf[DataSourceTask[_, _]].getName - val group = classOf[GroupByTask[_, _, _]].getName + val group = classOf[CountTriggerTask[_, _]].getName val merge = classOf[TransformTask[_, _]].getName val join = classOf[Join].getName http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala index fcc646d..f49eb04 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -18,24 +18,33 @@ package org.apache.gearpump.streaming.dsl.partitioner -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import java.time.Duration +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People +import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn} +import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - it should "use the outpout of groupBy function to do partition" in { + + it should "group by message payload and window" in { val mark = People("Mark", "male") val tom = People("Tom", "male") val michelle = People("Michelle", "female") val partitionNum = 10 - val groupBy = new GroupByPartitioner[People, String](_.gender) - assert(groupBy.getPartition(Message(mark), partitionNum) - == groupBy.getPartition(Message(tom), partitionNum)) + val groupByFn: GroupByFn[People, (String, List[Bucket])] = + GroupAlsoByWindow[People, String](_.gender, FixedWindow.apply(Duration.ofMillis(5))) + val groupBy = new GroupByPartitioner[People, (String, List[Bucket])](groupByFn) + groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe + groupBy.getPartition(Message(tom, 2L), partitionNum) + + groupBy.getPartition(Message(mark, 1L), partitionNum) should not be + 
groupBy.getPartition(Message(tom, 6L), partitionNum) - assert(groupBy.getPartition(Message(mark), partitionNum) - != groupBy.getPartition(Message(michelle), partitionNum)) + groupBy.getPartition(Message(mark, 2L), partitionNum) should not be + groupBy.getPartition(Message(michelle, 3L), partitionNum) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala new file mode 100644 index 0000000..bf52abc --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.plan + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.Processor.DefaultProcessor +import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + private val unchainableOps: List[Op] = List( + mock[DataSourceOp], + mock[DataSinkOp], + mock[GroupByOp[Any, Any]], + mock[MergeOp], + mock[ProcessorOp[AnyTask]]) + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "DataSourceOp" should { + + "chain ChainableOp" in { + val dataSource = new AnySource + val dataSourceOp = DataSourceOp(dataSource) + val chainableOp = mock[ChainableOp[Any, Any]] + val fn = mock[SingleInputFunction[Any, Any]] + + val chainedOp = dataSourceOp.chain(chainableOp) + + chainedOp shouldBe a[DataSourceOp] + verify(chainableOp).fn + + unchainableOps.foreach { op => + intercept[OpChainException] { + dataSourceOp.chain(op) + } + } + } + + "get Processor of DataSource" in { + val dataSource = new AnySource + val 
dataSourceOp = DataSourceOp(dataSource) + val processor = dataSourceOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe dataSourceOp.parallelism + processor.description shouldBe dataSourceOp.description + } + } + + "DataSinkOp" should { + + "not chain any Op" in { + val dataSink = new AnySink + val dataSinkOp = DataSinkOp(dataSink) + val chainableOp = mock[ChainableOp[Any, Any]] + val ops = chainableOp +: unchainableOps + ops.foreach { op => + intercept[OpChainException] { + dataSinkOp.chain(op) + } + } + } + + "get Processor of DataSink" in { + val dataSink = new AnySink + val dataSinkOp = DataSinkOp(dataSink) + val processor = dataSinkOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe dataSinkOp.parallelism + processor.description shouldBe dataSinkOp.description + } + } + + "ProcessorOp" should { + + "not chain any Op" in { + val processorOp = new ProcessorOp[AnyTask] + val chainableOp = mock[ChainableOp[Any, Any]] + val ops = chainableOp +: unchainableOps + ops.foreach { op => + intercept[OpChainException] { + processorOp.chain(op) + } + } + } + + "get Processor" in { + val processorOp = new ProcessorOp[AnyTask] + val processor = processorOp.getProcessor + processor shouldBe a [DefaultProcessor[_]] + processor.parallelism shouldBe processorOp.parallelism + processor.description shouldBe processorOp.description + } + } + + "ChainableOp" should { + + "chain ChainableOp" in { + val fn1 = mock[SingleInputFunction[Any, Any]] + val chainableOp1 = ChainableOp[Any, Any](fn1) + + val fn2 = mock[SingleInputFunction[Any, Any]] + val chainableOp2 = ChainableOp[Any, Any](fn2) + + val chainedOp = chainableOp1.chain(chainableOp2) + + verify(fn1).andThen(fn2) + chainedOp shouldBe a[ChainableOp[_, _]] + + unchainableOps.foreach { op => + intercept[OpChainException] { + chainableOp1.chain(op) + } + } + } + + "throw exception on getProcessor" in { + val fn1 = mock[SingleInputFunction[Any, Any]] + val chainableOp1 
= ChainableOp[Any, Any](fn1) + intercept[UnsupportedOperationException] { + chainableOp1.getProcessor + } + } + } + + "GroupByOp" should { + + "chain ChainableOp" in { + val groupByFn = mock[GroupByFn[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupByFn) + val fn = mock[SingleInputFunction[Any, Any]] + val chainableOp = mock[ChainableOp[Any, Any]] + when(chainableOp.fn).thenReturn(fn) + + val chainedOp = groupByOp.chain(chainableOp) + chainedOp shouldBe a[GroupByOp[_, _]] + + unchainableOps.foreach { op => + intercept[OpChainException] { + groupByOp.chain(op) + } + } + } + + "delegate to groupByFn on getProcessor" in { + val groupByFn = mock[GroupByFn[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupByFn) + + groupByOp.getProcessor + verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem]) + } + } + + "MergeOp" should { + + val mergeOp = MergeOp("merge") + + "chain ChainableOp" in { + val fn = mock[SingleInputFunction[Any, Any]] + val chainableOp = mock[ChainableOp[Any, Any]] + when(chainableOp.fn).thenReturn(fn) + + val chainedOp = mergeOp.chain(chainableOp) + chainedOp shouldBe a [MergeOp] + + unchainableOps.foreach { op => + intercept[OpChainException] { + mergeOp.chain(op) + } + } + } + + "get Processor" in { + val processor = mergeOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe 1 + } + } +} + +object OpSpec { + class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) + + class AnySource extends DataSource { + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = Message("any") + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + } + + class AnySink extends DataSink { + + override def open(context: TaskContext): Unit = {} + + override def write(message: Message): Unit = {} + + override def close(): Unit = {} +} +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala deleted file mode 100644 index 2112fd0..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl.plan - -import java.time.Instant - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import akka.actor.ActorSystem -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest._ -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.CollectionDataSource -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.source.DataSourceTask - -class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - - "andThen" should "chain multiple single input function" in { - val dummy = new DummyInputFunction[String] - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") - - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val count = all.process(data).toList.last - assert(count == 15) - } - - "Source" should "iterate over input source and apply attached operator" in { - - val taskContext = MockUtil.mockTaskContext - implicit val actorSystem = MockUtil.system - - val data = "one two three".split("\\s") - val dataSource = new CollectionDataSource[String](data) - val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - - // Source with no transformer - val source = new DataSourceTask[String, String]( - taskContext, 
conf) - source.onStart(Instant.EPOCH) - source.onNext(Message("next")) - data.foreach { s => - verify(taskContext, times(1)).output(Message(s)) - } - - // Source with transformer - val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val another = new DataSourceTask(anotherTaskContext, - conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) - another.onStart(Instant.EPOCH) - another.onNext(Message("next")) - data.foreach { s => - verify(anotherTaskContext, times(2)).output(Message(s)) - } - } - - "GroupByTask" should "group input by groupBy Function and " + - "apply attached operator for each group" in { - - val data = "1 2 2 3 3 3" - - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") - - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( - GEARPUMP_STREAMING_OPERATOR, concat) - - val taskContext = MockUtil.mockTaskContext - - val task = new GroupByTask[String, String, String](input => input, taskContext, config) - task.onStart(Instant.EPOCH) - - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) - - data.split("\\s+").foreach { word => - task.onNext(Message(word)) - } - verify(taskContext, times(6)).output(peopleCaptor.capture()) - - import scala.collection.JavaConverters._ - - val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) - assert(values.mkString(",") == "1,2,22,3,33,333") - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - "MergeTask" should "accept two stream and apply the attached operator" in { - - // Source with transformer - val taskContext = MockUtil.mockTaskContext - val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val task = new TransformTask[String, String](Some(double), taskContext, 
conf) - task.onStart(Instant.EPOCH) - - val data = "1 2 2 3 3 3".split("\\s+") - - data.foreach { input => - task.onNext(Message(input)) - } - - verify(taskContext, times(data.length * 2)).output(anyObject()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala new file mode 100644 index 0000000..f8666ba --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.plan + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.partitioner.CoLocationPartitioner +import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.{MockUtil, Processor} +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.util.Graph +import org.scalatest.mock.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "Planner" should "chain operations" in { + val graph = Graph.empty[Op, OpEdge] + val sourceOp = DataSourceOp(new AnySource) + val groupByOp = GroupByOp(new AnyGroupByFn) + val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction) + val reduceOp = ChainableOp[Any, Any](anyReduceFunction) + val processorOp = new ProcessorOp[AnyTask] + val sinkOp = DataSinkOp(new AnySink) + val directEdge = Direct + val shuffleEdge = Shuffle + + graph.addVertex(sourceOp) + graph.addVertex(groupByOp) + graph.addEdge(sourceOp, shuffleEdge, groupByOp) + graph.addVertex(flatMapOp) + graph.addEdge(groupByOp, directEdge, flatMapOp) + 
graph.addVertex(reduceOp) + graph.addEdge(flatMapOp, directEdge, reduceOp) + graph.addVertex(processorOp) + graph.addEdge(reduceOp, directEdge, processorOp) + graph.addVertex(sinkOp) + graph.addEdge(processorOp, directEdge, sinkOp) + + implicit val system = MockUtil.system + + val planner = new Planner + val plan = planner.plan(graph) + .mapVertex(_.description) + + plan.vertices.toSet should contain theSameElementsAs + Set("source", "groupBy", "processor", "sink") + plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] + plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner] + plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner] + } +} + +object PlannerSpec { + + private val anyParallelism = 1 + private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap") + private val anyReduceFunction = new ReduceFunction[Any]( + (left: Any, right: Any) => (left, right), "reduce") + + class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) + + class AnySource extends DataSource { + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = Message("any") + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + } + + class AnySink extends DataSink { + + override def open(context: TaskContext): Unit = {} + + override def write(message: Message): Unit = {} + + override def close(): Unit = {} + } + + class AnyGroupByFn extends GroupByFn[Any, Any] { + + override def groupBy(message: Message): Any = message.msg + + override def getProcessor( + parallelism: Int, + description: String, + userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[AnyTask](anyParallelism, description) + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala new file mode 100644 index 0000000..94feae4 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.plan.functions + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.CollectionDataSource +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} +import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.mockito.ArgumentCaptor +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{Matchers, WordSpec} +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { + import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._ + + "AndThen" should { + + val first = mock[SingleInputFunction[R, S]] + val second = mock[SingleInputFunction[S, T]] + val andThen = new AndThen(first, second) + + "chain first and second functions when processing input value" in { + val input = mock[R] + val firstOutput = mock[S] + val secondOutput = mock[T] + when(first.process(input)).thenReturn(Some(firstOutput)) + when(second.process(firstOutput)).thenReturn(Some(secondOutput)) + + andThen.process(input).toList shouldBe List(secondOutput) + } + + "return chained description" in { + when(first.description).thenReturn("first") + when(second.description).thenReturn("second") + andThen.description shouldBe "first.second" + } + + "return either first result or second on finish" in { + val firstResult = mock[S] + val processedFirst = mock[T] + val secondResult = mock[T] + + when(first.finish()).thenReturn(Some(firstResult)) + 
when(second.process(firstResult)).thenReturn(Some(processedFirst)) + andThen.finish().toList shouldBe List(processedFirst) + + when(first.finish()).thenReturn(None) + when(second.finish()).thenReturn(Some(secondResult)) + andThen.finish().toList shouldBe List(secondResult) + } + + "clear both states on clearState" in { + andThen.clearState() + + verify(first).clearState() + verify(second).clearState() + } + + "return AndThen on andThen" in { + val third = mock[SingleInputFunction[T, Any]] + andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]] + } + } + + "FlatMapFunction" should { + + val flatMap = mock[R => TraversableOnce[S]] + val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap") + + "call flatMap function when processing input value" in { + val input = mock[R] + flatMapFunction.process(input) + verify(flatMap).apply(input) + } + + "return passed in description" in { + flatMapFunction.description shouldBe "flatMap" + } + + "return None on finish" in { + flatMapFunction.finish() shouldBe List.empty[S] + } + + "do nothing on clearState" in { + flatMapFunction.clearState() + verifyZeroInteractions(flatMap) + } + + "return AndThen on andThen" in { + val other = mock[SingleInputFunction[S, T]] + flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]] + } + } + + "ReduceFunction" should { + + + "call reduce function when processing input value" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val input1 = mock[T] + val input2 = mock[T] + val output = mock[T] + + when(reduce.apply(input1, input2)).thenReturn(output, output) + + reduceFunction.process(input1) shouldBe List.empty[T] + reduceFunction.process(input2) shouldBe List.empty[T] + reduceFunction.finish() shouldBe List(output) + + reduceFunction.clearState() + reduceFunction.process(input1) shouldBe List.empty[T] + reduceFunction.clearState() + reduceFunction.process(input2) shouldBe List.empty[T] + reduceFunction.finish() shouldBe 
List(input2) + } + + "return passed in description" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + reduceFunction.description shouldBe "reduce" + } + + "return None on finish" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + reduceFunction.finish() shouldBe List.empty[T] + } + + "do nothing on clearState" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + reduceFunction.clearState() + verifyZeroInteractions(reduce) + } + + "return AndThen on andThen" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val other = mock[SingleInputFunction[T, Any]] + reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]] + } + } + + "EmitFunction" should { + + val emit = mock[T => Unit] + val emitFunction = new EmitFunction[T](emit) + + "emit input value when processing input value" in { + val input = mock[T] + + emitFunction.process(input) shouldBe List.empty[Unit] + + verify(emit).apply(input) + } + + "return empty description" in { + emitFunction.description shouldBe "" + } + + "return None on finish" in { + emitFunction.finish() shouldBe List.empty[Unit] + } + + "do nothing on clearState" in { + emitFunction.clearState() + verifyZeroInteractions(emit) + } + + "throw exception on andThen" in { + val other = mock[SingleInputFunction[Unit, Any]] + intercept[UnsupportedOperationException] { + emitFunction.andThen(other) + } + } + } + + "andThen" should { + "chain multiple single input function" in { + val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") + + val filter = new FlatMapFunction[String, String](word => + if (word.isEmpty) None else Some(word), "filter") + + val map = new FlatMapFunction[String, Int](word => Some(1), "map") + + val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") + + val all = 
split.andThen(filter).andThen(map).andThen(sum) + + assert(all.description == "split.filter.map.sum") + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + // force eager evaluation + all.process(data).toList + val result = all.finish().toList + assert(result.nonEmpty) + assert(result.last == 15) + } + } + + "Source" should { + "iterate over input source and apply attached operator" in { + + val taskContext = MockUtil.mockTaskContext + implicit val actorSystem = MockUtil.system + + val data = "one two three".split("\\s") + val dataSource = new CollectionDataSource[String](data) + val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) + + // Source with no transformer + val source = new DataSourceTask[String, String]( + taskContext, conf) + source.onStart(Instant.EPOCH) + source.onNext(Message("next")) + data.foreach { s => + verify(taskContext, times(1)).output(MockUtil.argMatch[Message]( + message => message.msg == s)) + } + + // Source with transformer + val anotherTaskContext = MockUtil.mockTaskContext + val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val another = new DataSourceTask(anotherTaskContext, + conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) + another.onStart(Instant.EPOCH) + another.onNext(Message("next")) + data.foreach { s => + verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message]( + message => message.msg == s)) + } + } + } + + "CountTriggerTask" should { + "group input by groupBy Function and " + + "apply attached operator for each group" in { + + val data = "1 2 2 3 3 3" + + val concat = new ReduceFunction[String]({ (left, right) => + left + right + }, "concat") + + implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( + GEARPUMP_STREAMING_OPERATOR, concat) + + val taskContext = MockUtil.mockTaskContext + + val 
groupBy = GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating) + val task = new CountTriggerTask[String, String](groupBy, taskContext, config) + task.onStart(Instant.EPOCH) + + val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) + + data.split("\\s+").foreach { word => + task.onNext(Message(word)) + } + verify(taskContext, times(6)).output(peopleCaptor.capture()) + + import scala.collection.JavaConverters._ + + val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) + assert(values.mkString(",") == "1,2,22,3,33,333") + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } + + "MergeTask" should { + "accept two stream and apply the attached operator" in { + + // Source with transformer + val taskContext = MockUtil.mockTaskContext + val conf = UserConfig.empty + val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val task = new TransformTask[String, String](Some(double), taskContext, conf) + task.onStart(Instant.EPOCH) + + val data = "1 2 2 3 3 3".split("\\s+") + + data.foreach { input => + task.onNext(Message(input)) + } + + verify(taskContext, times(data.length * 2)).output(anyObject()) + } + } +} + +object SingleInputFunctionSpec { + type R = AnyRef + type S = AnyRef + type T = AnyRef +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala new file mode 100644 index 0000000..871d751 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +// Property-based spec: CountTriggerTask should forward every message to its WindowRunner +// and fire one trigger per completed count window. +class CountTriggerTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("CountTriggerTask should trigger output by number of messages in a window") { + + implicit val system = MockUtil.system + + val numGen = Gen.chooseNum[Int](1, 1000) // both window size and message count drawn from [1, 1000] + + forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) => + + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val window = CountWindow.apply(windowSize) + when(groupBy.window).thenReturn(window) // stub: the task reads its window spec from groupBy + val windowRunner = mock[WindowRunner] + val userConfig = UserConfig.empty + + val task = new CountTriggerTask[Any, Any](groupBy, windowRunner, + MockUtil.mockTaskContext, userConfig) + val message = mock[Message] 
+ + for (i <- 1 to msgNum) { + task.onNext(message) + } + verify(windowRunner, times(msgNum)).process(message) // every onNext is forwarded to the runner + verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize)) // NOTE(review): expects one trigger per full window, always at time = windowSize millis — confirm against CountTriggerTask + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala new file mode 100644 index 0000000..a69abe6 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow} +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +// Property-based spec: EventTimeTriggerTask should forward messages to its WindowRunner +// and fire a trigger when the watermark advances. +class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("EventTimeTriggerTask should trigger on watermark") { + val longGen = Gen.chooseNum[Long](1L, 1000L) + val windowSizeGen = longGen + val windowStepGen = longGen + val watermarkGen = longGen.map(Instant.ofEpochMilli) // random watermark in (0ms, 1000ms] + + forAll(windowSizeGen, windowStepGen, watermarkGen) { + (windowSize: Long, windowStep: Long, watermark: Instant) => + + val window = SlidingWindow.apply(Duration.ofMillis(windowSize), + Duration.ofMillis(windowStep)).triggering(EventTimeTrigger) + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val windowRunner = mock[WindowRunner] + val context = MockUtil.mockTaskContext + val config = UserConfig.empty + + when(groupBy.window).thenReturn(window) // stub: the task reads the sliding event-time window from groupBy + + val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config) + + val message = mock[Message] + task.onNext(message) + verify(windowRunner).process(message) // onNext hands the message to the runner + + task.onWatermarkProgress(watermark) + verify(windowRunner).trigger(any[Instant]) // watermark progress fires exactly one trigger + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala new file mode 100644 index 0000000..39e1b4c --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering +import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow} +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +// Property-based spec: ProcessingTimeTriggerTask should forward messages to its WindowRunner +// and fire a trigger when the processing-time tick (Triggering) arrives. +class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("ProcessingTimeTriggerTask should trigger on system time interval") { + val longGen = Gen.chooseNum[Long](1L, 1000L) + val windowSizeGen = longGen + val windowStepGen = longGen + val startTimeGen = longGen.map(Instant.ofEpochMilli) // random start time in (0ms, 1000ms] + + forAll(windowSizeGen, windowStepGen, startTimeGen) { + (windowSize: Long, windowStep: Long, startTime: Instant) => + + val window = SlidingWindow.apply(Duration.ofMillis(windowSize), + Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger) + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val windowRunner = mock[WindowRunner] + val context = MockUtil.mockTaskContext + val config = UserConfig.empty + + when(groupBy.window).thenReturn(window) // stub: the task reads the sliding processing-time window from groupBy + + val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config) + + task.onStart(startTime) // NOTE(review): onStart presumably schedules the periodic Triggering message — timing side effects are not asserted here + + val message = mock[Message] + task.onNext(message) + verify(windowRunner).process(message) // onNext hands the message to the runner + + task.receiveUnManagedMessage(Triggering) // simulate the scheduled processing-time tick directly + verify(windowRunner).trigger(any[Instant]) // the tick fires exactly one trigger + } + } + +}
