Repository: incubator-gearpump Updated Branches: refs/heads/akka-streams 5c4d60c5b -> 4fe5458f4
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala new file mode 100644 index 0000000..8e7a2df --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.akkastream.task.GraphTask.{Index, PortId} +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskWrapper} + +class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig) + extends Task(inputTaskContext, userConf) { + + private val context = inputTaskContext.asInstanceOf[TaskWrapper] + protected val outMapping = + portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get) + protected val inMapping = + portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get) + + val sizeOfOutPorts = outMapping.keys.size + val sizeOfInPorts = inMapping.keys.size + + private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = { + val portToProcessor = processors.zipWithIndex.map{kv => + (kv._2, kv._1) + }.toMap + + val processorToIndex = processors.sorted.zipWithIndex.toMap + + val portToIndex = portToProcessor.map{kv => + val (outlet, processorId) = kv + val index = processorToIndex(processorId) + (outlet, index) + } + portToIndex + } + + def output(outletId: Int, msg: Message): Unit = { + context.output(outMapping(outletId), msg) + } + + override def onStart(startTime : Instant) : Unit = {} + + override def onStop() : Unit = {} +} + +object GraphTask { + val OUT_PROCESSORS = "org.apache.gearpump.akkastream.task.outprocessors" + val IN_PROCESSORS = "org.apache.gearpump.akkastream.task.inprocessors" + + type PortId = Int + type Index = Int +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala ---------------------------------------------------------------------- diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala new file mode 100644 index 0000000..29d9c91 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.collection.immutable.VectorBuilder +import scala.concurrent.duration.FiniteDuration + +class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + case object GroupedWithinTrigger + val buf: VectorBuilder[T] = new VectorBuilder + val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW) + val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE) + + override def onNext(msg : Message) : Unit = { + + } +} + +object GroupedWithinTask { + val BATCH_SIZE = "BATCH_SIZE" + val TIME_WINDOW = "TIME_WINDOW" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala new file mode 100644 index 0000000..837de6b --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.task + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +class InterleaveTask(context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val sizeOfInputs = sizeOfInPorts + var index = 0 + + // TODO access upstream and pull + override def onNext(msg : Message) : Unit = { + output(index, msg) + index += 1 + if (index == sizeOfInputs) { + index = 0 + } + } +} + +object InterleaveTask { + val INPUT_PORTS = "INPUT_PORTS" + val SEGMENT_SIZE = "SEGMENT_SIZE" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala new file mode 100644 index 0000000..387116d --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.task + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.concurrent.Future + +class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC) + implicit val ec = context.system.dispatcher + + override def onNext(msg : Message) : Unit = { + val data = msg.msg.asInstanceOf[In] + val time = msg.timestamp + f match { + case Some(func) => + val fout = func(data) + fout.onComplete(value => { + value.foreach(out => { + val msg = new Message(out, time) + context.output(msg) + }) + }) + case None => + } + } +} + +object MapAsyncTask { + val MAPASYNC_FUNC = "MAPASYNC_FUNC" + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala new file mode 100644 index 0000000..2b1cd33 --- /dev/null +++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.task + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +class MergeTask(context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE) + val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS) + + override def onNext(msg : Message) : Unit = { + context.output(msg) + } +} + +object MergeTask { + val EAGER_COMPLETE = "EAGER_COMPLETE" + val INPUT_PORTS = "INPUT_PORTS" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala new file mode 100644 index
0000000..1ff9ccd --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant +import java.util.Date +import java.util.concurrent.TimeUnit + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.concurrent.duration.FiniteDuration + +class SingleSourceTask[T](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get + + override def onNext(msg : Message) : Unit = { + context.output(Message(elem, msg.timestamp)) + } +} + +object SingleSourceTask { + val ELEMENT = "ELEMENT" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala new file mode 100644 index 0000000..05011e9 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant +import java.util +import java.util.concurrent.TimeUnit + +import akka.actor.Actor.Receive +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.util.Timeout +import org.apache.gearpump.Message +import org.apache.gearpump.akkastream.task.SinkBridgeTask.RequestMessage +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId} +import org.apache.gearpump.util.LogUtil +import org.reactivestreams.{Publisher, Subscriber, Subscription} + +/** + * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module + * + * + * upstream [[Task]] -> [[SinkBridgeTask]] + * \ Remote Cluster + * -------------------------\---------------------- + * \ Local JVM + * \| + * Akka Stream [[Subscriber]] + * + * + * @param taskContext TaskContext + * @param userConf UserConfig + */ +class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig) + extends Task(taskContext, userConf) { + import taskContext.taskId + + val queue = new util.LinkedList[Message]() + var subscriber: ActorRef = _ + + var request: Int = 0 + + override def onStart(startTime : Instant) : Unit = {} + + override def onNext(msg : Message) : Unit = { + queue.add(msg) + trySendingData() + } + + override def onStop() : Unit = {} + + private def trySendingData(): Unit = { + if (subscriber != null) { + (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg => + subscriber ! 
msg.msg + request -= 1 + } + } + } + + override def receiveUnManagedMessage: Receive = { + case RequestMessage(n) => + this.subscriber = sender + LOG.info("the downstream has requested " + n + " messages from " + subscriber) + request += n.toInt + trySendingData() + case msg => + LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString) + } +} + +object SinkBridgeTask { + + case class RequestMessage(number: Int) + + class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, + processorId: ProcessorId) extends Publisher[AnyRef] with Subscription { + private val taskId = TaskId(processorId, index = 0) + private val LOG = LogUtil.getLogger(getClass) + + private var actor: ActorRef = _ + import system.dispatcher + + private val task = + context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map{container => + // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) + container.task + } + + override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = { + this.actor = system.actorOf(Props(new ClientActor(subscriber))) + subscriber.onSubscribe(this) + } + + override def cancel(): Unit = Unit + + private implicit val timeout = Timeout(5, TimeUnit.SECONDS) + + override def request(l: Long): Unit = { + task.foreach{ task => + task.tell(RequestMessage(l.toInt), actor) + } + } + } + + class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor { + def receive: Receive = { + case result: AnyRef => + subscriber.onNext(result) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala new file mode 100644 index 0000000..b0eda19 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant + +import akka.actor.Actor.Receive +import org.apache.gearpump.Message +import org.apache.gearpump.akkastream.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error} +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId} +import org.reactivestreams.{Subscriber, Subscription} + +import scala.concurrent.ExecutionContext + +/** + * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task + * + * + * + * [[SourceBridgeTask]] --> downstream [[Task]] + * /| Remote Cluster + * ---------------/-------------------------------- + * / Local JVM + * Akka Stream [[org.reactivestreams.Publisher]] + * + * + * @param taskContext TaskContext + * @param userConf UserConfig + */ +class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) + extends Task(taskContext, userConf) { + import taskContext.taskId + + override def onStart(startTime : Instant) : Unit = {} + + override def onNext(msg : Message) : Unit = { + LOG.info("AkkaStreamSource receiving message " + msg) + } + + override def onStop() : Unit = {} + + override def receiveUnManagedMessage: Receive = { + case Error(ex) => + LOG.error("the stream has error", ex) + case AkkaStreamMessage(msg) => + LOG.info("we have received message from akka stream source: " + msg) + taskContext.output(Message(msg, System.currentTimeMillis())) + case Complete(description) => + LOG.info("the stream is completed: " + description) + case msg => + LOG.error("Failed! 
Received unknown message " + "taskId: " + taskId + ", " + msg.toString) + } +} + + +object SourceBridgeTask { + case class Error(ex: java.lang.Throwable) + + case class Complete(description: String) + + case class AkkaStreamMessage[T >: AnyRef](msg: T) + + class SourceBridgeTaskClient[T >: AnyRef](ec: ExecutionContext, + context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] { + val taskId = TaskId(processorId, 0) + var subscription: Subscription = _ + implicit val dispatcher = ec + + val task = context.askAppMaster[TaskActorRef](appId, + LookupTaskActorRef(taskId)).map{container => + // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) + container.task + } + + override def onError(throwable: Throwable): Unit = { + task.map(task => task ! Error(throwable)) + } + + override def onSubscribe(subscription: Subscription): Unit = { + // when taskActorRef is resolved, request message from upstream + this.subscription = subscription + task.map(task => subscription.request(1)) + } + + override def onComplete(): Unit = { + task.map(task => task ! Complete("the upstream is completed")) + } + + override def onNext(t: T): Unit = { + task.map {task => + task ! 
AkkaStreamMessage(t) + } + subscription.request(1) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala new file mode 100644 index 0000000..bf2c14f --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val func = userConf.getValue[() => IN => Iterable[OUT]](StatefulMapConcatTask.FUNC).get + var f: IN => Iterable[OUT] = _ + + override def onStart(startTime: Instant) : Unit = { + f = func() + } + + override def onNext(msg : Message) : Unit = { + val in: IN = msg.msg.asInstanceOf[IN] + val out: Iterable[OUT] = f(in) + val iterator = out.iterator + while(iterator.hasNext) { + val nextValue = iterator.next + context.output(Message(nextValue, System.currentTimeMillis())) + } + } +} + +object StatefulMapConcatTask { + val FUNC = "FUNC" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala new file mode 100644 index 0000000..ef43fbe --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant +import java.util.concurrent.TimeUnit + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.concurrent.duration.FiniteDuration + +case object TakeWithinTimeout + +class TakeWithinTask[T](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val timeout = userConf.getValue[FiniteDuration](TakeWithinTask.TIMEOUT). + getOrElse(FiniteDuration(0, TimeUnit.MINUTES)) + var timeoutActive = false + + override def onStart(startTime: Instant): Unit = { + context.scheduleOnce(timeout)( + self ! 
Message(DropWithinTimeout, System.currentTimeMillis()) + ) + } + + override def onNext(msg : Message) : Unit = { + msg.msg match { + case DropWithinTimeout => + timeoutActive = true + case _ => + + } + timeoutActive match { + case true => + case false => + context.output(msg) + } + } +} + +object TakeWithinTask { + val TIMEOUT = "TIMEOUT" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala new file mode 100644 index 0000000..4e09bf2 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.task + +import java.util.concurrent.TimeUnit + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +class ThrottleTask[T](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val cost = userConf.getInt(ThrottleTask.COST).getOrElse(0) + val costCalc = userConf.getValue[T => Int](ThrottleTask.COST_CALC) + val maxBurst = userConf.getInt(ThrottleTask.MAX_BURST) + val timePeriod = userConf.getValue[FiniteDuration](ThrottleTask.TIME_PERIOD). + getOrElse(FiniteDuration(0, TimeUnit.MINUTES)) + val interval = timePeriod.toNanos / cost + + // TODO control rate from TaskActor + override def onNext(msg : Message) : Unit = { + val data = msg.msg.asInstanceOf[T] + val time = msg.timestamp + context.output(msg) + } +} + +object ThrottleTask { + val COST = "COST" + val COST_CALC = "COST_CAL" + val MAX_BURST = "MAX_BURST" + val TIME_PERIOD = "TIME_PERIOD" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala new file mode 100644 index 0000000..b3850ca --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Source task that periodically sends itself a configured tick value and forwards every
 * message it receives downstream.
 *
 * @param context  task context used for scheduling and for emitting messages
 * @param userConf configuration carrying the delay, the interval and the tick payload
 * @tparam T type of the tick payload
 */
class TickSourceTask[T](context: TaskContext, userConf: UserConfig)
  extends GraphTask(context, userConf) {

  // Both durations fall back to zero when they are not configured.
  val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
  val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))

  // The payload is mandatory. The exception type matches what Option.get would throw,
  // but the message names the missing key.
  val tick = userConf.getValue[T](TickSourceTask.TICK).getOrElse(
    throw new NoSuchElementException(
      s"TickSourceTask requires a value for key '${TickSourceTask.TICK}'"))

  /**
   * Schedules the recurring tick: after `initialDelay`, and then every `interval`, the
   * task sends itself a message stamped with the current wall-clock time in milliseconds.
   */
  override def onStart(startTime: Instant): Unit = {
    context.schedule(initialDelay, interval)(
      self ! Message(tick, System.currentTimeMillis())
    )
  }

  /** Forwards each received message downstream unchanged. */
  override def onNext(msg: Message): Unit = {
    context.output(msg)
  }
}

/** Configuration keys read by [[TickSourceTask]]. */
object TickSourceTask {
  val INITIAL_DELAY = "INITIAL_DELAY"
  val INTERVAL = "INTERVAL"
  val TICK = "TICK"
}
/**
 * Splits each incoming element into two values and emits them on outlet 0 and outlet 1.
 *
 * @param context  task context; also supplies the actor system used to read the function
 * @param userConf configuration carrying the split function
 * @tparam In type of the incoming element
 * @tparam A1 type of the value sent to outlet 0
 * @tparam A2 type of the value sent to outlet 1
 */
class Unzip2Task[In, A1, A2](context: TaskContext, userConf: UserConfig)
  extends GraphTask(context, userConf) {

  // The split function is mandatory; construction fails if it is not configured.
  val unzip = userConf
    .getValue[Unzip2Task.UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system)
    .get
    .unzip

  /** Applies the split function and emits both halves with the original timestamp. */
  override def onNext(msg: Message): Unit = {
    val timestamp = msg.timestamp
    unzip(msg.msg.asInstanceOf[In]) match {
      case (first, second) =>
        output(0, Message(first.asInstanceOf[AnyRef], timestamp))
        output(1, Message(second.asInstanceOf[AnyRef], timestamp))
    }
  }
}

object Unzip2Task {
  /** Serializable wrapper that carries the split function inside the user configuration. */
  case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable

  val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
}
/**
 * Combines messages pairwise with a configured zip function and emits the result.
 *
 * The task only looks at arrival order: the first message of a pair is treated as the
 * `A1` value and the next one as the `A2` value.
 * NOTE(review): nothing here identifies which upstream a message came from, so this
 * presumably relies on the two inputs alternating; confirm against the materializer.
 *
 * @param context  task context; also supplies the actor system used to read the function
 * @param userConf configuration carrying the zip function
 * @tparam A1  type of the first element of a pair
 * @tparam A2  type of the second element of a pair
 * @tparam OUT type of the combined result
 */
class Zip2Task[A1, A2, OUT](context: TaskContext, userConf: UserConfig)
  extends GraphTask(context, userConf) {

  // The zip function is mandatory. The exception type matches what Option.get would
  // throw, but the message names the missing key.
  val zip = userConf.
    getValue[Zip2Task.ZipFunction[A1, A2, OUT]](Zip2Task.ZIP2_FUNCTION)(context.system).
    getOrElse(throw new NoSuchElementException(
      s"Zip2Task requires a value for key '${Zip2Task.ZIP2_FUNCTION}'")).zip

  // Pending halves of the pair currently being assembled.
  var a1: Option[A1] = None
  var a2: Option[A2] = None

  /**
   * Buffers the message as the first half when nothing is pending. Otherwise it completes
   * the pair, emits the zipped value with this message's timestamp, and clears the buffer.
   */
  override def onNext(msg: Message): Unit = {
    val message = msg.msg
    val time = msg.timestamp
    a1 match {
      case Some(first) =>
        val second = message.asInstanceOf[A2]
        a2 = Some(second)
        context.output(Message(zip(first, second).asInstanceOf[OUT], time))
        // Each element may take part in one pair only. Without this reset every later
        // message would be zipped against the very first element received.
        a1 = None
        a2 = None
      case None =>
        a1 = Some(message.asInstanceOf[A1])
    }
  }
}

object Zip2Task {
  /** Serializable wrapper that carries the zip function inside the user configuration. */
  case class ZipFunction[A1, A2, OUT](val zip: (A1, A2) => OUT) extends Serializable

  val ZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.zip2.function"
}
/**
 * Evaluates a materialized-value computation tree against the values produced by the
 * individual modules.
 *
 * @param mat root node of the tree to evaluate
 */
class MaterializedValueOps(mat: MaterializedValueNode) {

  /**
   * Walks the tree rooted at `mat` and returns the computed value cast to `Mat`.
   * The cast is unchecked; the caller is responsible for asking for the right type.
   *
   * @param materializedValues values already materialized, keyed by module; a module
   *                           without an entry contributes the unit value
   * @tparam Mat expected type of the result
   */
  def resolve[Mat](materializedValues: scala.collection.mutable.Map[Module, Any]): Mat = {
    def evaluate(node: MaterializedValueNode): Any = node match {
      case Ignore => ()
      case Atomic(module) => materializedValues.getOrElse(module, ())
      case Transform(transform, dependency) => transform(evaluate(dependency))
      case Combine(combine, left, right) =>
        // Left subtree first, then right, as argument evaluation order dictates.
        val leftValue = evaluate(left)
        val rightValue = evaluate(right)
        combine(leftValue, rightValue)
    }

    evaluate(mat).asInstanceOf[Mat]
  }
}

object MaterializedValueOps {
  /** Factory that avoids `new` at call sites. */
  def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala deleted file mode 100644 index 4ead839..0000000 --- a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump - -import akka.stream.Attributes -import org.scalatest.{FlatSpec, Matchers} - -class AttributesSpec extends FlatSpec with Matchers { - it should "merge the attributes together" in { - val a = Attributes.name("aa") - val b = Attributes.name("bb") - - val c = a and b - - assert("aa-bb" == c.nameOrDefault()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala new file mode 100644 index 0000000..3731d41 --- /dev/null +++ b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream + +import akka.stream.Attributes +import org.scalatest.{FlatSpec, Matchers} + +class AttributesSpec extends FlatSpec with Matchers { + it should "merge the attributes together" in { + val a = Attributes.name("aa") + val b = Attributes.name("bb") + + val c = a and b + + assert("aa-bb" == c.nameOrDefault()) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 17e78df..0b1628e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -35,7 +35,8 @@ object Build extends sbt.Build { val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code") - val akkaVersion = "2.4.3" + val akkaVersion = "2.4.10" + val akkaStreamVersion = "2.4-SNAPSHOT" val apacheRepo = "https://repository.apache.org/" val hadoopVersion = "2.6.0" val hbaseVersion = "1.0.0" @@ -145,9 +146,12 @@ object Build extends sbt.Build { "com.typesafe.akka" %% "akka-cluster" % akkaVersion, "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, "commons-logging" % "commons-logging" % commonsLoggingVersion, - "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion + exclude("com.typesafe.akka", "akka-stream_2.11"), + "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion, "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided" - ) + ), + dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion ) val streamingDependencies = Seq( @@ -186,7 +190,8 @@ object Build extends sbt.Build { "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion, "org.scala-lang" % "scala-reflect" % scalaVersionNumber, "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4", - "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + 
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test" + exclude("com.typesafe.akka", "akka-stream_2.11"), "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", "org.mockito" % "mockito-core" % mockitoVersion % "test", @@ -250,7 +255,7 @@ object Build extends sbt.Build { base = file("."), settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting) .aggregate(shaded, core, daemon, streaming, services, external_kafka, external_monoid, - external_serializer, examples, storm, yarn, external_hbase, packProject, + external_serializer, examples, akkastream, storm, yarn, external_hbase, packProject, external_hadoopfs, integration_test).settings(Defaults.itSettings: _*) .disablePlugins(sbtassembly.AssemblyPlugin) @@ -314,14 +319,17 @@ object Build extends sbt.Build { lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test", + "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test" + exclude("com.typesafe.akka", "akka-stream_2.11"), "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "com.lihaoyi" %% "upickle" % upickleVersion, "com.softwaremill.akka-http-session" %% "core" % "0.2.5", - "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion + exclude("com.typesafe.akka", "akka-stream_2.11"), "com.github.scribejava" % "scribejava-apis" % "2.4.0", "com.ning" % "async-http-client" % "1.9.33", "org.webjars" % "angularjs" % "1.4.9", + "org.apache.hadoop" % "hadoop-common" % hadoopVersion, // angular 1.5 breaks ui-select, but we need ng-touch 1.5 "org.webjars.npm" % "angular-touch" % "1.5.0", @@ -382,14 +390,17 @@ object Build extends sbt.Build { lazy val akkastream = Project( id = "gearpump-experiments-akkastream", base = file("experiments/akkastream"), - settings = commonSettings ++ 
noPublish ++ myAssemblySettings ++ + settings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( - "org.json4s" %% "json4s-jackson" % "3.2.11" + "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion, + "org.json4s" %% "json4s-jackson" % "3.2.11", + "org.scalatest" %% "scalatest" % scalaTestVersion % "test" ), - mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test") - )) - .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion + )) + .dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile") + .disablePlugins(sbtassembly.AssemblyPlugin) lazy val storm = Project( id = "gearpump-experiments-storm", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 1c87653..47d3064 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -69,7 +69,8 @@ object Pack extends sbt.Build { "worker" -> "org.apache.gearpump.cluster.main.Worker", "services" -> "org.apache.gearpump.services.main.Services", "yarnclient" -> "org.apache.gearpump.experiments.yarn.client.Client", - "storm" -> "org.apache.gearpump.experiments.storm.StormRunner" + "storm" -> "org.apache.gearpump.experiments.storm.StormRunner", + "akkastream" -> "org.apache.gearpump.akkastream.example.Test11" ), packJvmOpts := Map( "gear" -> Seq("-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}"), @@ -109,7 +110,13 @@ object Pack extends sbt.Build { "storm" -> Seq( "-server", "-Djava.net.preferIPv4Stack=true", - "-Dgearpump.home=${PROG_HOME}") + "-Dgearpump.home=${PROG_HOME}"), + + "akkastream" -> Seq( + "-server", + "-Djava.net.preferIPv4Stack=true", + "-Dgearpump.home=${PROG_HOME}", + "-Djava.rmi.server.hostname=localhost") ), packLibDir := Map( "lib" -> new 
ProjectsToPack(core.id, streaming.id), @@ -141,13 +148,14 @@ object Pack extends sbt.Build { "worker" -> daemonClassPath, "services" -> serviceClassPath, "yarnclient" -> yarnClassPath, - "storm" -> stormClassPath + "storm" -> stormClassPath, + "akkstream" -> daemonClassPath ), packArchivePrefix := projectName + "-" + scalaBinaryVersion.value, packArchiveExcludes := Seq("integrationtest") ) - ).dependsOn(core, streaming, services, yarn, storm). + ).dependsOn(core, streaming, services, yarn, storm, akkastream). disablePlugins(sbtassembly.AssemblyPlugin) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/project/scalastyle_config.xml ---------------------------------------------------------------------- diff --git a/project/scalastyle_config.xml b/project/scalastyle_config.xml new file mode 100644 index 0000000..1b0a838 --- /dev/null +++ b/project/scalastyle_config.xml @@ -0,0 +1,240 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<!-- + +If you wish to turn off checking for a section of code, you can put a comment in the source +before and after the section, with the following syntax: + + // scalastyle:off + ... 
// stuff that breaks the styles + // scalastyle:on + +You can also disable only one rule, by specifying its rule id, as specified in: + http://www.scalastyle.org/rules-0.8.0.html + + // scalastyle:off no.finalize + override def finalize(): Unit = ... + // scalastyle:on no.finalize + +This file is divided into 3 sections: + (1) rules that we enforce. + (2) rules that we would like to enforce, but haven't cleaned up the codebase to turn on yet + (or we need to make the scalastyle rule more configurable). + (3) rules that we don't want to enforce. +--> + +<scalastyle> + <name>Scalastyle standard configuration</name> + + <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> + + <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true"> + <parameters> + <parameter name="header"><![CDATA[/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLineLength"><![CDATA[100]]></parameter> + <parameter name="tabSize"><![CDATA[2]]></parameter> + <parameter name="ignoreImports">true</parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> + <parameters> + <parameter name="maxParameters"><![CDATA[10]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true"> + <parameters> + <parameter name="singleLineAllowed"><![CDATA[true]]></parameter> + <parameter 
name="doubleLineAllowed"><![CDATA[true]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker" + enabled="true"> + <parameters> + <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW</parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker" + enabled="true"> + <parameters> + <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, + CATCH, FINALLY, LARROW, RARROW + </parameter> + </parameters> + </check> + + <!-- ??? usually shouldn't be checked into the code base. --> + <check level="error" class="org.scalastyle.scalariform.NotImplementedErrorUsage" + enabled="true"></check> + + <check customId="println" level="error" class="org.scalastyle.scalariform.TokenChecker" + enabled="true"> + <parameters> + <parameter name="regex">^println$</parameter> + </parameters> + <customMessage><![CDATA[Are you sure you want to println? If yes, wrap the code block with + // scalastyle:off println + println(...) + // scalastyle:on println]]></customMessage> + </check> + + <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" + enabled="true"> + <parameters> + <parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter> + </parameters> + <customMessage><![CDATA[ + Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use + ShutdownHookManager.addShutdownHook instead. 
+ If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with + // scalastyle:off runtimeaddshutdownhook + Runtime.getRuntime.addShutdownHook(...) + // scalastyle:on runtimeaddshutdownhook + ]]></customMessage> + </check> + + <check customId="mutablesynchronizedbuffer" level="error" class="org.scalastyle.file.RegexChecker" + enabled="true"> + <parameters> + <parameter name="regex">mutable\.SynchronizedBuffer</parameter> + </parameters> + <customMessage><![CDATA[ + Are you sure that you want to use mutable.SynchronizedBuffer? In most cases, you should use + java.util.concurrent.ConcurrentLinkedQueue instead. + If you must use mutable.SynchronizedBuffer, wrap the code block with + // scalastyle:off mutablesynchronizedbuffer + mutable.SynchronizedBuffer[...] + // scalastyle:on mutablesynchronizedbuffer + ]]></customMessage> + </check> + + <check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" + enabled="true"> + <parameters> + <parameter name="regex">JavaConversions</parameter> + </parameters> + <customMessage>Instead of importing implicits in scala.collection.JavaConversions._, import + scala.collection.JavaConverters._ and use .asScala / .asJava methods + </customMessage> + </check> + + <check level="error" class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker" + enabled="true"> + <parameters> + <parameter name="tokens">COMMA</parameter> + </parameters> + </check> + + <!-- Should add single Space between ')' and '{' --> + <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error" + class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters> + <parameter name="regex">\)\{</parameter> + </parameters> + <customMessage><![CDATA[ + Single Space between ')' and `{`. 
+ ]]></customMessage> + </check> + + <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="false"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.file.IndentationChecker" enabled="true"> + <parameters> + <parameter name="tabSize">2</parameter> + <parameter name="methodParamIndentSize">4</parameter> + </parameters> + </check> + + <!-- Don't allow return --> + <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> + +</scalastyle> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/scalastyle-config.xml b/scalastyle-config.xml deleted file mode 100644 index 1b0a838..0000000 --- a/scalastyle-config.xml +++ /dev/null @@ -1,240 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> -<!-- - -If you wish to turn off checking for a section of code, you can put a comment in the source -before and after the section, with the following syntax: - - // scalastyle:off - ... // stuff that breaks the styles - // scalastyle:on - -You can also disable only one rule, by specifying its rule id, as specified in: - http://www.scalastyle.org/rules-0.8.0.html - - // scalastyle:off no.finalize - override def finalize(): Unit = ... - // scalastyle:on no.finalize - -This file is divided into 3 sections: - (1) rules that we enforce. - (2) rules that we would like to enforce, but haven't cleaned up the codebase to turn on yet - (or we need to make the scalastyle rule more configurable). - (3) rules that we don't want to enforce. ---> - -<scalastyle> - <name>Scalastyle standard configuration</name> - - <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> - - <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true"> - <parameters> - <parameter name="header"><![CDATA[/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> - <parameters> - <parameter name="maxLineLength"><![CDATA[100]]></parameter> - <parameter name="tabSize"><![CDATA[2]]></parameter> - <parameter name="ignoreImports">true</parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> - <parameters> - <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> - <parameters> - <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> - <parameters> - <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> - <parameters> - <parameter name="maxParameters"><![CDATA[10]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true"> - <parameters> - <parameter name="singleLineAllowed"><![CDATA[true]]></parameter> - <parameter 
name="doubleLineAllowed"><![CDATA[true]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker" - enabled="true"> - <parameters> - <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW</parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker" - enabled="true"> - <parameters> - <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, - CATCH, FINALLY, LARROW, RARROW - </parameter> - </parameters> - </check> - - <!-- ??? usually shouldn't be checked into the code base. --> - <check level="error" class="org.scalastyle.scalariform.NotImplementedErrorUsage" - enabled="true"></check> - - <check customId="println" level="error" class="org.scalastyle.scalariform.TokenChecker" - enabled="true"> - <parameters> - <parameter name="regex">^println$</parameter> - </parameters> - <customMessage><![CDATA[Are you sure you want to println? If yes, wrap the code block with - // scalastyle:off println - println(...) - // scalastyle:on println]]></customMessage> - </check> - - <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" - enabled="true"> - <parameters> - <parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter> - </parameters> - <customMessage><![CDATA[ - Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use - ShutdownHookManager.addShutdownHook instead. 
- If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with - // scalastyle:off runtimeaddshutdownhook - Runtime.getRuntime.addShutdownHook(...) - // scalastyle:on runtimeaddshutdownhook - ]]></customMessage> - </check> - - <check customId="mutablesynchronizedbuffer" level="error" class="org.scalastyle.file.RegexChecker" - enabled="true"> - <parameters> - <parameter name="regex">mutable\.SynchronizedBuffer</parameter> - </parameters> - <customMessage><![CDATA[ - Are you sure that you want to use mutable.SynchronizedBuffer? In most cases, you should use - java.util.concurrent.ConcurrentLinkedQueue instead. - If you must use mutable.SynchronizedBuffer, wrap the code block with - // scalastyle:off mutablesynchronizedbuffer - mutable.SynchronizedBuffer[...] - // scalastyle:on mutablesynchronizedbuffer - ]]></customMessage> - </check> - - <check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" - enabled="true"> - <parameters> - <parameter name="regex">JavaConversions</parameter> - </parameters> - <customMessage>Instead of importing implicits in scala.collection.JavaConversions._, import - scala.collection.JavaConverters._ and use .asScala / .asJava methods - </customMessage> - </check> - - <check level="error" class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker" - enabled="true"> - <parameters> - <parameter name="tokens">COMMA</parameter> - </parameters> - </check> - - <!-- Should add single Space between ')' and '{' --> - <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error" - class="org.scalastyle.file.RegexChecker" enabled="true"> - <parameters> - <parameter name="regex">\)\{</parameter> - </parameters> - <customMessage><![CDATA[ - Single Space between ')' and `{`. 
- ]]></customMessage> - </check> - - <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="false"> - <parameters> - <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> - </parameters> - </check> - - <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" - enabled="true"></check> - - <check level="error" class="org.scalastyle.file.IndentationChecker" enabled="true"> - <parameters> - <parameter name="tabSize">2</parameter> - <parameter name="methodParamIndentSize">4</parameter> - </parameters> - </check> - - <!-- Don't allow return --> - <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> - -</scalastyle> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala index b217363..46e16cf 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala @@ -149,7 +149,7 @@ class AppMasterService(val master: ActorRef, } } } ~ - path("metrics" / RestPath) { path => + path("metrics" / RemainingPath) { path => parameterMap { optionMap => parameter("aggregator" ? "") { aggregator => parameter(ReadOption.Key ? 
ReadOption.ReadLatest) { readOption => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index bf7092e..a763be6 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -102,7 +102,7 @@ class MasterService(val master: ActorRef, failWith(ex) } } ~ - path("metrics" / RestPath) { path => + path("metrics" / RemainingPath) { path => parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String => val query = QueryHistoryMetrics(path.head.toString, readOption) onComplete(askActor[HistoryMetrics](master, query)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala index 804b34f..8ae8dbe 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala @@ -60,8 +60,8 @@ import org.apache.gearpump.services.util.UpickleUtil._ class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService { // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box. 
- private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump", - params = Map.empty) + private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = Some("gearpump"), + params = Map.empty[String,String]) val LOG = LogUtil.getLogger(getClass, "AUDIT") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala index 284d3f2..7b33987 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala @@ -19,10 +19,12 @@ package org.apache.gearpump.services import akka.actor.ActorSystem +import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.marshalling.ToResponseMarshallable._ +import akka.http.scaladsl.server.{RejectionHandler, StandardRoute} import akka.stream.Materializer - import org.apache.gearpump.util.Util // NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._ @@ -56,14 +58,14 @@ class StaticService(override val system: ActorSystem, supervisorPath: String) getFromResource("index.html") } ~ path("favicon.ico") { - complete(StatusCodes.NotFound) + complete(ToResponseMarshallable(StatusCodes.NotFound)) } ~ pathPrefix("webjars") { get { getFromResourceDirectory("META-INF/resources/webjars") } } ~ - path(Rest) { path => + path(Remaining) { path => getFromResource("%s" format path) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala index 8268d61..954fe97 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala @@ -63,7 +63,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem) failWith(ex) } } ~ - path("metrics" / RestPath ) { path => + path("metrics" / RemainingPath ) { path => val workerId = WorkerId.parse(workerIdString) parameter(ReadOption.Key ? 
ReadOption.ReadLatest) { readOption => val query = QueryHistoryMetrics(path.head.toString, readOption) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala index 8de291c..b09d9b9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala @@ -64,7 +64,7 @@ class OpTranslator extends java.io.Serializable { userConfig) case ProcessorOp(processor, parallelism, conf, description) => DefaultProcessor(parallelism, - description = description + "." + func.description, + description = description + " " + func.description, userConfig, processor) case DataSinkOp(dataSink, parallelism, conf, description) => DataSinkProcessor(dataSink, parallelism, description + func.description) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index c0b6a29..eb52700 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -23,8 +23,6 @@ import java.util import java.util.concurrent.TimeUnit import akka.actor._ -import org.apache.gearpump.streaming.source.{Watermark, DataSourceTask} -import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import 
org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap import org.apache.gearpump.metrics.Metrics @@ -32,8 +30,10 @@ import org.apache.gearpump.serializer.SerializationFramework import org.apache.gearpump.streaming.AppMasterToExecutor._ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} import org.apache.gearpump.{Message, TimeStamp} +import org.slf4j.Logger /** * @@ -52,16 +52,15 @@ class TaskActor( def serializerPool: SerializationFramework = inputSerializerPool - import taskContextData._ - import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.task.TaskActor._ + import taskContextData._ val config = context.system.settings.config val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) // Metrics - private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}" + private val metricName = s"app$appId.processor${taskId.processorId}.task${taskId.index}" private val receiveLatency = Metrics(context.system).histogram( s"$metricName:receiveLatency", sampleRate = 1) private val processTime = Metrics(context.system).histogram(s"$metricName:processTime") @@ -76,9 +75,9 @@ class TaskActor( private var life = taskContextData.life // Latency probe - import scala.concurrent.duration._ - import context.dispatcher + + import scala.concurrent.duration._ final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) // Clock report interval
