Repository: incubator-gearpump Updated Branches: refs/heads/master a01809b25 -> c3d5eb63f
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala deleted file mode 100644 index 325a484..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import scala.concurrent.duration._ - -import akka.actor.Props -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import org.apache.gearpump.cluster.master.InMemoryKVService._ -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} - -class InMemoryKVServiceSpec - extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - - override def beforeEach(): Unit = { - startActorSystem() - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - override def config: Config = TestUtil.MASTER_CONFIG - - "KVService" should "get, put, delete correctly" in { - val system = getActorSystem - val kvService = system.actorOf(Props(new InMemoryKVService())) - val group = "group" - - val client = TestProbe()(system) - - client.send(kvService, PutKV(group, "key", 1)) - client.expectMsg(PutKVSuccess) - - client.send(kvService, PutKV(group, "key", 2)) - client.expectMsg(PutKVSuccess) - - client.send(kvService, GetKV(group, "key")) - client.expectMsg(GetKVSuccess("key", 2)) - - client.send(kvService, DeleteKVGroup(group)) - - // After DeleteGroup, it no longer accept Get and Put message for this group. - client.send(kvService, GetKV(group, "key")) - client.expectNoMsg(3.seconds) - - client.send(kvService, PutKV(group, "key", 3)) - client.expectNoMsg(3.seconds) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala deleted file mode 100644 index e82dff3..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.scheduler - -import org.apache.gearpump.cluster.worker.WorkerId - -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource -import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} -import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate -import org.apache.gearpump.cluster.master.Master.MasterInfo -import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} -import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished - -class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender - with WordSpecLike with Matchers with BeforeAndAfterAll{ - - def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) - val appId = 0 - val workerId1: WorkerId = WorkerId(1, 0L) - val workerId2: WorkerId = WorkerId(2, 0L) - val mockAppMaster = TestProbe() - val mockWorker1 = TestProbe() - val mockWorker2 = TestProbe() - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "The scheduler" should { - "update resource only when the worker is registered" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)) - expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " + - s"registered into master")) - } - - "drop application's resource requests when the application is removed" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - mockAppMaster.expectNoMsg(5.seconds) - } - } - - def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = { - left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId)) - } - - "The resource request with higher priority" should { - "be handled first" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY) - val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY) - - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The resource request which delivered earlier" should { - "be handled first if the priorities are the same" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The PriorityScheduler" should { - "handle the resource request with different relaxation" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER) - val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER) - - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - val request3 = ResourceRequest( - Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - - expect = ResourceAllocated(Array( - ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), - ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - // We have to manually update the resource on each worker - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER) - scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The PriorityScheduler" should { - "handle the resource request with different executor number" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - // By default, the request requires only one executor - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations2.allocations.length == 1) - assert(allocations2.allocations.head.resource == Resource(20)) - - val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations3.allocations.length == 3) - assert(allocations3.allocations.forall(_.resource == Resource(8))) - - // The total available resource can not satisfy the requirements with executor number - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) - scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) - val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations4.allocations.length == 2) - assert(allocations4.allocations.forall(_.resource == Resource(20))) - - // When new resources are available, the remaining request will be satisfied - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) - val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations5.allocations.length == 1) - assert(allocations4.allocations.forall(_.resource == Resource(20))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala deleted file mode 100644 index bf25057..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.worker - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, PoisonPill, Props} -import akka.testkit.TestProbe -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest._ - -import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} -import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} -import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} -import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} -import org.apache.gearpump.cluster.master.Master.MasterInfo -import org.apache.gearpump.cluster.scheduler.Resource -import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} -import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} - -class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - override def config: Config = TestUtil.DEFAULT_CONFIG - - val appId = 1 - val workerId: WorkerId = WorkerId(1, 0L) - val executorId = 1 - var masterProxy: TestProbe = null - var mockMaster: TestProbe = null - var client: TestProbe = null - val workerSlots = 50 - - override def beforeEach(): Unit = { - startActorSystem() - mockMaster = TestProbe()(getActorSystem) - masterProxy = TestProbe()(getActorSystem) - client = TestProbe()(getActorSystem) - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "The new started worker" should { - "kill itself if no response from Master after registering" in { - val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) - mockMaster watch worker - mockMaster.expectMsg(RegisterNewWorker) - mockMaster.expectTerminated(worker, 60.seconds) - } - } - - "Worker" should { - "init its resource from the gearpump config" in { - val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots"). - withFallback(TestUtil.DEFAULT_CONFIG) - val workerSystem = ActorSystem("WorkerSystem", config) - val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) - mockMaster watch worker - mockMaster.expectMsg(RegisterNewWorker) - - worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots))) - - worker.tell( - UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref) - mockMaster.expectTerminated(worker, 5.seconds) - workerSystem.terminate() - Await.result(workerSystem.whenTerminated, Duration.Inf) - } - } - - "Worker" should { - "update its remaining resource when launching and shutting down executors" in { - val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref)) - masterProxy.expectMsg(RegisterNewWorker) - - worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) - - val executorName = ActorUtil.actorNameForExecutor(appId, executorId) - // This is an actor path which the ActorSystemBooter will report back to, - // not needed in this test - val reportBack = "dummy" - val executionContext = ExecutorJVMConfig(Array.empty[String], - getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), - classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, - username = "user") - - // Test LaunchExecutor - worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), - mockMaster.ref) - mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine")) - - worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95))) - - worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98))) - - // Test terminationWatch - worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) - client.expectMsg(ShutdownExecutorSucceed(1, 1)) - - worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref) - client.expectMsg(ShutdownExecutorFailed( - s"Can not find executor ${executorId + 1} for app $appId")) - - mockMaster.ref ! PoisonPill - masterProxy.expectMsg(RegisterWorker(workerId)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala index 84dec70..2988f5b 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.redis import java.nio.charset.Charset object RedisMessage { - private def toBytes(strings: List[String]): List[Array[Byte]] = strings.map(string => string.getBytes(Charset.forName("UTF8"))) @@ -48,11 +47,10 @@ object RedisMessage { * @param latitude * @param member */ - case class GEOADD(key: Array[Byte], longitude: Double, - latitude: Double, member: Array[Byte]) { - def this(key: String, longitude: Double, - latitude: Double, member: String) = + case class GEOADD(key: Array[Byte], longitude: Double, latitude: Double, member: Array[Byte]) { + def this(key: String, longitude: Double, latitude: Double, member: String) = { this(toBytes(key), longitude, latitude, toBytes(member)) + } } } @@ -66,7 +64,9 @@ object RedisMessage { * @param field */ case class HDEL(key: Array[Byte], field: Array[Byte]) { - def this(key: String, field: String) = this(toBytes(key), toBytes(field)) + def this(key: String, field: String) = { + this(toBytes(key), toBytes(field)) + } } /** @@ -77,8 +77,9 @@ object RedisMessage { * @param increment */ case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) { - def this(key: String, field: String, increment: Long) = + def this(key: String, field: String, increment: Long) = { this(toBytes(key), toBytes(field), increment) + } } /** @@ -89,8 +90,9 @@ object RedisMessage { * @param increment */ case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) { - def this(key: String, field: String, increment: Float) = + def this(key: String, field: String, increment: Float) = { this(toBytes(key), toBytes(field), increment) + } } @@ -102,8 +104,9 @@ object RedisMessage { * @param value */ case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { - def this(key: String, field: String, value: String) = + def this(key: String, field: String, value: String) = { this(toBytes(key), toBytes(field), toBytes(value)) + } } /** @@ -114,8 +117,9 @@ object RedisMessage { * @param value */ case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { - def this(key: String, field: String, value: String) = + def this(key: String, field: String, value: String) = { this(toBytes(key), toBytes(field), toBytes(value)) + } } } @@ -142,8 +146,9 @@ object RedisMessage { * @param value */ case class LPUSH(key: Array[Byte], value: Array[Byte]) { - - def this(key: String, value: String) = this(key, toBytes(value)) + def this(key: String, value: String) = { + this(key, toBytes(value)) + } } /** @@ -153,7 +158,9 @@ object RedisMessage { * @param value */ case class LPUSHX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -164,7 +171,9 @@ object RedisMessage { * @param value */ case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) { - def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value)) + def this(key: String, index: Long, value: String) = { + this(toBytes(key), index, toBytes(value)) + } } /** @@ -174,8 +183,9 @@ object RedisMessage { * @param value */ case class RPUSH(key: Array[Byte], value: Array[Byte]) { - - def this(key: String, value: String) = this(key, toBytes(value)) + def this(key: String, value: String) = { + this(key, toBytes(value)) + } } /** @@ -185,7 +195,9 @@ object RedisMessage { * @param value */ case class RPUSHX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } } @@ -198,8 +210,9 @@ object RedisMessage { * @param message */ case class DEL(message: Array[Byte]) { - - def this(message: String) = this(toBytes(message)) + def this(message: String) = { + this(toBytes(message)) + } } /** @@ -208,7 +221,9 @@ object RedisMessage { * @param key */ case class EXPIRE(key: Array[Byte], seconds: Int) { - def this(key: String, seconds: Int) = this(toBytes(key), seconds) + def this(key: String, seconds: Int) = { + this(toBytes(key), seconds) + } } /** @@ -218,7 +233,9 @@ object RedisMessage { * @param timestamp */ case class EXPIREAT(key: Array[Byte], timestamp: Long) { - def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + def this(key: String, timestamp: Long) = { + this(toBytes(key), timestamp) + } } /** @@ -230,9 +247,11 @@ object RedisMessage { * @param database * @param timeout */ - case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) { - def this(host: String, port: Int, key: String, database: Int, timeout: Int) = + case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], + database: Int, timeout: Int) { + def this(host: String, port: Int, key: String, database: Int, timeout: Int) = { this(toBytes(host), port, toBytes(key), database, timeout) + } } /** @@ -242,7 +261,9 @@ object RedisMessage { * @param db */ case class MOVE(key: Array[Byte], db: Int) { - def this(key: String, db: Int) = this(toBytes(key), db) + def this(key: String, db: Int) = { + this(toBytes(key), db) + } } /** @@ -251,7 +272,9 @@ object RedisMessage { * @param key */ case class PERSIST(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -261,7 +284,9 @@ object RedisMessage { * @param milliseconds */ case class PEXPIRE(key: Array[Byte], milliseconds: Long) { - def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + def this(key: String, milliseconds: Long) = { + this(toBytes(key), milliseconds) + } } /** @@ -271,7 +296,9 @@ object RedisMessage { * @param timestamp */ case class PEXPIREAT(key: Array[Byte], timestamp: Long) { - def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + def this(key: String, milliseconds: Long) = { + this(toBytes(key), milliseconds) + } } /** @@ -281,7 +308,9 @@ object RedisMessage { * @param newKey */ case class RENAME(key: Array[Byte], newKey: Array[Byte]) { - def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + def this(key: String, newKey: String) = { + this(toBytes(key), toBytes(newKey)) + } } /** @@ -291,7 +320,9 @@ object RedisMessage { * @param newKey */ case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) { - def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + def this(key: String, newKey: String) = { + this(toBytes(key), toBytes(newKey)) + } } } @@ -306,8 +337,9 @@ object RedisMessage { * @param members */ case class SADD(key: Array[Byte], members: Array[Byte]) { - - def this(key: String, members: String) = this(key, toBytes(members)) + def this(key: String, members: String) = { + this(key, toBytes(members)) + } } @@ -319,8 +351,9 @@ object RedisMessage { * @param member */ case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) { - def this(source: String, destination: String, member: String) = + def this(source: String, destination: String, member: String) = { this(toBytes(source), toBytes(destination), toBytes(member)) + } } @@ -331,8 +364,9 @@ object RedisMessage { * @param member */ case class SREM(key: Array[Byte], member: Array[Byte]) { - - def this(key: String, member: String) = this(key, toBytes(member)) + def this(key: String, member: String) = { + this(key, toBytes(member)) + } } } @@ -346,7 +380,9 @@ object RedisMessage { * @param value */ case class APPEND(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -355,7 +391,9 @@ object RedisMessage { * @param key */ case class DECR(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -365,7 +403,9 @@ object RedisMessage { * @param decrement */ case class DECRBY(key: Array[Byte], decrement: Int) { - def this(key: String, decrement: Int) = this(toBytes(key), decrement) + def this(key: String, decrement: Int) = { + this(toBytes(key), decrement) + } } /** @@ -374,7 +414,9 @@ object RedisMessage { * @param key */ case class INCR(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -384,7 +426,9 @@ object RedisMessage { * @param increment */ case class INCRBY(key: Array[Byte], increment: Int) { - def this(key: String, increment: Int) = this(toBytes(key), increment) + def this(key: String, increment: Int) = { + this(toBytes(key), increment) + } } /** @@ -394,7 +438,9 @@ object RedisMessage { * @param increment */ case class INCRBYFLOAT(key: Array[Byte], increment: Double) { - def this(key: String, increment: Number) = this(toBytes(key), increment) + def this(key: String, increment: Number) = { + this(toBytes(key), increment) + } } @@ -405,7 +451,9 @@ object RedisMessage { * @param value */ case class SET(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -416,7 +464,9 @@ object RedisMessage { * @param value */ case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) { - def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value)) + def this(key: String, offset: Long, value: String) = { + this(toBytes(key), offset, toBytes(value)) + } } /** @@ -427,7 +477,9 @@ object RedisMessage { * @param value */ case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) { - def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value)) + def this(key: String, seconds: Int, value: String) = { + this(toBytes(key), seconds, toBytes(value)) + } } /** @@ -437,7 +489,9 @@ object RedisMessage { * @param value */ case class SETNX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -448,9 +502,9 @@ object RedisMessage { * @param value */ case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) { - def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value)) + def this(key: String, offset: Int, value: String) = { + this(toBytes(key), offset, toBytes(value)) + } } - } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala index 3f75949..36a9fe3 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -32,20 +32,20 @@ import redis.clients.jedis.Jedis import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT} /** - * Save message in Redis Instance - * - * @param host - * @param port - * @param timeout - * @param database - * @param password - */ + * Save message in Redis Instance + * + * @param host + * @param port + * @param timeout + * @param database + * @param password + */ class RedisSink( - host: String = DEFAULT_HOST, - port: Int = DEFAULT_PORT, - timeout: Int = DEFAULT_TIMEOUT, - database: Int = DEFAULT_DATABASE, - password: String = "") extends DataSink { + host: String = DEFAULT_HOST, + port: Int = DEFAULT_PORT, + timeout: Int = DEFAULT_TIMEOUT, + database: Int = DEFAULT_DATABASE, + password: String = "") extends DataSink { private val LOG = LogUtil.getLogger(getClass) @transient private lazy val client = new Jedis(host, port, timeout) @@ -59,7 +59,6 @@ class RedisSink( } override def write(message: Message): Unit = { - message.msg match { // GEO case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 4552a64..40b5743 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -140,15 +140,6 @@ object Build extends sbt.Build { publishArtifact in Test := false ) - val daemonDependencies = Seq( - libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-cluster" % akkaVersion, - "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, - "commons-logging" % "commons-logging" % commonsLoggingVersion, - "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion - ) - ) - val coreDependencies = Seq( libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % slf4jVersion, @@ -171,6 +162,10 @@ object Build extends sbt.Build { "com.typesafe.akka" %% "akka-remote" % akkaVersion exclude("io.netty", "netty"), + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, + "commons-logging" % "commons-logging" % commonsLoggingVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-agent" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, @@ -256,7 +251,7 @@ object Build extends sbt.Build { id = "gearpump", base = file("."), settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting) - .aggregate(shaded, core, daemon, streaming, services, external_kafka, external_monoid, + .aggregate(shaded, core, streaming, services, external_kafka, external_monoid, external_serializer, examples, storm, yarn, external_hbase, gearpumpHadoop, packProject, external_hadoopfs, integration_test).settings(Defaults.itSettings: _*) .disablePlugins(sbtassembly.AssemblyPlugin) @@ -271,20 +266,13 @@ object Build extends sbt.Build { getShadedDepXML(organization.value, shaded_guava.id, version.value), getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node) } - )) - .disablePlugins(sbtassembly.AssemblyPlugin) + )).disablePlugins(sbtassembly.AssemblyPlugin) - lazy val daemon = Project( - id = "gearpump-daemon", - base = file("daemon"), - settings = commonSettings ++ daemonDependencies) - .dependsOn(core % "test->test; compile->compile", cgroup % "test->test; compile->compile") - .disablePlugins(sbtassembly.AssemblyPlugin) lazy val cgroup = Project( id = "gearpump-experimental-cgroup", base = file("experiments/cgroup"), - settings = commonSettings ++ noPublish ++ daemonDependencies) + settings = commonSettings ++ noPublish) .dependsOn (core % "test->test; compile->compile") .disablePlugins(sbtassembly.AssemblyPlugin) @@ -301,7 +289,7 @@ object Build extends sbt.Build { getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node) } )) - .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test") + .dependsOn(core % "test->test; compile->compile", shaded_gs_collections) .disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_kafka = Project( @@ -412,19 +400,18 @@ object Build extends sbt.Build { ), mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test") )) - .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + .dependsOn(streaming % "test->test; provided") lazy val redis = Project( id = "gearpump-experiments-redis", base = file("experiments/redis"), - settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + settings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( "redis.clients" % "jedis" % "2.9.0" - ), - mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test") - )) - .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + ) + ) + ).dependsOn(streaming % "test->test; provided") lazy val storm = Project( id = "gearpump-experiments-storm", @@ -489,7 +476,7 @@ object Build extends sbt.Build { "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided" ) )) - .dependsOn(services % "test->test;compile->compile", daemon % "provided", + .dependsOn(services % "test->test;compile->compile", core % "provided", gearpumpHadoop).disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_hbase = Project( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/BuildExample.scala ---------------------------------------------------------------------- diff --git a/project/BuildExample.scala b/project/BuildExample.scala index 75fc9be..fadc1ec 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -42,7 +42,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + ) dependsOn(streaming % "test->test; provided") lazy val wordcount = Project( id = "gearpump-examples-wordcount", @@ -55,7 +55,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + ) dependsOn(streaming % "test->test; provided") lazy val sol = Project( id = "gearpump-examples-sol", @@ -113,7 +113,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn (daemon % "test->test; provided") + ) dependsOn (core % "test->test; provided") lazy val distributeservice = Project( id = "gearpump-examples-distributeservice", @@ -133,7 +133,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn (daemon % "test->test; provided") + ) dependsOn (core % "test->test; provided") lazy val fsio = Project( id = "gearpump-examples-fsio", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 13e53de..54b1d43 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -24,7 +24,6 @@ import xerial.sbt.Pack._ object Pack extends sbt.Build { val daemonClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", // This is for DFSJarStore "${PROG_HOME}/lib/yarn/*" ) @@ -37,14 +36,12 @@ object Pack extends sbt.Build { val serviceClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", "${PROG_HOME}/lib/services/*", "${PROG_HOME}/dashboard" ) val yarnClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", "${PROG_HOME}/lib/services/*", "${PROG_HOME}/lib/yarn/*", "${PROG_HOME}/conf/yarnconf", @@ -112,11 +109,10 @@ object Pack extends sbt.Build { "-Dgearpump.home=${PROG_HOME}") ), packLibDir := Map( - "lib" -> new ProjectsToPack(core.id, streaming.id), - "lib/daemon" -> new ProjectsToPack(daemon.id, cgroup.id).exclude(core.id, streaming.id), + "lib" -> new ProjectsToPack(core.id, cgroup.id, streaming.id), "lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id). - exclude(services.id, daemon.id, core.id), - "lib/services" -> new ProjectsToPack(services.id).exclude(daemon.id), + exclude(services.id, core.id), + "lib/services" -> new ProjectsToPack(services.id).exclude(core.id), "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id) ), packExclude := Seq(thisProjectRef.value.project), @@ -139,7 +135,7 @@ object Pack extends sbt.Build { "gear" -> applicationClassPath, "local" -> daemonClassPath, "master" -> daemonClassPath, - "worker" -> daemonClassPath, + "worker" -> applicationClassPath, "services" -> serviceClassPath, "yarnclient" -> yarnClassPath, "storm" -> stormClassPath @@ -149,6 +145,6 @@ object Pack extends sbt.Build { packArchiveExcludes := Seq("integrationtest") ) - ).dependsOn(core, streaming, services, yarn, storm). + ).dependsOn(core, streaming, services, yarn, storm, cgroup). disablePlugins(sbtassembly.AssemblyPlugin) }
