Repository: incubator-gearpump
Updated Branches:
  refs/heads/master e7677f6d9 -> 6f153e806


fix GEARPUMP-61, add more tests

Author: manuzhang <[email protected]>

Closes #45 from manuzhang/it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/6f153e80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/6f153e80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/6f153e80

Branch: refs/heads/master
Commit: 6f153e806319ad61fb2ede0dbbb61e0d9c9b99d3
Parents: e7677f6
Author: manuzhang <[email protected]>
Authored: Thu Jun 23 15:50:08 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Jun 23 15:50:08 2016 +0800

----------------------------------------------------------------------
 .../gearpump/partitioner/PartitionerSpec.scala  |  26 ++++
 .../services/SupervisorServiceSpec.scala        | 150 +++++++++++++++++++
 .../gearpump/streaming/sink/DataSinkTask.scala  |   8 +-
 .../streaming/source/DataSourceConfig.scala     |   2 -
 .../streaming/source/DataSourceTask.scala       |   8 +-
 .../streaming/sink/DataSinkTaskSpec.scala       |  66 ++++++++
 .../streaming/source/DataSourceTaskSpec.scala   |  76 ++++++++++
 7 files changed, 328 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala 
b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
index 14be887..fcf819b 100644
--- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
@@ -52,4 +52,30 @@ class PartitionerSpec extends FlatSpec with Matchers {
     assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" +
       "consistent result")
   }
+
+  "BroadcastPartitioner" should "return all partitions" in {
+    val partitioner = new BroadcastPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+    val partitions = partitioner.getPartitions(msg, NUM)
+
+    partitions should contain theSameElementsAs 0.until(NUM)
+  }
+
+
+  "ShuffleGroupingPartitioner" should "hash same key randomly" in {
+    val partitioner = new ShuffleGroupingPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
+
+    assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" +
+      "consistent result")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala
 
b/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala
new file mode 100644
index 0000000..030d4bb
--- /dev/null
+++ 
b/services/jvm/src/test/scala/org/apache/gearpump/services/SupervisorServiceSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.testkit.TestActor.{KeepRunning, AutoPilot}
+import akka.testkit.{TestKit, TestProbe}
+import com.typesafe.config.{ConfigFactory, Config}
+import org.apache.gearpump.cluster.AppMasterToMaster.{WorkerData, 
GetWorkerData}
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToClient.ResolveWorkerIdResult
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
+
+import scala.concurrent.duration._
+import scala.util.Success
+
+class SupervisorServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with 
BeforeAndAfterAll {
+
+  override def testConfig: Config = TestUtil.DEFAULT_CONFIG
+
+  protected def actorRefFactory = system
+
+  private val mockSupervisor = TestProbe()
+
+  private val supervisor = mockSupervisor.ref
+
+  private val mockMaster = TestProbe()
+
+  protected def master = mockMaster.ref
+
+  private val mockWorker = TestProbe()
+
+  protected def supervisorRoute = new SupervisorService(master, supervisor, 
system).route
+
+  protected def nullRoute = new SupervisorService(master, null, system).route
+
+  mockSupervisor.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case AddWorker(workerCount) =>
+          sender ! CommandResult(success = true)
+          KeepRunning
+        case RemoveWorker(workerId) =>
+          sender ! CommandResult(success = true)
+          KeepRunning
+      }
+    }
+  }
+
+  mockWorker.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case GetWorkerData(workerId) =>
+          sender ! WorkerData(WorkerSummary.empty)
+          KeepRunning
+      }
+    }
+  }
+
+  mockMaster.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case ResolveWorkerId(workerId) =>
+          sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
+          KeepRunning
+      }
+    }
+  }
+
+  "SupervisorService" should "get supervisor path" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/supervisor") ~> supervisorRoute) ~> check {
+      val responseBody = responseAs[String]
+      ConfigFactory.parseString(responseBody).getString("path") shouldBe 
supervisor.path.toString
+    }
+
+    (Get(s"/api/$REST_VERSION/supervisor") ~> nullRoute) ~> check {
+      val responseBody = responseAs[String]
+      ConfigFactory.parseString(responseBody).getIsNull("path") shouldBe true
+    }
+  }
+
+  "SupervisorService" should "write status" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Post(s"/api/$REST_VERSION/supervisor/status")
+      ~> supervisorRoute) ~> check {
+      val responseBody = responseAs[String]
+      ConfigFactory.parseString(responseBody).getBoolean("enabled") shouldBe 
true
+    }
+
+    (Post(s"/api/$REST_VERSION/supervisor/status") ~> nullRoute) ~> check {
+      val responseBody = responseAs[String]
+      ConfigFactory.parseString(responseBody).getBoolean("enabled") shouldBe 
false
+    }
+  }
+
+  "SupervisorService" should "add worker" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Post(s"/api/$REST_VERSION/supervisor/addworker/1")
+      ~> supervisorRoute) ~> check {
+      val responseBody = responseAs[String]
+      ConfigFactory.parseString(responseBody).getBoolean("success") shouldBe 
true
+    }
+
+    (Post(s"/api/$REST_VERSION/supervisor/addworker/1")
+      ~> nullRoute) ~> check {
+      status shouldBe StatusCodes.InternalServerError
+    }
+  }
+
+  "SupervisorService" should "remove worker" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    
(Post(s"/api/$REST_VERSION/supervisor/removeworker/${WorkerId.render(WorkerId(1,
 0L))}")
+      ~> supervisorRoute) ~> check {
+      val responseBody = responseAs[String]
+      ConfigFactory.parseString(responseBody).getBoolean("success") shouldBe 
true
+    }
+
+
+    
(Post(s"/api/$REST_VERSION/supervisor/removeworker/${WorkerId.render(WorkerId(1,
 0L))}")
+      ~> nullRoute) ~> check {
+      status shouldBe StatusCodes.InternalServerError
+    }
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index eb6118d..f8bc0ab 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -29,10 +29,12 @@ object DataSinkTask {
 /**
  * General task that runs any [[DataSink]]
  */
-class DataSinkTask(context: TaskContext, conf: UserConfig) extends 
Task(context, conf) {
-  import org.apache.gearpump.streaming.sink.DataSinkTask._
+class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: 
DataSink)
+  extends Task(context, conf) {
 
-  private val sink = conf.getValue[DataSink](DATA_SINK).get
+  def this(context: TaskContext, conf: UserConfig) = {
+    this(context, conf, 
conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get)
+  }
 
   override def onStart(startTime: StartTime): Unit = {
     LOG.info("opening data sink...")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
index 4a76958..080e095 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
@@ -19,7 +19,5 @@
 package org.apache.gearpump.streaming.source
 
 object DataSourceConfig {
-
   val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size"
-  val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 6777721..f845628 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -38,10 +38,12 @@ object DataSourceTask {
  *  - `DataSource.read()` in each `onNext`, which reads a batch of messages
  *  - `DataSource.close()` in `onStop`
  */
-class DataSourceTask(context: TaskContext, conf: UserConfig) extends 
Task(context, conf) {
-  import org.apache.gearpump.streaming.source.DataSourceTask._
+class DataSourceTask private[source](context: TaskContext, conf: UserConfig, 
source: DataSource)
+  extends Task(context, conf) {
 
-  private val source = conf.getValue[DataSource](DATA_SOURCE).get
+  def this(context: TaskContext, conf: UserConfig) = {
+    this(context, conf, 
conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get)
+  }
   private val batchSize = 
conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
   private var startTime = 0L
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
new file mode 100644
index 0000000..7a2c2d1
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.sink
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{PropSpec, Matchers}
+
+class DataSinkTaskSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
+
+  property("DataSinkTask.onStart should call DataSink.open" ) {
+    forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+      val taskContext = MockUtil.mockTaskContext
+      val config = UserConfig.empty
+      val dataSink = mock[DataSink]
+      val sinkTask = new DataSinkTask(taskContext, config, dataSink)
+      sinkTask.onStart(StartTime(startTime))
+      verify(dataSink).open(taskContext)
+    }
+  }
+
+  property("DataSinkTask.onNext should call DataSink.write") {
+    forAll(Gen.alphaStr) { (str: String) =>
+      val taskContext = MockUtil.mockTaskContext
+      val config = UserConfig.empty
+      val dataSink = mock[DataSink]
+      val sinkTask = new DataSinkTask(taskContext, config, dataSink)
+      val msg = Message(str)
+      sinkTask.onNext(msg)
+      verify(dataSink).write(msg)
+    }
+  }
+
+
+  property("DataSinkTask.onStop should call DataSink.close") {
+    val taskContext = MockUtil.mockTaskContext
+    val config = UserConfig.empty
+    val dataSink = mock[DataSink]
+    val sinkTask = new DataSinkTask(taskContext, config, dataSink)
+    sinkTask.onStop()
+    verify(dataSink).close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6f153e80/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
new file mode 100644
index 0000000..d4d580f
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.{TaskContext, StartTime}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.prop.PropertyChecks
+
+class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
+
+  property("DataSourceTask.onStart should call DataSource.open") {
+    forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val dataSource = mock[DataSource]
+      val config = UserConfig.empty
+        .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+
+      val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+
+      sourceTask.onStart(StartTime(startTime))
+      verify(dataSource).open(taskContext, startTime)
+    }
+  }
+
+  property("DataSourceTask.onNext should call DataSource.read") {
+    forAll(Gen.alphaStr) { (str: String) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val dataSource = mock[DataSource]
+      val config = UserConfig.empty
+        .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+
+      val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+      val msg = Message(str)
+      when(dataSource.read()).thenReturn(msg)
+
+      sourceTask.onNext(Message("next"))
+      verify(taskContext).output(msg)
+    }
+  }
+
+  property("DataSourceTask.onStop should call DataSource.close") {
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val dataSource = mock[DataSource]
+    val config = UserConfig.empty
+      .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+    val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+
+    sourceTask.onStop()
+    verify(dataSource).close()
+  }
+}

Reply via email to