http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala index c25acb1..c8a44f6 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,18 @@ package io.gearpump.streaming.dsl +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.{Either, Left, Right} + import akka.actor._ -import io.gearpump.streaming.task.{StartTime, TaskContext} +import org.mockito.Mockito.when +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} import io.gearpump.streaming.dsl.StreamSpec.Join import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner @@ -30,23 +37,18 @@ import io.gearpump.streaming.dsl.plan.OpTranslator._ import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar -import scala.util.{Either, Left, Right} - -class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { +class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { implicit var system: ActorSystem = null - override def beforeAll: Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) } override def afterAll(): Unit = { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "translate the DSL to a DAG" in { @@ -55,7 +57,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mo val app = StreamApp("dsl", context) - val data = + val data = """ five four three two one five four three two @@ -75,7 +77,9 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mo val appDescription = app.plan - val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge((node1, edge, node2) => edge.partitionerFactory.partitioner.getClass.getName) + val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => + edge.partitionerFactory.partitioner.getClass.getName + } val expectedDagTopology = getExpectedDagTopology assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet)) @@ -102,14 +106,14 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mo object StreamSpec { - class Join(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) { + class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { var query: String = null override def onStart(startTime: StartTime): Unit = {} override def onNext(msg: Message): Unit = { msg.msg match { - case Left(wordCount: (String, Int)) => + case Left(wordCount: (String @unchecked, Int @unchecked)) => if (query != null && wordCount._1 == query) { taskContext.output(new Message(wordCount)) }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala index fa2c8cf..03dd242 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.streaming.dsl.partitioner +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.Message import io.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People -import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} -class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { it should "use the outpout of groupBy function to do partition" in { val mark = People("Mark", "male") val tom = People("Tom", "male") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala index 13af9c2..62a0f95 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,15 @@ package io.gearpump.streaming.dsl.plan +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.ActorSystem +import org.mockito.ArgumentCaptor +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest._ + import io.gearpump.Message import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.streaming.Constants._ @@ -26,29 +34,25 @@ import io.gearpump.streaming.MockUtil import io.gearpump.streaming.dsl.CollectionDataSource import io.gearpump.streaming.dsl.plan.OpTranslator._ import io.gearpump.streaming.task.StartTime -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest._ -class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { "andThen" should "chain multiple single input function" in { val dummy = new DummyInputFunction[String] val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - val filter = new FlatMapFunction[String, String](word => if (word.isEmpty) None else Some(word), "filter") + val filter = new FlatMapFunction[String, String](word => + if (word.isEmpty) None else Some(word), "filter") val map = new FlatMapFunction[String, Int](word => Some(1), "map") - val sum = new ReduceFunction[Int]({ (left, right) => left + right}, "sum") + val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum) - assert(all.description == "split.filter.map.sum") - val data = + val data = """ five four three two one five four three two @@ -67,22 +71,25 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val conf = UserConfig.empty val data = "one two three".split("\\s") - //source with no transformer - val source = new SourceTask[String, String](new CollectionDataSource[String](data), None, taskContext, conf) + // Source with no transformer + val source = new SourceTask[String, String](new CollectionDataSource[String](data), None, + taskContext, conf) source.onStart(StartTime(0)) source.onNext(Message("next")) verify(taskContext, times(1)).output(anyObject()) - //source with transformer + // Source with transformer val anotherTaskContext = MockUtil.mockTaskContext val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val another = new SourceTask(new CollectionDataSource[String](data), Some(double), anotherTaskContext, conf) + val another = new SourceTask(new CollectionDataSource[String](data), Some(double), + anotherTaskContext, conf) another.onStart(StartTime(0)) another.onNext(Message("next")) verify(anotherTaskContext, times(2)).output(anyObject()) } - "GroupByTask" should "group input by groupBy Function and apply attached operator for each group" in { + "GroupByTask" should "group input by groupBy Function and " + + "apply attached operator for each group" in { val data = "1 2 2 3 3 3" @@ -92,32 +99,33 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { left + right }, "concat") - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](GEARPUMP_STREAMING_OPERATOR, concat) + implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( + GEARPUMP_STREAMING_OPERATOR, concat) val taskContext = MockUtil.mockTaskContext val task = new GroupByTask[String, String, String](input => input, taskContext, config) task.onStart(StartTime(0)) - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]); + val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) data.split("\\s+").foreach { word => task.onNext(Message(word)) } - verify(taskContext, times(6)).output(peopleCaptor.capture()); + verify(taskContext, times(6)).output(peopleCaptor.capture()) import scala.collection.JavaConverters._ val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String]) assert(values.mkString(",") == "1,2,22,3,33,333") - system.shutdown - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } "MergeTask" should "accept two stream and apply the attached operator" in { - //source with transformer + // Source with transformer val taskContext = MockUtil.mockTaskContext val conf = UserConfig.empty val double = new FlatMapFunction[String, String](word => List(word, word), "double") @@ -126,7 +134,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val data = "1 2 2 3 3 3".split("\\s+") - data.foreach{input => + data.foreach { input => task.onNext(Message(input)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala index 151a188..1d94776 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala @@ -1,27 +1,34 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.gearpump.streaming.executor +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe -import io.gearpump.WorkerId +import org.mockito.Matchers._ +import org.mockito.Mockito.{times, _} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.appmaster.WorkerInfo import io.gearpump.cluster.scheduler.Resource +import io.gearpump.cluster.worker.WorkerId import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig} import io.gearpump.streaming.AppMasterToExecutor._ import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask @@ -30,12 +37,6 @@ import io.gearpump.streaming.executor.TaskLauncherSpec.MockTask import io.gearpump.streaming.task.{Subscriber, TaskId} import io.gearpump.streaming.{LifeTime, ProcessorDescription} import io.gearpump.transport.HostPort -import org.mockito.Matchers._ -import org.mockito.Mockito.{times, _} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import scala.language.postfixOps - class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appId = 0 @@ -51,22 +52,25 @@ class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { } override def afterAll(): Unit = { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "call launcher to launch task" in { val worker = TestProbe() val workerInfo = WorkerInfo(workerId, worker.ref) - val executorContext = ExecutorContext(executorId, workerInfo, appId, "app", appMaster.ref, Resource(2)) + val executorContext = ExecutorContext(executorId, workerInfo, appId, "app", + appMaster.ref, Resource(2)) val taskLauncher = mock(classOf[ITaskLauncher]) val executor = system.actorOf(Props(new Executor(executorContext, userConf, taskLauncher))) - val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName, parallelism = 2) + val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName, + parallelism = 2) val taskIds = List(TaskId(0, 0), TaskId(0, 1)) - val launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, List.empty[Subscriber]) + val launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, List.empty[Subscriber]) val task = TestProbe() - when(taskLauncher.launch(any(), any(), any(), any(), any())).thenReturn(taskIds.map((_, task.ref)).toMap) + when(taskLauncher.launch(any(), any(), any(), any(), any())) + .thenReturn(taskIds.map((_, task.ref)).toMap) val client = TestProbe() client.send(executor, launchTasks) @@ -91,7 +95,8 @@ class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { task.expectMsgType[StartTask] task.expectMsgType[StartTask] - val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, Long.MaxValue), List.empty[Subscriber]) + val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, Long.MaxValue), + List.empty[Subscriber]) client.send(executor, changeTasks) client.expectMsgType[TasksChanged] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala index d220e60..c97ae4c 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,13 +17,13 @@ */ package io.gearpump.streaming.executor -import io.gearpump.streaming.executor.TaskLauncher.TaskArgument import org.scalatest._ -import io.gearpump.streaming.executor.Executor.{TaskArgumentStore} -import scala.language.postfixOps + +import io.gearpump.streaming.executor.Executor.TaskArgumentStore +import io.gearpump.streaming.executor.TaskLauncher.TaskArgument import io.gearpump.streaming.task.TaskId -class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterEach { +class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterEach { it should "retain all history of taskArgument" in { val version0 = TaskArgument(0, null, null) val version2 = version0.copy(dagVersion = 2) @@ -32,15 +32,14 @@ class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterE store.add(task, version0) store.add(task, version2) - // we should return a version which is same or older than expected version + // Should return a version which is same or older than expected version assert(store.get(dagVersion = 1, task) == Some(version0)) assert(store.get(dagVersion = 0, task) == Some(version0)) assert(store.get(dagVersion = 2, task) == Some(version2)) - store.removeObsoleteVersion + store.removeObsoleteVersion() assert(store.get(dagVersion = 1, task) == None) assert(store.get(dagVersion = 0, task) == None) assert(store.get(dagVersion = 2, task) == Some(version2)) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala index f9decf5..c135c5b 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,19 +17,21 @@ */ package io.gearpump.streaming.executor +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem} import akka.testkit.TestProbe +import org.scalatest._ + import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.serializer.SerializationFramework import io.gearpump.streaming.ProcessorDescription import io.gearpump.streaming.executor.TaskLauncher.TaskArgument import io.gearpump.streaming.executor.TaskLauncherSpec.{MockTask, MockTaskActor} import io.gearpump.streaming.task.{Task, TaskContext, TaskContextData, TaskId, TaskWrapper} -import org.scalatest._ - -import scala.language.postfixOps -class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appId = 0 val executorId = 0 var appMaster: TestProbe = null @@ -42,17 +44,20 @@ class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { } override def afterAll(): Unit = { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "able to launch tasks" in { - val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref, userConf, classOf[MockTaskActor]) + val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref, + userConf, classOf[MockTaskActor]) val taskIds = List(TaskId(0, 0), TaskId(0, 1)) - val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName, parallelism = 2) + val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName, + parallelism = 2) val argument = TaskArgument(0, processor, null) - val tasks = launcher.launch(taskIds, argument, system, null, "gearpump.shared-thread-pool-dispatcher") + val tasks = launcher.launch(taskIds, argument, system, null, + "gearpump.shared-thread-pool-dispatcher") tasks.keys.toSet shouldBe taskIds.toSet } } @@ -67,6 +72,7 @@ object TaskLauncherSpec { def receive: Receive = null } - class MockTask(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) { + class MockTask(taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala index c147a4b..da1ca9f 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,26 +18,27 @@ package io.gearpump.streaming.metrics +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.scalatest.{FlatSpec, Matchers} + import io.gearpump.cluster.ClientToMaster.ReadOption import io.gearpump.cluster.MasterToClient.HistoryMetricsItem -import io.gearpump.metrics.Metrics.{Gauge, Meter, Histogram} -import io.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, MeterAggregator, HistogramAggregator, MultiLayerMap} +import io.gearpump.metrics.Metrics.{Gauge, Histogram, Meter} +import io.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, HistogramAggregator, MeterAggregator, MultiLayerMap} import io.gearpump.streaming.task.TaskId import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig -import org.scalatest.{Matchers, FlatSpec} -import scala.collection.JavaConverters._ -import scala.util.Random class ProcessorAggregatorSpec extends FlatSpec with Matchers { - "MultiLayerMap" should "maintain multiple layers HashMap" in { val layers = 3 val map = new MultiLayerMap[String](layers) assert(map.get(layer = 0, "key") == null) - // illegal, handle safely + // Illegal, handle safely assert(map.get(layer = 10, "key") == null) map.put(layer = 0, "key", "value") @@ -47,7 +48,7 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { map.put(layer = 2, "key3", "value3") map.put(layer = 2, "key4", "value4") - // illegal, should be ignored + // Illegal, should be ignored map.put(layer = 4, "key5", "value5") assert(map.size == 4) @@ -69,15 +70,15 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { val result = aggregator.result - //pick old time as aggregated time + // Picks old time as aggregated time assert(result.time == olderTime) - // do average + // Does average val check = result.value.asInstanceOf[Histogram] assert(check.mean - expect.mean < 0.01) assert(check.stddev - expect.stddev < 0.01) - assert(check.median - expect.median< 0.01) + assert(check.median - expect.median < 0.01) assert(check.p95 - expect.p95 < 0.01) assert(check.p99 - expect.p99 < 0.01) assert(check.p999 - expect.p999 < 0.01) @@ -98,13 +99,13 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { val result = aggregator.result - //pick old time + // Picks old time assert(result.time == olderTime) - // do summing + // Does summing val check = result.value.asInstanceOf[Meter] - assert(check.count == expect.count) + assert(check.count == expect.count) assert(check.m1 - expect.m1 < 0.01) assert(check.meanRate - expect.meanRate < 0.01) assert(check.rateUnit == expect.rateUnit) @@ -124,25 +125,34 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { } "ProcessorAggregator" should "aggregate on different read options" in { - val hours = 2 // maintain 2 hours history - val seconds = 2 // maintain 2 seconds recent data - val taskCount = 5 // for each processor - val metricCount = 100 // for each task, have metricCount metrics - val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000, seconds, seconds / 2 * 1000) + val hours = 2 // Maintains 2 hours history + val seconds = 2 // Maintains 2 seconds recent data + val taskCount = 5 // For each processor + val metricCount = 100 // For each task, have metricCount metrics + val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000, + seconds, seconds / 2 * 1000) val aggregator = new ProcessorAggregator(range) def count(value: Int): Int = value - def inputs(timeRange: Long) = { - (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++ - (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList + def inputs(timeRange: Long): List[HistoryMetricsItem] = { + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 0, _)) + .flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++ + (0 until taskCount).map(TaskId(processorId = 1, _)) + .flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList } def check(list: List[HistoryMetricsItem], countMap: Map[String, Int]): Boolean = { @@ -150,14 +160,15 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { nameCount sameElements countMap } - // aggregate on processor and meterNames, + // Aggregates on processor and meterNames, val input = inputs(Long.MaxValue) - val readLatest = aggregator.aggregate(ReadOption.ReadLatest, input.iterator, now = Long.MaxValue) - assert(readLatest.size == 8) //2 processor * 4 metrics type + val readLatest = aggregator.aggregate(ReadOption.ReadLatest, + input.iterator, now = Long.MaxValue) + assert(readLatest.size == 8) // 2 processor * 4 metrics type assert(check(readLatest, Map( "app0.processor0:receiveLatency" -> count(1), "app0.processor0:processTime" -> count(1), - "app0.processor0:sendThroughput"-> count(1), + "app0.processor0:sendThroughput" -> count(1), "app0.processor0:receiveThroughput" -> count(1), "app0.processor1:receiveLatency" -> count(1), "app0.processor1:processTime" -> count(1), @@ -165,13 +176,14 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { "app0.processor1:receiveThroughput" -> count(1) ))) - // aggregate on processor and meterNames and time range, - val readRecent = aggregator.aggregate(ReadOption.ReadRecent, inputs(seconds * 1000).iterator, now = seconds * 1000) - assert(readRecent.size == 16) //2 processor * 4 metrics type * 2 time range + // Aggregates on processor and meterNames and time range, + val readRecent = aggregator.aggregate(ReadOption.ReadRecent, + inputs(seconds * 1000).iterator, now = seconds * 1000) + assert(readRecent.size == 16) // 2 processor * 4 metrics type * 2 time range assert(check(readRecent, Map( "app0.processor0:receiveLatency" -> count(2), "app0.processor0:processTime" -> count(2), - "app0.processor0:sendThroughput"-> count(2), + "app0.processor0:sendThroughput" -> count(2), "app0.processor0:receiveThroughput" -> count(2), "app0.processor1:receiveLatency" -> count(2), "app0.processor1:processTime" -> count(2), @@ -179,13 +191,14 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { "app0.processor1:receiveThroughput" -> count(2) ))) - // aggregate on processor and meterNames and time range, - val readHistory = aggregator.aggregate(ReadOption.ReadHistory, inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000) - assert(readHistory.size == 16) //2 processor * 4 metrics type * 2 time ranges + // Aggregates on processor and meterNames and time range, + val readHistory = aggregator.aggregate(ReadOption.ReadHistory, + inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000) + assert(readHistory.size == 16) // 2 processor * 4 metrics type * 2 time ranges assert(check(readHistory, Map( "app0.processor0:receiveLatency" -> count(2), "app0.processor0:processTime" -> count(2), - "app0.processor0:sendThroughput"-> count(2), + "app0.processor0:sendThroughput" -> count(2), "app0.processor0:receiveThroughput" -> count(2), "app0.processor1:receiveLatency" -> count(2), "app0.processor1:processTime" -> count(2), @@ -194,9 +207,11 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { ))) } - private def histogram(taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue, repeat: Int = 1): List[HistoryMetricsItem] = { + private def histogram( + taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue, + repeat: Int = 1): List[HistoryMetricsItem] = { val random = new Random() - (0 until repeat).map {_ => + (0 until repeat).map { _ => new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange), new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName", Math.abs(random.nextDouble()), @@ -209,7 +224,8 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { }.toList } - private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int): List[HistoryMetricsItem] = { + private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int) + : List[HistoryMetricsItem] = { val random = new Random() (0 until repeat).map { _ => new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange), @@ -225,19 +241,20 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers { "ProcessorAggregator" should "handle smoothly for unsupported metric type and " + "error formatted metric name" in { val invalid = List( - // unsupported metric type + // Unsupported metric type HistoryMetricsItem(0, new Gauge("app0.processor0.task0:gauge", 100)), - //wrong format: should be app0.processor0.task0:throughput + // Wrong format: should be app0.processor0.task0:throughput HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, "")) ) val valid = histogram(TaskId(0, 0), repeat = 10) val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 0)) - val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).toIterator, now = Long.MaxValue) + val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).toIterator, + now = Long.MaxValue) - // for one taskId, will only use one data point. + // For one taskId, will only use one data point. assert(result.size == 1) assert(result.head.value.name == "app0.processor0:latency") } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala index 7924acb..f654eb9 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,26 +18,27 @@ package io.gearpump.streaming.metrics +import scala.util.Random + +import org.scalatest.{FlatSpec, Matchers} + import io.gearpump.cluster.MasterToClient.HistoryMetricsItem -import io.gearpump.metrics.Metrics.{Meter, Gauge, Histogram} +import io.gearpump.metrics.Metrics.{Histogram, Meter} import io.gearpump.streaming.metrics.TaskFilterAggregator.Options -import io.gearpump.streaming.task.{StartTime, TaskId} -import org.scalatest.{Matchers, FlatSpec} - -import scala.util.Random +import io.gearpump.streaming.task.TaskId -class TaskFilterAggregatorSpec extends FlatSpec with Matchers { +class TaskFilterAggregatorSpec extends FlatSpec with Matchers { def metric(taskId: TaskId): HistoryMetricsItem = { val random = new Random() new HistoryMetricsItem(Math.abs(random.nextLong()), new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:latency", - 0, 0,0,0,0,0)) + 0, 0, 0, 0, 0, 0)) } it should "filter data on processor range, task range combination" in { - val inputs = (0 until 10).flatMap{processor => - (0 until 10).map{ task => + val inputs = (0 until 10).flatMap { processor => + (0 until 10).map { task => metric(TaskId(processor, task)) } }.toList @@ -45,17 +46,17 @@ class TaskFilterAggregatorSpec extends FlatSpec with Matchers { val globalLimit = 10 val aggregator = new TaskFilterAggregator(globalLimit) - // limit not met, return all matches in this matrix + // Limit not met, return all matches in this matrix var options = new Options(limit = 20, startTask = 3, endTask = 6, startProcessor = 3, endProcessor = 6) assert(aggregator.aggregate(options, inputs.iterator).size == 9) - // user limit reached + // User limit reached options = new Options(limit = 3, startTask = 3, endTask = 5, startProcessor = 3, endProcessor = 5) assert(aggregator.aggregate(options, inputs.iterator).size == 3) - // global limit reached + // Global limit reached options = new Options(limit = 20, startTask = 3, endTask = 8, startProcessor = 3, endProcessor = 8) assert(aggregator.aggregate(options, inputs.iterator).size == globalLimit) @@ -67,8 +68,8 @@ class TaskFilterAggregatorSpec extends FlatSpec with Matchers { } it should "skip wrong format metrics" in { - val invalid = List{ - //wrong format: should be app0.processor0.task0:throughput + val invalid = List { + // Wrong format: should be app0.processor0.task0:throughput HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, "")) } val options = Options.acceptAll http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala index 10bc48e..752a8f4 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,11 @@ package io.gearpump.streaming.source -import io.gearpump.{TimeStamp, Message} import org.scalacheck.Gen -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import io.gearpump.{Message, TimeStamp} class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers { property("DefaultTimeStampFilter should filter message against give timestamp") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala index 483200f..ea670f6 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.streaming.state.impl -import io.gearpump.TimeStamp -import io.gearpump.streaming.transaction.api.CheckpointStore -import org.mockito.{Matchers => MockitoMatchers} import org.mockito.Mockito._ +import org.mockito.{Matchers => MockitoMatchers} import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import io.gearpump.TimeStamp +import io.gearpump.streaming.transaction.api.CheckpointStore class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -56,7 +57,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w } } - property("CheckpointManager should close CheckpointStore") { + property("CheckpointManager should close CheckpointStore") { forAll(checkpointIntervalGen) { (checkpointInterval: Long) => val checkpointStore = mock[CheckpointStore] @@ -85,5 +86,4 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w checkpointManager.getCheckpointTime shouldBe empty } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala index 86996ab..4bbad3b 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala index e420a86..b20b666 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,16 @@ package io.gearpump.streaming.state.impl -import io.gearpump.TimeStamp -import io.gearpump.streaming.state.api.{Monoid, Serializer} +import scala.util.Success + import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.util.Success +import io.gearpump.TimeStamp +import io.gearpump.streaming.state.api.{Monoid, Serializer} class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -89,7 +90,6 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with state.left shouldBe plus state.right shouldBe zero state.get shouldBe Some(plus) - } } @@ -129,5 +129,4 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with state.get shouldBe Some(plus) } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala index f0489a3..e12ec9b 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.streaming.state.impl -import io.gearpump.TimeStamp import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import io.gearpump.TimeStamp class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -43,10 +44,10 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito forAll(timestampGen, windowSizeGen, windowStepGen) { (timestamp: TimeStamp, windowSize: Long, windowStep: Long) => val window = new Window(windowSize, windowStep) - window.range shouldBe (0L, windowSize) + window.range shouldBe(0L, windowSize) window.slideOneStep() - window.range shouldBe (windowStep, windowSize + windowStep) + window.range shouldBe(windowStep, windowSize + windowStep) window.slideTo(timestamp) val (startTime, endTime) = window.range http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala index dd54d41..bc79ff6 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,18 @@ package io.gearpump.streaming.state.impl -import io.gearpump._ -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.api.{Serializer, Group} +import scala.collection.immutable.TreeMap +import scala.util.Success + import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.collection.immutable.TreeMap -import scala.util.Success +import io.gearpump._ +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.state.api.{Group, Serializer} class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -91,7 +92,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo state.right shouldBe zero state.get shouldBe Some(zero) - val start = checkpointTime - 1 + val start = checkpointTime - 1 val end = checkpointTime + 1 val size = end - start val step = 1L @@ -130,11 +131,10 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo val right = mock[AnyRef] val plus = mock[AnyRef] - when(group.zero).thenReturn(zero, zero) val state = new WindowState[AnyRef](group, serializer, taskContext, window) - val start = checkpointTime - 1 + val start = checkpointTime - 1 val end = checkpointTime + 1 val size = end - start val step = 1L @@ -147,8 +147,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo when(group.plus(left, zero)).thenReturn(left, Nil: _*) when(taskContext.upstreamMinClock).thenReturn(0L) - // time < checkpointTime - // update left in current window + // Time < checkpointTime + // Update left in current window state.setNextCheckpointTime(checkpointTime) state.update(start, left) @@ -166,8 +166,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo when(group.plus(left, right)).thenReturn(plus, Nil: _*) when(taskContext.upstreamMinClock).thenReturn(0L) - // time >= checkpointTime - // update right in current window + // Time >= checkpointTime + // Update right in current window state.setNextCheckpointTime(checkpointTime) state.update(checkpointTime, right) @@ -176,10 +176,9 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo state.right shouldBe right state.get shouldBe Some(plus) state.getIntervalStates(start, end) shouldBe - TreeMap(Interval(start, start + step) -> left, Interval(start + step, end) -> right) - + TreeMap(Interval(start, start + step) -> left, Interval(start + step, end) -> right) - // slide window + // Slides window forward when(window.range).thenReturn((start, end), (start + step, end + step)) when(window.shouldSlide).thenReturn(true) when(taskContext.upstreamMinClock).thenReturn(checkpointTime) @@ -197,10 +196,10 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo state.right shouldBe plus state.get shouldBe Some(plus) state.getIntervalStates(start, end + step) shouldBe - TreeMap( - Interval(start, start + step) -> left, - Interval(start + step, end) -> right, - Interval(end, end + step) -> right) + TreeMap( + Interval(start, start + step) -> left, + Interval(start + step, end) -> right, + Interval(end, end + step) -> right) } } @@ -232,7 +231,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo timestamp / windowStep * windowStep should (be <= interval.startTime) (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime) (timestamp / windowStep + 1) * windowStep should (be >= interval.endTime) - ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize should (be >= interval.endTime) + ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize should + (be >= interval.endTime) checkpointTime should (be <= interval.startTime or be >= interval.endTime) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala index 5099466..60baa74 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,17 +17,16 @@ */ package io.gearpump.streaming.storage -import io.gearpump.streaming.StreamingTestUtil -import io.gearpump.cluster.{MasterHarness, MiniCluster, TestUtil} -import io.gearpump.streaming.StreamingTestUtil -import io.gearpump.util.Constants -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} - import scala.concurrent.Await import scala.concurrent.duration._ -import scala.language.postfixOps -class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAndAfterAll{ +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +import io.gearpump.cluster.{MasterHarness, MiniCluster} +import io.gearpump.streaming.StreamingTestUtil +import io.gearpump.util.Constants + +class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAndAfterAll { implicit val timeout = Constants.FUTURE_TIMEOUT implicit val dispatcher = MasterHarness.cachedPool @@ -45,14 +44,18 @@ class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAnd store.put("Int_type", 1024) store.put("Tuple2_type", ("element1", 1024)) - val future1 = store.get("String_type").map { value => value.asInstanceOf[String] should be("this is a string")} - val future2 = store.get("Int_type").map { value => value.asInstanceOf[Int] should be(1024)} - val future3 = store.get("Tuple2_type").map { value => value.asInstanceOf[(String, Int)] should be(("element1", 1024))} - val future4 = store.get("key").map { value => value.asInstanceOf[Object] should be(null)} - Await.result(future1, 15 seconds) - Await.result(future2, 15 seconds) - Await.result(future3, 15 seconds) - Await.result(future4, 15 seconds) + val future1 = store.get("String_type").map { value => + value.asInstanceOf[String] should be("this is a string") + } + val future2 = store.get("Int_type").map { value => value.asInstanceOf[Int] should be(1024) } + val future3 = store.get("Tuple2_type").map { value => + value.asInstanceOf[(String, Int)] should be(("element1", 1024)) + } + val future4 = store.get("key").map { value => value.asInstanceOf[Object] should be(null) } + Await.result(future1, 15.seconds) + Await.result(future2, 15.seconds) + Await.result(future3, 15.seconds) + Await.result(future4, 15.seconds) miniCluster.shutDown } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala index ffc6df1..ffb343e 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,29 +17,32 @@ */ package io.gearpump.streaming.task +import org.scalatest.{FlatSpec, Matchers} + import io.gearpump.partitioner.{HashPartitioner, Partitioner} import io.gearpump.streaming.task.SubscriberSpec.TestTask import io.gearpump.streaming.{DAG, ProcessorDescription} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.scalatest.{FlatSpec, Matchers} -class SubscriberSpec extends FlatSpec with Matchers { +class SubscriberSpec extends FlatSpec with Matchers { "Subscriber.of" should "return all subscriber for a processor" in { val sourceProcessorId = 0 - val task1 = ProcessorDescription(id = sourceProcessorId, taskClass = classOf[TestTask].getName, parallelism = 1) + val task1 = ProcessorDescription(id = sourceProcessorId, taskClass = + classOf[TestTask].getName, parallelism = 1) val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1) val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask].getName, parallelism = 1) val partitioner = Partitioner[HashPartitioner] - val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> task3, task2 ~ partitioner ~> task3)) - + val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> task3, + task2 ~ partitioner ~> task3)) val subscribers = Subscriber.of(sourceProcessorId, dag) assert(subscribers.size == 2) assert(subscribers.toSet == - Set(Subscriber(1, partitioner, task2.parallelism, task2.life), Subscriber(2, partitioner, task3.parallelism, task3.life))) + Set(Subscriber(1, partitioner, task2.parallelism, task2.life), Subscriber(2, partitioner, + task3.parallelism, task3.life))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala index ecad47b..57f4b8c 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,16 +20,15 @@ package io.gearpump.streaming.task import java.util.Random +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FlatSpec, Matchers} + import io.gearpump.Message import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.{Partitioner, HashPartitioner} -import io.gearpump.streaming.{LifeTime, ProcessorDescription} +import io.gearpump.partitioner.{HashPartitioner, Partitioner} import io.gearpump.streaming.task.SubscriptionSpec.NextTask -import org.scalatest.{FlatSpec} - -import org.mockito.Mockito._ -import org.scalatest.{Matchers} -import org.scalatest.mock.MockitoSugar +import io.gearpump.streaming.{LifeTime, ProcessorDescription} class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val appId = 0 @@ -41,16 +40,18 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val partitioner = Partitioner[HashPartitioner] val parallism = 2 - val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName, parallism) - val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, downstreamProcessor.life) + val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName, + parallism) + val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, + downstreamProcessor.life) private def prepare: (Subscription, ExpressTransport) = { val transport = mock[ExpressTransport] val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport) - subscription.start + subscription.start() val expectedAckRequest = InitialAckRequest(taskId, session) - verify(transport, times(1)).transport(expectedAckRequest, TaskId(1,0), TaskId(1, 1)) + verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1)) (subscription, transport) } @@ -67,41 +68,40 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val msg1 = new Message("1", timestamp = 70) subscription.sendMessage(msg1) - verify(transport, times(1)).transport(msg1, TaskId(1,1)) + verify(transport, times(1)).transport(msg1, TaskId(1, 1)) assert(subscription.minClock == 70) val msg2 = new Message("0", timestamp = 50) subscription.sendMessage(msg2) - verify(transport, times(1)).transport(msg2, TaskId(1,0)) + verify(transport, times(1)).transport(msg2, TaskId(1, 0)) // minClock has been set to smaller one assert(subscription.minClock == 50) val initialMinClock = subscription.minClock - //ack initial AckRequest(0) + // Acks initial AckRequest(0) subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session)) subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session)) - //send 100 messages - 100 until 200 foreach {clock => + // Sends 100 messages + 100 until 200 foreach { clock => subscription.sendMessage(Message("1", clock)) subscription.sendMessage(Message("2", clock)) } - // ack not received, minClock no change + // Ack not received, minClock no change assert(subscription.minClock == initialMinClock) subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session)) subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session)) - // ack received, minClock changed + // Ack received, minClock changed assert(subscription.minClock > initialMinClock) - - // we expect to receive two ackRequest for two downstream tasks + // Expects to receive two ackRequest for two downstream tasks val ackRequestForTask0 = AckRequest(taskId, 200, session) - verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1,0)) + verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0)) val ackRequestForTask1 = AckRequest(taskId, 200, session) verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) @@ -109,13 +109,12 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { it should "disallow more message sending if there is no ack back" in { val (subscription, transport) = prepare - //send 100 messages - 0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach {clock => + // send 100 messages + 0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock => subscription.sendMessage(Message(randomMessage, clock)) } assert(subscription.allowSendingMoreMessages() == false) - } it should "report minClock as Long.MaxValue when there is no pending message" in { @@ -128,19 +127,16 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { } private def randomMessage: String = new Random().nextInt.toString - } object SubscriptionSpec { + class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - class NextTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.{output, self} - - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { } - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala index 275cdb0..a48f887 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala @@ -1,44 +1,46 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.gearpump.streaming.task import akka.actor.{ExtendedActorSystem, Props} import akka.testkit._ -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} + import io.gearpump.Message import io.gearpump.cluster.{MasterHarness, TestUtil, UserConfig} import io.gearpump.partitioner.{HashPartitioner, Partitioner} import io.gearpump.serializer.{FastKryoSerializer, SerializationFramework} -import io.gearpump.streaming.AppMasterToExecutor.{StartTask, TaskRegistered, ChangeTask, MsgLostException, TaskChanged} +import io.gearpump.streaming.AppMasterToExecutor.{ChangeTask, MsgLostException, StartTask, TaskChanged, TaskRegistered} import io.gearpump.streaming.task.TaskActorSpec.TestTask import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} import io.gearpump.util.Graph._ import io.gearpump.util.{Graph, Util} -import org.mockito.Mockito.{mock, times, verify, when} -import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} - class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - override def config = ConfigFactory.parseString( - """ akka.loggers = ["akka.testkit.TestEventListener"] - | akka.test.filter-leeway = 20000 - """.stripMargin). - withFallback(TestUtil.DEFAULT_CONFIG) + protected override def config: Config = { + ConfigFactory.parseString( + """ akka.loggers = ["akka.testkit.TestEventListener"] + | akka.test.filter-leeway = 20000 + """.stripMargin). + withFallback(TestUtil.DEFAULT_CONFIG) + } val appId = 0 val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask].getName, parallelism = 1) @@ -54,7 +56,7 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with var mockSerializerPool: SerializationFramework = null - override def beforeEach() = { + override def beforeEach(): Unit = { startActorSystem() mockMaster = TestProbe()(getActorSystem) @@ -71,8 +73,10 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with "TaskActor" should { "register itself to AppMaster when started" in { val mockTask = mock(classOf[TaskWrapper]) - val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) - testActor ! TaskRegistered(taskId1, 0, Util.randInt) + val testActor = TestActorRef[TaskActor](Props( + new TaskActor(taskId1, taskContext1, UserConfig.empty, + mockTask, mockSerializerPool)))(getActorSystem) + testActor ! TaskRegistered(taskId1, 0, Util.randInt()) testActor ! StartTask(taskId1) implicit val system = getActorSystem @@ -84,8 +88,9 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with "respond to ChangeTask" in { val mockTask = mock(classOf[TaskWrapper]) - val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) - testActor ! TaskRegistered(taskId1, 0, Util.randInt) + val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, + UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) + testActor ! TaskRegistered(taskId1, 0, Util.randInt()) testActor ! StartTask(taskId1) mockMaster.expectMsgType[GetUpstreamMinClock] @@ -97,8 +102,9 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with val mockTask = mock(classOf[TaskWrapper]) val msg = Message("test") - val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) - testActor.tell(TaskRegistered(taskId1, 0, Util.randInt), mockMaster.ref) + val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, + UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) + testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), mockMaster.ref) testActor.tell(StartTask(taskId1), mockMaster.ref) testActor.tell(msg, testActor) @@ -107,7 +113,7 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with } } - override def afterEach() = { + override def afterEach(): Unit = { shutdownActorSystem() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/unmanagedlibs/README.md ---------------------------------------------------------------------- diff --git a/unmanagedlibs/README.md b/unmanagedlibs/README.md index 23a5895..7c51d2c 100644 --- a/unmanagedlibs/README.md +++ b/unmanagedlibs/README.md @@ -1,5 +1,3 @@ -This folder contains the unmanaged dependency files that couldn't be achieved from public repositories. +This folder contains the unmanaged dependency libraries. We will copy the jars under directory {scala-version}/ to out/target/pack/lib/ -Related issue: https://github.com/gearpump/gearpump/issues/1816 - -We built a new akka-actor artifact to replace the flawed one. \ No newline at end of file +For example, jars under 2.11/ will be copied to output/target/pack/lib when building for scala 2.11. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/version.sbt ---------------------------------------------------------------------- diff --git a/version.sbt b/version.sbt index e74ced4..7b23e02 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + version in ThisBuild := "0.8.1-SNAPSHOT" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/yarnconf/README.md ---------------------------------------------------------------------- diff --git a/yarnconf/README.md b/yarnconf/README.md index 38fa818..87b6b3d 100644 --- a/yarnconf/README.md +++ b/yarnconf/README.md @@ -1,8 +1,10 @@ -### Put YARN configuration files under classpath +This directory contains YARN configurations that you want to use when launching Gearpump over YARN. + +### How to put YARN configuration files under classpath? Before calling "yarnclient launch", make sure you have put all YARN configuration files under classpath. Typically, you can just copy all files under $HADOOP_HOME/etc/hadoop from one of the YARN Cluster machine to "conf/yarnconf" of gearpump. -NOTE: The "conf/yarnconf" is only effecive when you run "yarnclient". When you -run other commands like "gear", "conf/yarnconf" will not be addded into classpath. \ No newline at end of file +NOTE: The "conf/yarnconf" is only effective when you run "yarnclient". When you +run other commands like "gear", "conf/yarnconf" will not be added into classpath. \ No newline at end of file
