Repository: incubator-gearpump Updated Branches: refs/heads/master e7677f6d9 -> 6f153e806
fix GEARPUMP-61, add more tests Author: manuzhang <[email protected]> Closes #45 from manuzhang/it. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/6f153e80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/6f153e80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/6f153e80 Branch: refs/heads/master Commit: 6f153e806319ad61fb2ede0dbbb61e0d9c9b99d3 Parents: e7677f6 Author: manuzhang <[email protected]> Authored: Thu Jun 23 15:50:08 2016 +0800 Committer: manuzhang <[email protected]> Committed: Thu Jun 23 15:50:08 2016 +0800 ---------------------------------------------------------------------- .../gearpump/partitioner/PartitionerSpec.scala | 26 ++++ .../services/SupervisorServiceSpec.scala | 150 +++++++++++++++++++ .../gearpump/streaming/sink/DataSinkTask.scala | 8 +- .../streaming/source/DataSourceConfig.scala | 2 - .../streaming/source/DataSourceTask.scala | 8 +- .../streaming/sink/DataSinkTaskSpec.scala | 66 ++++++++ .../streaming/source/DataSourceTaskSpec.scala | 76 ++++++++++ 7 files changed, 328 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala index 14be887..fcf819b 100644 --- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala @@ -52,4 +52,30 @@ class PartitionerSpec extends FlatSpec with Matchers { assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" + "consistent result") } + + "BroadcastPartitioner" should "return all partitions" in { + val partitioner = new BroadcastPartitioner + + val data = new Array[Byte](1000) + (new java.util.Random()).nextBytes(data) + val msg = Message(data) + val partitions = partitioner.getPartitions(msg, NUM) + + partitions should contain theSameElementsAs 0.until(NUM) + } + + + "ShuffleGroupingPartitioner" should "hash same key randomly" in { + val partitioner = new ShuffleGroupingPartitioner + + val data = new Array[Byte](1000) + (new java.util.Random()).nextBytes(data) + val msg = Message(data) + + val partition = partitioner.getPartition(msg, NUM) + assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") + + assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" + + "consistent result") + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala new file mode 100644 index 0000000..030d4bb --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services + +import akka.actor.ActorRef +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import akka.testkit.TestActor.{KeepRunning, AutoPilot} +import akka.testkit.{TestKit, TestProbe} +import com.typesafe.config.{ConfigFactory, Config} +import org.apache.gearpump.cluster.AppMasterToMaster.{WorkerData, GetWorkerData} +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.MasterToClient.ResolveWorkerIdResult +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} + +import scala.concurrent.duration._ +import scala.util.Success + +class SupervisorServiceSpec + extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { + + override def testConfig: Config = TestUtil.DEFAULT_CONFIG + + protected def actorRefFactory = system + + private val mockSupervisor = TestProbe() + + private val supervisor = mockSupervisor.ref + + private val mockMaster = TestProbe() + + protected def master = mockMaster.ref + + private val mockWorker = TestProbe() + + protected def supervisorRoute = new SupervisorService(master, supervisor, system).route + + protected def nullRoute = new SupervisorService(master, null, system).route + + mockSupervisor.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case AddWorker(workerCount) => + sender ! CommandResult(success = true) + KeepRunning + case RemoveWorker(workerId) => + sender ! CommandResult(success = true) + KeepRunning + } + } + } + + mockWorker.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case GetWorkerData(workerId) => + sender ! WorkerData(WorkerSummary.empty) + KeepRunning + } + } + } + + mockMaster.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case ResolveWorkerId(workerId) => + sender ! ResolveWorkerIdResult(Success(mockWorker.ref)) + KeepRunning + } + } + } + + "SupervisorService" should "get supervisor path" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/supervisor") ~> supervisorRoute) ~> check { + val responseBody = responseAs[String] + ConfigFactory.parseString(responseBody).getString("path") shouldBe supervisor.path.toString + } + + (Get(s"/api/$REST_VERSION/supervisor") ~> nullRoute) ~> check { + val responseBody = responseAs[String] + ConfigFactory.parseString(responseBody).getIsNull("path") shouldBe true + } + } + + "SupervisorService" should "write status" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Post(s"/api/$REST_VERSION/supervisor/status") + ~> supervisorRoute) ~> check { + val responseBody = responseAs[String] + ConfigFactory.parseString(responseBody).getBoolean("enabled") shouldBe true + } + + (Post(s"/api/$REST_VERSION/supervisor/status") ~> nullRoute) ~> check { + val responseBody = responseAs[String] + ConfigFactory.parseString(responseBody).getBoolean("enabled") shouldBe false + } + } + + "SupervisorService" should "add worker" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Post(s"/api/$REST_VERSION/supervisor/addworker/1") + ~> supervisorRoute) ~> check { + val responseBody = responseAs[String] + ConfigFactory.parseString(responseBody).getBoolean("success") shouldBe true + } + + (Post(s"/api/$REST_VERSION/supervisor/addworker/1") + ~> nullRoute) ~> check { + status shouldBe StatusCodes.InternalServerError + } + } + + "SupervisorService" should "remove worker" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Post(s"/api/$REST_VERSION/supervisor/removeworker/${WorkerId.render(WorkerId(1, 0L))}") + ~> supervisorRoute) ~> check { + val responseBody = responseAs[String] + ConfigFactory.parseString(responseBody).getBoolean("success") shouldBe true + } + + + (Post(s"/api/$REST_VERSION/supervisor/removeworker/${WorkerId.render(WorkerId(1, 0L))}") + ~> nullRoute) ~> check { + status shouldBe StatusCodes.InternalServerError + } + } + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala index eb6118d..f8bc0ab 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala @@ -29,10 +29,12 @@ object DataSinkTask { /** * General task that runs any [[DataSink]] */ -class DataSinkTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) { - import org.apache.gearpump.streaming.sink.DataSinkTask._ +class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: DataSink) + extends Task(context, conf) { - private val sink = conf.getValue[DataSink](DATA_SINK).get + def this(context: TaskContext, conf: UserConfig) = { + this(context, conf, conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get) + } override def onStart(startTime: StartTime): Unit = { LOG.info("opening data sink...") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala index 4a76958..080e095 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala @@ -19,7 +19,5 @@ package org.apache.gearpump.streaming.source object DataSourceConfig { - val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size" - val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 6777721..f845628 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -38,10 +38,12 @@ object DataSourceTask { * - `DataSource.read()` in each `onNext`, which reads a batch of messages * - `DataSource.close()` in `onStop` */ -class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) { - import org.apache.gearpump.streaming.source.DataSourceTask._ +class DataSourceTask private[source](context: TaskContext, conf: UserConfig, source: DataSource) + extends Task(context, conf) { - private val source = conf.getValue[DataSource](DATA_SOURCE).get + def this(context: TaskContext, conf: UserConfig) = { + this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get) + } private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) private var startTime = 0L http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala new file mode 100644 index 0000000..7a2c2d1 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.sink + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{PropSpec, Matchers} + +class DataSinkTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("DataSinkTask.onStart should call DataSink.open" ) { + forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) => + val taskContext = MockUtil.mockTaskContext + val config = UserConfig.empty + val dataSink = mock[DataSink] + val sinkTask = new DataSinkTask(taskContext, config, dataSink) + sinkTask.onStart(StartTime(startTime)) + verify(dataSink).open(taskContext) + } + } + + property("DataSinkTask.onNext should call DataSink.write") { + forAll(Gen.alphaStr) { (str: String) => + val taskContext = MockUtil.mockTaskContext + val config = UserConfig.empty + val dataSink = mock[DataSink] + val sinkTask = new DataSinkTask(taskContext, config, dataSink) + val msg = Message(str) + sinkTask.onNext(msg) + verify(dataSink).write(msg) + } + } + + + property("DataSinkTask.onStop should call DataSink.close") { + val taskContext = MockUtil.mockTaskContext + val config = UserConfig.empty + val dataSink = mock[DataSink] + val sinkTask = new DataSinkTask(taskContext, config, dataSink) + sinkTask.onStop() + verify(dataSink).close() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala new file mode 100644 index 0000000..d4d580f --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.source + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.{TaskContext, StartTime} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.prop.PropertyChecks + +class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("DataSourceTask.onStart should call DataSource.open") { + forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val dataSource = mock[DataSource] + val config = UserConfig.empty + .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) + + val sourceTask = new DataSourceTask(taskContext, config, dataSource) + + sourceTask.onStart(StartTime(startTime)) + verify(dataSource).open(taskContext, startTime) + } + } + + property("DataSourceTask.onNext should call DataSource.read") { + forAll(Gen.alphaStr) { (str: String) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val dataSource = mock[DataSource] + val config = UserConfig.empty + .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) + + val sourceTask = new DataSourceTask(taskContext, config, dataSource) + val msg = Message(str) + when(dataSource.read()).thenReturn(msg) + + sourceTask.onNext(Message("next")) + verify(taskContext).output(msg) + } + } + + property("DataSourceTask.onStop should call DataSource.close") { + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val dataSource = mock[DataSource] + val config = UserConfig.empty + .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) + val sourceTask = new DataSourceTask(taskContext, config, dataSource) + + sourceTask.onStop() + verify(dataSource).close() + } +}
