Repository: incubator-gearpump Updated Branches: refs/heads/master 5f90b70f9 -> 2913a1fd8
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala new file mode 100644 index 0000000..1b9c4e3 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant +import java.util +import java.util.concurrent.TimeUnit + +import akka.actor.Actor.Receive +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.util.Timeout +import org.apache.gearpump.Message +import org.apache.gearpump.akkastream.task.SinkBridgeTask.RequestMessage +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId} +import org.apache.gearpump.util.LogUtil +import org.reactivestreams.{Publisher, Subscriber, Subscription} + +/** + * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module + * + * + * upstream [[Task]] -> [[SinkBridgeTask]] + * \ Remote Cluster + * -------------------------\---------------------- + * \ Local JVM + * \| + * Akka Stream [[Subscriber]] + * + * + * @param taskContext TaskContext + * @param userConf UserConfig + */ +class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig) + extends Task(taskContext, userConf) { + import taskContext.taskId + + val queue = new util.LinkedList[Message]() + var subscriber: ActorRef = _ + + var request: Int = 0 + + override def onStart(startTime : Instant) : Unit = {} + + override def onNext(msg : Message) : Unit = { + queue.add(msg) + trySendingData() + } + + override def onStop() : Unit = {} + + private def trySendingData(): Unit = { + if (subscriber != null) { + (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg => + subscriber ! 
msg.msg + request -= 1 + } + } + } + + override def receiveUnManagedMessage: Receive = { + case RequestMessage(n) => + this.subscriber = sender + LOG.info("the downstream has requested " + n + " messages from " + subscriber) + request += n.toInt + trySendingData() + case msg => + LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString) + } +} + +object SinkBridgeTask { + + case class RequestMessage(number: Int) + + class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, + processorId: ProcessorId) extends Publisher[AnyRef] with Subscription { + private val taskId = TaskId(processorId, index = 0) + private val LOG = LogUtil.getLogger(getClass) + + private var actor: ActorRef = _ + import system.dispatcher + + private val task = + context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map{container => + // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) + container.task + } + + override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = { + this.actor = system.actorOf(Props(new ClientActor(subscriber))) + subscriber.onSubscribe(this) + } + + override def cancel(): Unit = Unit + + private implicit val timeout = Timeout(5, TimeUnit.SECONDS) + + override def request(l: Long): Unit = { + task.foreach{ task => + task.tell(RequestMessage(l.toInt), actor) + } + } + } + + class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor { + def receive: Receive = { + case result: AnyRef => + subscriber.onNext(result) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala new file mode 100644 index 0000000..054b483 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant + +import akka.actor.Actor.Receive +import org.apache.gearpump.Message +import org.apache.gearpump.akkastream.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error} +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId} +import org.reactivestreams.{Subscriber, Subscription} + +import scala.concurrent.ExecutionContext + +/** + * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task + * + * + * + * [[SourceBridgeTask]] --> downstream [[Task]] + * /| Remote Cluster + * ---------------/-------------------------------- + * / Local JVM + * Akka Stream [[org.reactivestreams.Publisher]] + * + * + * @param taskContext TaskContext + * @param userConf UserConfig + */ +class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) + extends Task(taskContext, userConf) { + import taskContext.taskId + + override def onStart(startTime : Instant) : Unit = {} + + override def onNext(msg : Message) : Unit = { + LOG.info("AkkaStreamSource receiving message " + msg) + } + + override def onStop() : Unit = {} + + override def receiveUnManagedMessage: Receive = { + case Error(ex) => + LOG.error("the stream has error", ex) + case AkkaStreamMessage(msg) => + LOG.info("we have received message from akka stream source: " + msg) + taskContext.output(Message(msg, System.currentTimeMillis())) + case Complete(description) => + LOG.info("the stream is completed: " + description) + case msg => + LOG.error("Failed! 
Received unknown message " + "taskId: " + taskId + ", " + msg.toString) + } +} + + +object SourceBridgeTask { + case class Error(ex: java.lang.Throwable) + + case class Complete(description: String) + + case class AkkaStreamMessage[T >: AnyRef](msg: T) + + class SourceBridgeTaskClient[T >: AnyRef](ec: ExecutionContext, + context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] { + val taskId = TaskId(processorId, 0) + var subscription: Subscription = _ + implicit val dispatcher = ec + + val task = context.askAppMaster[TaskActorRef](appId, + LookupTaskActorRef(taskId)).map{container => + // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) + container.task + } + + override def onError(throwable: Throwable): Unit = { + task.map(task => task ! Error(throwable)) + } + + override def onSubscribe(subscription: Subscription): Unit = { + // when taskActorRef is resolved, request message from upstream + this.subscription = subscription + task.map(task => subscription.request(1)) + } + + override def onComplete(): Unit = { + task.map(task => task ! Complete("the upstream is completed")) + } + + override def onNext(t: T): Unit = { + task.map {task => + task ! 
AkkaStreamMessage(t) + } + subscription.request(1) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala new file mode 100644 index 0000000..a0674bc --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val func = userConf.getValue[() => IN => Iterable[OUT]](StatefulMapConcatTask.FUNC).get + var f: IN => Iterable[OUT] = _ + + override def onStart(startTime: Instant) : Unit = { + f = func() + } + + override def onNext(msg : Message) : Unit = { + val in: IN = msg.msg.asInstanceOf[IN] + val out: Iterable[OUT] = f(in) + val iterator = out.iterator + while(iterator.hasNext) { + val nextValue = iterator.next + context.output(Message(nextValue, System.currentTimeMillis())) + } + } +} + +object StatefulMapConcatTask { + val FUNC = "FUNC" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala new file mode 100644 index 0000000..9559d8f --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import java.time.Instant
import java.util.concurrent.TimeUnit

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.concurrent.duration.FiniteDuration

/** Marker the task sends to itself when the take window has elapsed. */
case object TakeWithinTimeout

/**
 * Task that forwards messages until a configured duration has elapsed and drops everything
 * afterwards.
 *
 * The window length is read from the user config under [[TakeWithinTask.TIMEOUT]] and
 * defaults to zero minutes.
 */
class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val timeout = userConf.getValue[FiniteDuration](TakeWithinTask.TIMEOUT).
    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))

  /** Becomes true once the timeout marker has been received; output stops from then on. */
  var timeoutActive = false

  /** Schedules the one-shot timeout marker to be delivered to this task. */
  override def onStart(startTime: Instant): Unit = {
    // Uses this task's own marker. The previous code sent and matched DropWithinTimeout,
    // which is not defined in this file, and left TakeWithinTimeout unused.
    context.scheduleOnce(timeout)(
      self ! Message(TakeWithinTimeout, System.currentTimeMillis())
    )
  }

  /** Forwards `msg` unless the window has closed; the marker itself is never forwarded. */
  override def onNext(msg : Message) : Unit = {
    msg.msg match {
      case TakeWithinTimeout =>
        timeoutActive = true
      case _ =>
    }
    if (!timeoutActive) {
      context.output(msg)
    }
  }
}

object TakeWithinTask {
  val TIMEOUT = "TIMEOUT"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala new file mode 100644 index 0000000..3c7ad87 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.task + +import java.util.concurrent.TimeUnit + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +class ThrottleTask[T](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val cost = userConf.getInt(ThrottleTask.COST).getOrElse(0) + val costCalc = userConf.getValue[T => Int](ThrottleTask.COST_CALC) + val maxBurst = userConf.getInt(ThrottleTask.MAX_BURST) + val timePeriod = userConf.getValue[FiniteDuration](ThrottleTask.TIME_PERIOD). + getOrElse(FiniteDuration(0, TimeUnit.MINUTES)) + val interval = timePeriod.toNanos / cost + + // TODO control rate from TaskActor + override def onNext(msg : Message) : Unit = { + val data = msg.msg.asInstanceOf[T] + val time = msg.timestamp + context.output(msg) + } +} + +object ThrottleTask { + val COST = "COST" + val COST_CALC = "COST_CAL" + val MAX_BURST = "MAX_BURST" + val TIME_PERIOD = "TIME_PERIOD" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala new file mode 100644 index 0000000..d99d2db --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import java.time.Instant
import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.concurrent.duration.FiniteDuration

/**
 * Source task that periodically sends itself a configured `tick` value and forwards every
 * message it receives downstream.
 *
 * Settings are read from the user config: [[TickSourceTask.INITIAL_DELAY]] and
 * [[TickSourceTask.INTERVAL]] (both default to zero minutes) and [[TickSourceTask.TICK]]
 * (required; `.get` fails if it is missing).
 */
class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  // A stray `(TickSourceTask.INITIAL_DELAY)` used to follow this initializer on its own
  // line; it parsed as a separate expression statement with no effect and was removed.
  val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
  // NOTE(review): the default interval is zero; confirm context.schedule accepts that.
  val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
  val tick = userConf.getValue[T](TickSourceTask.TICK).get

  /** Schedules the recurring tick, each stamped with the wall-clock time of sending. */
  override def onStart(startTime: Instant): Unit = {
    context.schedule(initialDelay, interval)(
      self ! Message(tick, System.currentTimeMillis())
    )
  }

  /** Forwards every received message (the scheduled ticks included) downstream. */
  override def onNext(msg : Message) : Unit = {
    context.output(msg)
  }
}

object TickSourceTask {
  val INITIAL_DELAY = "INITIAL_DELAY"
  val INTERVAL = "INTERVAL"
  val TICK = "TICK"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala new file mode 100644 index 0000000..005d018 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.task + +import org.apache.gearpump.akkastream.task.Unzip2Task.UnZipFunction +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val unzip = userConf. + getValue[UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip + + override def onNext(msg : Message) : Unit = { + val message = msg.msg + val time = msg.timestamp + val pair = unzip(message.asInstanceOf[In]) + val (a, b) = pair + output(0, Message(a.asInstanceOf[AnyRef], time)) + output(1, Message(b.asInstanceOf[AnyRef], time)) + } +} + +object Unzip2Task { + case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable + + val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala new file mode 100644 index 0000000..7e0c082 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import org.apache.gearpump.Message
import org.apache.gearpump.akkastream.task.Zip2Task.ZipFunction
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Task that combines incoming messages pairwise: the first message of a pair is held back,
 * and when the second one arrives the two payloads are passed to the zip function and the
 * result is emitted with the second message's timestamp.
 *
 * The zip function is read from the user config under [[Zip2Task.ZIP2_FUNCTION]].
 * NOTE(review): messages are paired purely by arrival order; nothing here checks which
 * upstream a message came from.
 */
class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val zip = userConf.
    getValue[ZipFunction[A1, A2, OUT]](Zip2Task.ZIP2_FUNCTION)(context.system).get.zip

  /** First element of the pair currently being assembled, if any. */
  var a1: Option[A1] = None

  /** Second element of the pair; only populated transiently, cleared after each emit. */
  var a2: Option[A2] = None

  override def onNext(msg : Message) : Unit = {
    val message = msg.msg
    val time = msg.timestamp
    a1 match {
      case Some(first) =>
        val second = message.asInstanceOf[A2]
        val out = zip(first, second)
        // Start a fresh pair. Previously a1 was never cleared, so every message after the
        // first pair was zipped against the very first element.
        a1 = None
        a2 = None
        context.output(Message(out, time))
      case None =>
        a1 = Some(message.asInstanceOf[A1])
    }
  }
}

object Zip2Task {
  /** Serializable wrapper used to ship the zip function inside the user config. */
  case class ZipFunction[A1, A2, OUT](val zip: (A1, A2) => OUT) extends Serializable

  val ZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.zip2.function"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala new file mode 100644 index
0000000..6ad90df --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala @@ -0,0 +1,40 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.util

import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, MaterializedValueNode, Module, Transform}

/**
 * Evaluates a materialized-value expression tree against the values that individual
 * modules have already produced.
 *
 * @param mat root of the materialized-value tree to evaluate
 */
class MaterializedValueOps(mat: MaterializedValueNode) {

  /**
   * Walks the tree rooted at `mat` and returns the resulting value cast to `Mat`.
   *
   * @param materializedValues per-module values; a module without an entry yields unit
   */
  def resolve[Mat](materializedValues: scala.collection.mutable.Map[Module, Any]): Mat = {
    // The lookup map is the same for every node, so the helper closes over it.
    def evaluate(node: MaterializedValueNode): Any = node match {
      case Ignore => ()
      case Atomic(module) => materializedValues.getOrElse(module, ())
      case Transform(transform, inner) => transform(evaluate(inner))
      case Combine(combine, left, right) =>
        val leftValue = evaluate(left)
        val rightValue = evaluate(right)
        combine(leftValue, rightValue)
    }
    evaluate(mat).asInstanceOf[Mat]
  }
}

object MaterializedValueOps{
  def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat)
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala deleted file mode 100644 index 4ead839..0000000 --- a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump - -import akka.stream.Attributes -import org.scalatest.{FlatSpec, Matchers} - -class AttributesSpec extends FlatSpec with Matchers { - it should "merge the attributes together" in { - val a = Attributes.name("aa") - val b = Attributes.name("bb") - - val c = a and b - - assert("aa-bb" == c.nameOrDefault()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala new file mode 100644 index 0000000..e1846ea --- /dev/null +++ b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream + +import akka.stream.Attributes +import org.scalatest.{FlatSpec, Matchers} + +class AttributesSpec extends FlatSpec with Matchers { + it should "merge the attributes together" in { + val a = Attributes.name("aa") + val b = Attributes.name("bb") + + val c = a and b + + assert("aa-bb" == c.nameOrDefault()) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/project/BuildDashboard.scala ---------------------------------------------------------------------- diff --git a/project/BuildDashboard.scala b/project/BuildDashboard.scala index c14b9d6..cfa6aae 100644 --- a/project/BuildDashboard.scala +++ b/project/BuildDashboard.scala @@ -46,11 +46,11 @@ object BuildDashboard extends sbt.Build { private lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test", + "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % "test", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "com.lihaoyi" %% "upickle" % upickleVersion, - "com.softwaremill.akka-http-session" %% "core" % "0.2.5", - "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion, + "com.softwaremill.akka-http-session" %% "core" % "0.3.0", + "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion, "com.github.scribejava" % "scribejava-apis" % "2.4.0", "com.ning" % "async-http-client" % "1.9.33", "org.webjars" % "angularjs" % "1.4.9", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/project/BuildExperiments.scala ---------------------------------------------------------------------- diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala index e07b688..eb5f9e1 100644 --- a/project/BuildExperiments.scala +++ b/project/BuildExperiments.scala @@ -25,7 +25,7 @@ import sbt.Keys._ object BuildExperiments extends sbt.Build { lazy val experiments: 
Seq[ProjectReference] = Seq( - // akkastream, + akkastream, cgroup, redis, storm, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/project/Dependencies.scala ---------------------------------------------------------------------- diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6949497..4e30d3f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,7 +23,8 @@ object Dependencies { val crossScalaVersionNumbers = Seq("2.11.8") val scalaVersionNumber = crossScalaVersionNumbers.last - val akkaVersion = "2.4.3" + val akkaVersion = "2.4.16" + val akkaHttpVersion = "10.0.1" val hadoopVersion = "2.6.0" val hbaseVersion = "1.0.0" val commonsHttpVersion = "3.1" @@ -82,10 +83,9 @@ object Dependencies { "com.typesafe.akka" %% "akka-agent" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-kernel" % akkaVersion, - "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion, - "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-http" % akkaHttpVersion, + "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion, "org.scala-lang" % "scala-reflect" % scalaVersionNumber, - "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4", "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion, "com.google.guava" % "guava" % guavaVersion, "com.codahale.metrics" % "metrics-graphite" % codahaleVersion http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala index 3088a39..53ee692 100644 --- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala @@ -149,7 +149,7 @@ class AppMasterService(val master: ActorRef, } } } ~ - path("metrics" / RestPath) { path => + path("metrics" / RemainingPath) { path => parameterMap { optionMap => parameter("aggregator" ? "") { aggregator => parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index ed15121..be96577 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -102,7 +102,7 @@ class MasterService(val master: ActorRef, failWith(ex) } } ~ - path("metrics" / RestPath) { path => + path("metrics" / RemainingPath) { path => parameters(ParamMagnet(ReadOption.Key ? 
ReadOption.ReadLatest)) { readOption: String => val query = QueryHistoryMetrics(path.head.toString, readOption) onComplete(askActor[HistoryMetrics](master, query)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala index 804b34f..4989364 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala @@ -60,8 +60,8 @@ import org.apache.gearpump.services.util.UpickleUtil._ class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService { // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box. 
- private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump", - params = Map.empty) + private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = Some("gearpump"), + params = Map.empty[String, String]) val LOG = LogUtil.getLogger(getClass, "AUDIT") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala index 284d3f2..7b33987 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala @@ -19,10 +19,12 @@ package org.apache.gearpump.services import akka.actor.ActorSystem +import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.marshalling.ToResponseMarshallable._ +import akka.http.scaladsl.server.{RejectionHandler, StandardRoute} import akka.stream.Materializer - import org.apache.gearpump.util.Util // NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._ @@ -56,14 +58,14 @@ class StaticService(override val system: ActorSystem, supervisorPath: String) getFromResource("index.html") } ~ path("favicon.ico") { - complete(StatusCodes.NotFound) + complete(ToResponseMarshallable(StatusCodes.NotFound)) } ~ pathPrefix("webjars") { get { getFromResourceDirectory("META-INF/resources/webjars") } } ~ - path(Rest) { path => + path(Remaining) { path => getFromResource("%s" format path) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala index 8268d61..954fe97 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala @@ -63,7 +63,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem) failWith(ex) } } ~ - path("metrics" / RestPath ) { path => + path("metrics" / RemainingPath ) { path => val workerId = WorkerId.parse(workerIdString) parameter(ReadOption.Key ? 
ReadOption.ReadLatest) { readOption => val query = QueryHistoryMetrics(path.head.toString, readOption) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index ca8d89e..d4b3719 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -123,7 +123,7 @@ object LifeTime { */ class StreamApplication( override val name: String, val inputUserConfig: UserConfig, - dag: Graph[ProcessorDescription, PartitionerDescription]) + val dag: Graph[ProcessorDescription, PartitionerDescription]) extends Application { require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 82ea7c7..5aaf2fa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -124,12 +124,11 @@ case class DataSinkOp( * to another Op to be used */ case class ChainableOp[IN, OUT]( - fn: SingleInputFunction[IN, OUT]) extends Op { + fn: SingleInputFunction[IN, OUT], + userConfig: UserConfig = UserConfig.empty) extends Op { override def description: String = fn.description - override def userConfig: UserConfig = UserConfig.empty - override def chain(other: 
Op)(implicit system: ActorSystem): Op = { other match { case op: ChainableOp[OUT, _] =>
