http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala deleted file mode 100644 index 73413da..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.minicluster - -import scala.sys.process._ - -import org.apache.gearpump.integrationtest.Docker - -/** - * A helper to instantiate the base image for different usage. - */ -class BaseContainer(val host: String, command: String, - masterAddrs: List[(String, Int)], - tunnelPorts: Set[Int] = Set.empty) { - - private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher" - private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump" - private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump" - private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack" - private val HOST_LOG_HOME = { - val dir = "/tmp/gearpump" - s"mkdir -p $dir".!! - s"mktemp -p $dir -d".!!.trim - } - - private val CLUSTER_OPTS = { - masterAddrs.zipWithIndex.map { case (hostPort, index) => - s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}" - }.mkString(" ") - } - - def createAndStart(): String = { - Docker.createAndStartContainer(host, IMAGE_NAME, command, - environ = Map("JAVA_OPTS" -> CLUSTER_OPTS), - volumes = Map( - HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME, - HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME), - knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet, - tunnelPorts = tunnelPorts) - } - - def killAndRemove(): Unit = { - Docker.killAndRemoveContainer(host) - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala deleted file mode 100644 index 884a8d1..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.minicluster - -import org.apache.log4j.Logger - -import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.integrationtest.Docker - -/** - * A command-line client to operate a Gearpump cluster - */ -class CommandLineClient(host: String) { - - private val LOG = Logger.getLogger(getClass) - - def listApps(): Array[String] = { - gearCommand(host, "gear info").split("\n").filter( - _.startsWith("application: ") - ) - } - - def listRunningApps(): Array[String] = - listApps().filter( - _.contains(s", status: ${MasterToAppMaster.AppMasterActive}") - ) - - def queryApp(appId: Int): String = try { - listApps().filter( - _.startsWith(s"application: $appId") - ).head - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - "" - } - - def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = { - gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args") - } - - def submitApp(jar: String, args: String = ""): Int = { - LOG.debug(s"|=> Submit Application $jar...") - submitAppUse("gear app", jar, args) - } - - private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try { - gearCommand(host, s"$launcher -jar $jar $args").split("\n") - .filter(_.contains("The application id is ")).head.split(" ").last.toInt - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - -1 - } - - def killApp(appId: Int): Boolean = { - tryGearCommand(host, s"gear kill -appid $appId") - } - - private def gearCommand(container: String, command: String): String = { - LOG.debug(s"|=> Gear command $command in container $container...") - Docker.execute(container, s"/opt/start $command") - } - - private def tryGearCommand(container: String, command: String): Boolean = { - LOG.debug(s"|=> Gear command $command in container $container...") - Docker.executeSilently(container, s"/opt/start $command") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala deleted file mode 100644 index 4d439e8..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.minicluster - -import java.io.IOException -import scala.collection.mutable.ListBuffer - -import org.apache.log4j.Logger - -import org.apache.gearpump.integrationtest.{Docker, Util} - -/** - * This class is a test driver for end-to-end integration test. - */ -class MiniCluster { - - private val LOG = Logger.getLogger(getClass) - private val SUT_HOME = "/opt/gearpump" - - private val REST_SERVICE_PORT = 8090 - private val MASTER_PORT = 3000 - private val MASTER_ADDRS: List[(String, Int)] = { - (0 to 0).map(index => - ("master" + index, MASTER_PORT) - ).toList - } - - lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head) - - lazy val restClient: RestClient = { - val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT) - client - } - - private var workers: ListBuffer[String] = ListBuffer.empty - - def start(workerNum: Int = 2): Unit = { - - // Kill master - MASTER_ADDRS.foreach { case (host, _) => - if (Docker.containerExists(host)) { - Docker.killAndRemoveContainer(host) - } - } - - // Kill existing workers - workers ++= (0 until workerNum).map("worker" + _) - workers.foreach { worker => - if (Docker.containerExists(worker)) { - Docker.killAndRemoveContainer(worker) - } - } - - // Start Masters - MASTER_ADDRS.foreach({ case (host, port) => - addMasterNode(host, port) - }) - - // Start Workers - workers.foreach { worker => - val container = new BaseContainer(worker, "worker", MASTER_ADDRS) - container.createAndStart() - } - - // Check cluster status - expectRestClientAuthenticated() - expectClusterAvailable() - } - - private def addMasterNode(host: String, port: Int): Unit = { - val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS) - container.createAndStart() - } - - def addWorkerNode(host: String): Unit = { - if (workers.find(_ == host).isEmpty) { - val container = new BaseContainer(host, "worker", MASTER_ADDRS) - container.createAndStart() - workers += host - } else { - throw new IOException(s"Cannot add new worker $host, " + - s"as worker with same hostname already exists") - } - } - - /** - * @throws RuntimeException if rest client is not authenticated after N attempts - */ - private def expectRestClientAuthenticated(): Unit = { - Util.retryUntil(() => { - restClient.login() - LOG.info("rest client has been authenticated") - true - }, "login successfully") - } - - /** - * @throws RuntimeException if service is not available after N attempts - */ - private def expectClusterAvailable(): Unit = { - Util.retryUntil(() => { - val response = restClient.queryMaster() - LOG.info(s"cluster is now available with response: $response.") - response.aliveFor > 0 - }, "cluster running") - } - - def isAlive: Boolean = { - getMasterHosts.exists(nodeIsOnline) - } - - def getNetworkGateway: String = { - Docker.getNetworkGateway(MASTER_ADDRS.head._1) - } - - def shutDown(): Unit = { - val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet - .filter(nodeIsOnline).toArray - if (removalHosts.length > 0) { - Docker.killAndRemoveContainer(removalHosts) - } - workers.clear() - } - - def removeMasterNode(host: String): Unit = { - Docker.killAndRemoveContainer(host) - } - - def removeWorkerNode(host: String): Unit = { - workers -= host - Docker.killAndRemoveContainer(host) - } - - def restart(): Unit = { - shutDown() - Util.retryUntil(() => { - !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists) - }, "all docker containers killed") - LOG.info("all docker containers have been killed. restarting...") - start() - } - - def getMastersAddresses: List[(String, Int)] = { - MASTER_ADDRS - } - - def getMasterHosts: List[String] = { - MASTER_ADDRS.map({ case (host, port) => host }) - } - - def getWorkerHosts: List[String] = { - workers.toList - } - - def nodeIsOnline(host: String): Boolean = { - Docker.containerIsRunning(host) - } - - private def builtInJarsUnder(folder: String): Array[String] = { - Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder") - } - - private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = { - builtInJarsUnder(folder).filter(_.contains(subtext)) - } - - def queryBuiltInExampleJars(subtext: String): Seq[String] = { - queryBuiltInJars("examples", subtext) - } - - def queryBuiltInITJars(subtext: String): Seq[String] = { - queryBuiltInJars("integrationtest", subtext) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala deleted file mode 100644 index 1b143af..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.minicluster - -import scala.reflect.ClassTag - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.log4j.Logger -import upickle.Js -import upickle.default._ - -import org.apache.gearpump.cluster.AppMasterToMaster.MasterData -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} -import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics -import org.apache.gearpump.cluster.master.MasterSummary -import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster} -import org.apache.gearpump.integrationtest.{Docker, Util} -import org.apache.gearpump.services.AppMasterService.Status -import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners} -// NOTE: This cannot be removed!!! -import org.apache.gearpump.services.util.UpickleUtil._ -import org.apache.gearpump.streaming.ProcessorDescription -import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief -import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor} -import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary -import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary -import org.apache.gearpump.util.{Constants, Graph} - -/** - * A REST client to operate a Gearpump cluster - */ -class RestClient(host: String, port: Int) { - - private val LOG = Logger.getLogger(getClass) - - private val cookieFile: String = "cookie.txt" - - implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = - upickle.default.Reader[Graph[Int, String]] { - case Js.Obj(verties, edges) => - val vertexList = upickle.default.readJs[List[Int]](verties._2) - val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) - Graph(vertexList, edgeList) - } - - private def decodeAs[T]( - expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try { - read[T](expr) - } catch { - case ex: Throwable => - LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}") - throw ex - } - - def queryVersion(): String = { - curl("version") - } - - def listWorkers(): Array[WorkerSummary] = { - val resp = callApi("master/workerlist") - decodeAs[List[WorkerSummary]](resp).toArray - } - - def listRunningWorkers(): Array[WorkerSummary] = { - listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive) - } - - def listApps(): Array[AppMasterData] = { - val resp = callApi("master/applist") - decodeAs[AppMastersData](resp).appMasters.toArray - } - - def listRunningApps(): Array[AppMasterData] = { - listApps().filter(_.status == MasterToAppMaster.AppMasterActive) - } - - def getNextAvailableAppId(): Int = { - listApps().length + 1 - } - - def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "") - : Boolean = try { - var endpoint = "master/submitapp" - - var options = Seq(s"jar=@$jar") - if (config.length > 0) { - options :+= s"conf=@$config" - } - - options :+= s"executorcount=$executorNum" - - if (args != null && !args.isEmpty) { - options :+= "args=\"" + args + "\"" - } - - val resp = callApi(endpoint, options.map("-F " + _).mkString(" ")) - val result = decodeAs[AppSubmissionResult](resp) - assert(result.success) - true - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - false - } - - def queryApp(appId: Int): AppMasterData = { - val resp = callApi(s"appmaster/$appId") - decodeAs[AppMasterData](resp) - } - - def queryAppMasterConfig(appId: Int): Config = { - val resp = callApi(s"appmaster/$appId/config") - ConfigFactory.parseString(resp) - } - - def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = { - val resp = callApi(s"appmaster/$appId?detail=true") - decodeAs[StreamAppMasterSummary](resp) - } - - def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*") - : HistoryMetrics = { - val args = if (current) "?readLatest=true" else "" - val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args") - decodeAs[HistoryMetrics](resp) - } - - def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = { - val resp = callApi(s"appmaster/$appId/executor/$executorId") - decodeAs[ExecutorSummary](resp) - } - - def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = { - queryStreamingAppDetail(appId).executors.toArray - } - - def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = { - val args = if (current) "?readLatest=true" else "" - val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args") - decodeAs[HistoryMetrics](resp) - } - - def queryExecutorConfig(appId: Int, executorId: Int): Config = { - val resp = callApi(s"appmaster/$appId/executor/$executorId/config") - ConfigFactory.parseString(resp) - } - - def queryMaster(): MasterSummary = { - val resp = callApi("master") - decodeAs[MasterData](resp).masterDescription - } - - def queryMasterMetrics(current: Boolean): HistoryMetrics = { - val args = if (current) "?readLatest=true" else "" - val resp = callApi(s"master/metrics/master?$args") - decodeAs[HistoryMetrics](resp) - } - - def queryMasterConfig(): Config = { - val resp = callApi("master/config") - ConfigFactory.parseString(resp) - } - - def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = { - val args = if (current) "?readLatest=true" else "" - val workerIdStr = WorkerId.render(workerId) - val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args") - decodeAs[HistoryMetrics](resp) - } - - def queryWorkerConfig(workerId: WorkerId): Config = { - val resp = callApi(s"worker/${WorkerId.render(workerId)}/config") - ConfigFactory.parseString(resp) - } - - def queryBuiltInPartitioners(): Array[String] = { - val resp = callApi("master/partitioners") - decodeAs[BuiltinPartitioners](resp).partitioners - } - - def uploadJar(localFilePath: String): AppJar = { - val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST) - decodeAs[AppJar](resp) - } - - def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = { - replaceStreamingAppProcessor(appId, replaceMe, false) - } - - def replaceStreamingAppProcessor( - appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try { - val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf) - val args = upickle.default.write(replaceOperation) - val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args), - CRUD_POST) - decodeAs[DAGOperationResult](resp) - true - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - false - } - - def killAppMaster(appId: Int): Boolean = { - killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID) - } - - def killExecutor(appId: Int, executorId: Int): Boolean = try { - val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@") - val pid = jvmInfo(0).toInt - val hostname = jvmInfo(1) - Docker.killProcess(hostname, pid) - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - false - } - - def killApp(appId: Int): Boolean = try { - val resp = callApi(s"appmaster/$appId", CRUD_DELETE) - resp.contains("\"status\":\"success\"") - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - false - } - - def restartApp(appId: Int): Boolean = try { - val resp = callApi(s"appmaster/$appId/restart", CRUD_POST) - decodeAs[Status](resp).success - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - false - } - - private val CRUD_POST = "-X POST" - private val CRUD_DELETE = "-X DELETE" - - private def callApi(endpoint: String, option: String = ""): String = { - curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile")) - } - - private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = { - Docker.curl(host, s"http://$host:$port/$endpoint", options) - } - - def login(): Unit = { - curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile", - "--data username=admin", "--data password=admin")) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala deleted file mode 100644 index 79adfc4..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.integrationtest.storm - -import scala.util.Random - -import backtype.storm.utils.{DRPCClient, Utils} - -import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient} -import org.apache.gearpump.integrationtest.{Docker, Util} - -class StormClient(cluster: MiniCluster, restClient: RestClient) { - - private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses - private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml" - private val DRPC_HOST = "storm0" - private val DRPC_PORT = 3772 - private val DRPC_INVOCATIONS_PORT = 3773 - private val STORM_DRPC = "storm-drpc" - private val NIMBUS_HOST = "storm1" - private val STORM_NIMBUS = "storm nimbus" - private val STORM_APP = "/opt/start storm app" - - private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs, - tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT)) - - private val nimbusContainer = - new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs) - - def start(): Unit = { - nimbusContainer.createAndStart() - ensureNimbusRunning() - - drpcContainer.createAndStart() - ensureDrpcServerRunning() - } - - private def ensureNimbusRunning(): Unit = { - Util.retryUntil(() => { - val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE) - // Parse format nimbus.thrift.port: '39322' - val thriftPort = response.split(" ")(1).replace("'", "").toInt - - Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """) - }, "Nimbus running") - } - - private def ensureDrpcServerRunning(): Unit = { - Util.retryUntil(() => { - Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """) - }, "DRPC running") - } - - def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = { - Util.retryUntil(() => { - Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + - s"-jar $jar $mainClass $args") - restClient.listRunningApps().exists(_.appName == appName) - }, "app running") - restClient.listRunningApps().filter(_.appName == appName).head.appId - } - - def getDRPCClient(drpcServerIp: String): DRPCClient = { - val config = Utils.readDefaultConfig() - new DRPCClient(config, drpcServerIp, DRPC_PORT) - } - - def shutDown(): Unit = { - - // Cleans up the storm.yaml config file - Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ") - drpcContainer.killAndRemove() - nimbusContainer.killAndRemove() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala new file mode 100644 index 0000000..f315ad3 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest + +import org.apache.log4j.Logger + +/** + * The class is used to execute Docker commands. + */ +object Docker { + + private val LOG = Logger.getLogger(getClass) + + /** + * @throws RuntimeException in case retval != 0 + */ + private def doExecute(container: String, command: String): String = { + ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container") + } + + private def doExecuteSilently(container: String, command: String): Boolean = { + ShellExec.exec(s"docker exec $container $command", s"EXEC $container") + } + + /** + * @throws RuntimeException in case retval != 0 + */ + final def execute(container: String, command: String): String = { + trace(container, s"Execute $command") { + doExecute(container, command) + } + } + + final def executeSilently(container: String, command: String): Boolean = { + trace(container, s"Execute silently $command") { + doExecuteSilently(container, command) + } + } + + final def listContainers(): Seq[String] = { + trace("", s"Listing how many containers...") { + ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST") + .split("\n").filter(_.nonEmpty) + } + } + + final def containerIsRunning(name: String): Boolean = { + trace(name, s"Check container running or not...") { + ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty + } + } + + final def getContainerIPAddr(name: String): String = { + trace(name, s"Get Ip Address") { + Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}") + } + } + + final def containerExists(name: String): Boolean = { + trace(name, s"Check container existing or not...") { + ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty + } + } + + /** + * @throws RuntimeException in case particular container is created already + */ + final def createAndStartContainer(name: String, image: String, command: String, + environ: Map[String, String] = Map.empty, // key, value + volumes: Map[String, String] = Map.empty, // from, to + knownHosts: Set[String] = Set.empty, + tunnelPorts: Set[Int] = Set.empty): String = { + + if (containerExists(name)) { + killAndRemoveContainer(name) + } + + trace(name, s"Create and start $name ($image)...") { + + val optsBuilder = new StringBuilder + optsBuilder.append("-d") // run in background + optsBuilder.append(" -h " + name) // use container name as hostname + optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings + + environ.foreach { case (key, value) => + optsBuilder.append(s" -e $key=$value") + } + volumes.foreach { case (from, to) => + optsBuilder.append(s" -v $from:$to") + } + knownHosts.foreach(host => + optsBuilder.append(" --link " + host) + ) + tunnelPorts.foreach(port => + optsBuilder.append(s" -p $port:$port") + ) + createAndStartContainer(name, optsBuilder.toString(), command, image) + } + } + + /** + * @throws RuntimeException in case particular container is created already + */ + private def createAndStartContainer( + name: String, options: String, command: String, image: String): String = { + ShellExec.execAndCaptureOutput(s"docker run $options " + + s"--name $name $image $command", s"MAKE $name") + } + + final def killAndRemoveContainer(name: String): Boolean = { + trace(name, s"kill and remove container") { + ShellExec.exec(s"docker rm -f $name", s"STOP $name") + } + } + + final def killAndRemoveContainer(names: Array[String]): Boolean = { + assert(names.length > 0) + val args = names.mkString(" ") + trace(names.mkString(","), s"kill and remove containers") { + ShellExec.exec(s"docker rm -f $args", s"STOP $args.") + } + } + + private def inspect(container: String, option: String): String = { + ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container") + } + + final def curl(container: String, url: String, options: Array[String] = Array.empty[String]) + : String = { + trace(container, s"curl $url") { + doExecute(container, s"curl -s ${options.mkString(" ")} $url") + } + } + + final def getHostName(container: String): String = { + trace(container, s"Get hostname of container...") { + doExecute(container, "hostname") + } + } + + final def getNetworkGateway(container: String): String = { + trace(container, s"Get gateway of container...") { + doExecute(container, "ip route").split("\\s+")(2) + } + } + final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = { + trace(container, s"Kill process pid: $pid") { + doExecuteSilently(container, s"kill -$signal $pid") + } + } + + final def findJars(container: String, folder: String): Array[String] = { + trace(container, s"Find jars under $folder") { + doExecute(container, s"find $folder") + .split("\n").filter(_.endsWith(".jar")) + } + } + + private def trace[T](container: String, msg: String)(fun: => T): T = { + // scalastyle:off println + Console.println() // A empty line to let the output looks better. + // scalastyle:on println + LOG.debug(s"Container $container====>> $msg") + LOG.debug("INPUT==>>") + val response = fun + LOG.debug("<<==OUTPUT") + + LOG.debug(brief(response)) + + LOG.debug(s"<<====Command END. Container $container, $msg \n") + response + } + + private val PREVIEW_MAX_LENGTH = 1024 + + private def brief[T](input: T): String = { + val output = input match { + case true => + "Success|True" + case false => + "Failure|False" + case x: Array[Any] => + "Success: [" + x.mkString(",") + "]" + case x => + x.toString + } + + val preview = if (output.length > PREVIEW_MAX_LENGTH) { + output.substring(0, PREVIEW_MAX_LENGTH) + "..." + } + else { + output + } + preview + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala new file mode 100644 index 0000000..25d7ee3 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.sys.process._ + +import org.apache.log4j.Logger +import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer + +/** + * The class is used to execute command in a shell + */ +object ShellExec { + + private val LOG = Logger.getLogger(getClass) + private val PROCESS_TIMEOUT = 2.minutes + + /** + * The builtin command line parser by ProcessBuilder (implicit sys.process) don't + * respect the quote chars (' and ") + */ + private def splitQuotedString(str: String): List[String] = { + val splitter = new QuotedStringTokenizer(str, " \t\n\r") + splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList + } + + def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = { + LOG.debug(s"$sender => `$command`") + + val p = splitQuotedString(command).run() + val f = Future(blocking(p.exitValue())) // wrap in Future + val retval = { + try { + Await.result(f, timeout) + } catch { + case _: TimeoutException => + LOG.error(s"timeout to execute command `$command`") + p.destroy() + p.exitValue() + } + } + LOG.debug(s"$sender <= exit $retval") + retval == 0 + } + + def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT) + : String = { + LOG.debug(s"$sender => `$command`") + + val buf = new StringBuilder + val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"), + (e: String) => buf.append(e).append("\n")) + val p = splitQuotedString(command).run(processLogger) + val f = Future(blocking(p.exitValue())) // wrap in Future + val retval = { + try { + Await.result(f, timeout) + } catch { + case _: TimeoutException => + p.destroy() + p.exitValue() + } + } + val output = buf.toString().trim + val PREVIEW_MAX_LENGTH = 200 + val preview = if (output.length > PREVIEW_MAX_LENGTH) { + output.substring(0, PREVIEW_MAX_LENGTH) + "..." + } else { + output + } + + LOG.debug(s"$sender <= `$preview` exit $retval") + if (retval != 0) { + throw new RuntimeException( + s"exited ($retval) by executing `$command`") + } + output + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala new file mode 100644 index 0000000..7e7085d --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest + +import scala.concurrent.duration._ +import scala.util.{Failure, Success, Try} + +import org.apache.log4j.Logger + +object Util { + + private val LOG = Logger.getLogger(getClass) + + def encodeUriComponent(s: String): String = { + try { + java.net.URLEncoder.encode(s, "UTF-8") + .replaceAll("\\+", "%20") + .replaceAll("\\%21", "!") + .replaceAll("\\%27", "'") + .replaceAll("\\%28", "(") + .replaceAll("\\%29", ")") + .replaceAll("\\%7E", "~") + } catch { + case ex: Throwable => s + } + } + + def retryUntil( + condition: () => Boolean, conditionDescription: String, maxTries: Int = 15, + interval: Duration = 10.seconds): Unit = { + var met = false + var tries = 0 + + while (!met && tries < maxTries) { + + met = Try(condition()) match { + case Success(true) => true + case Success(false) => false + case Failure(ex) => false + } + + tries += 1 + + if (!met) { + LOG.error(s"Failed due to (false == $conditionDescription), " + + s"retrying for the ${tries} times...") + Thread.sleep(interval.toMillis) + } else { + LOG.info(s"Success ($conditionDescription) after ${tries} retries") + } + } + + if (!met) { + throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala new file mode 100644 index 0000000..f836abd --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.integrationtest.hadoop + +import org.apache.log4j.Logger + +import org.apache.gearpump.integrationtest.{Docker, Util} + +object HadoopCluster { + + /** Starts a Hadoop cluster */ + def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = { + val hadoopCluster = new HadoopCluster + try { + hadoopCluster.start() + testCode(hadoopCluster) + } finally { + hadoopCluster.shutDown() + } + } +} +/** + * This class maintains a single node Hadoop cluster + */ +class HadoopCluster { + + private val LOG = Logger.getLogger(getClass) + private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0" + private val HADOOP_HOST = "hadoop0" + + def start(): Unit = { + Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "") + + Util.retryUntil(() => isAlive, "Hadoop cluster is alive") + LOG.info("Hadoop cluster is started.") + } + + // Checks whether the cluster is alive by listing "/" + private def isAlive: Boolean = { + Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /") + } + + def getDefaultFS: String = { + val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST) + s"hdfs://$hostIPAddr:9000" + } + + def shutDown(): Unit = { + Docker.killAndRemoveContainer(HADOOP_HOST) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala new file mode 100644 index 0000000..15ba084 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.kafka + +import org.apache.log4j.Logger + +import org.apache.gearpump.integrationtest.minicluster.MiniCluster +import org.apache.gearpump.integrationtest.{Docker, Util} + +object KafkaCluster { + + /** Starts a Kafka cluster */ + def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = { + val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka") + try { + kafkaCluster.start() + testCode(kafkaCluster) + } finally { + kafkaCluster.shutDown() + } + } + + def withDataProducer(topic: String, brokerList: String) + (testCode: NumericalDataProducer => Unit): Unit = { + val producer = new NumericalDataProducer(topic, brokerList) + try { + producer.start() + testCode(producer) + } finally { + producer.stop() + } + } +} + +/** + * This class maintains a single node Kafka cluster with integrated Zookeeper. + */ +class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { + + private val LOG = Logger.getLogger(getClass) + private val KAFKA_DOCKER_IMAGE = "spotify/kafka" + private val KAFKA_HOST = "kafka0" + private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/" + private val ZOOKEEPER_PORT = 2181 + private val BROKER_PORT = 9092 + val advertisedPort = BROKER_PORT + + def start(): Unit = { + Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "", + environ = Map( + "ADVERTISED_HOST" -> advertisedHost, + "ADVERTISED_PORT" -> BROKER_PORT.toString, + "ZK_CHROOT" -> zkChroot), + tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT) + ) + Util.retryUntil(() => isAlive, "kafka cluster is alive") + LOG.debug("kafka cluster is started.") + } + + def isAlive: Boolean = { + !listTopics().contains("Connection refused") + } + + def shutDown(): Unit = { + Docker.killAndRemoveContainer(KAFKA_HOST) + } + + private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST) + + def listTopics(): String = { + kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString) + } + + def getZookeeperConnectString: String = { + s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot" + } + + def getBrokerListConnectString: String = { + s"$hostIPAddr:$BROKER_PORT" + } + + def createTopic(topic: String, partitions: Int = 1): Unit = { + LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions") + + Docker.executeSilently(KAFKA_HOST, + s"$KAFKA_HOME/bin/kafka-topics.sh" + + s" --zookeeper $getZookeeperConnectString" + + s" --create --topic $topic --partitions $partitions --replication-factor 1") + } + + def produceDataToKafka(topic: String, messageNum: Int): Unit = { + Docker.executeSilently(KAFKA_HOST, + s"$KAFKA_HOME/bin/kafka-topics.sh" + + s" --zookeeper $getZookeeperConnectString" + + s" --create --topic $topic --partitions 1 --replication-factor 1") + + Docker.executeSilently(KAFKA_HOST, + s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" + + s" --broker-list $getBrokerListConnectString" + + s" --topic $topic --messages $messageNum") + } + + def getLatestOffset(topic: String): Int = { + kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString) + } + + private def kafkaListTopics( + container: String, kafkaHome: String, zookeeperConnectionString: String): String = { + + LOG.debug(s"|=> Kafka list topics...") + Docker.execute(container, + s"$kafkaHome/bin/kafka-topics.sh" + + s" --zookeeper $zookeeperConnectionString -list") + } + + private def kafkaFetchLatestOffset( + container: String, topic: String, kafkaHome: String, brokersList: String): Int = { + LOG.debug(s"|=> Get latest offset of topic $topic...") + val output = Docker.execute(container, + s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" + + s" --broker-list $brokersList " + + s" --topic $topic --time -1") + output.split(":")(2).toInt + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala new file mode 100644 index 0000000..1cf3125 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.kafka + +import java.util.Properties + +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.log4j.Logger + +import org.apache.gearpump.streaming.serializer.ChillSerializer + +class NumericalDataProducer(topic: String, bootstrapServers: String) { + + private val LOG = Logger.getLogger(getClass) + private val producer = createProducer + private val WRITE_SLEEP_NANOS = 10 + private val serializer = new ChillSerializer[Int] + var lastWriteNum = 0 + + def start(): Unit = { + produceThread.start() + } + + def stop(): Unit = { + if (produceThread.isAlive) { + produceThread.interrupt() + produceThread.join() + } + producer.close() + } + + /** How many message we have written in total */ + def producedNumbers: Range = { + Range(1, lastWriteNum + 1) + } + + private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = { + val properties = new Properties() + properties.setProperty("bootstrap.servers", bootstrapServers) + new KafkaProducer[Array[Byte], Array[Byte]](properties, + new ByteArraySerializer, new ByteArraySerializer) + } + + private val produceThread = new Thread(new Runnable { + override def run(): Unit = { + try { + while (!Thread.currentThread.isInterrupted) { + lastWriteNum += 1 + val msg = serializer.serialize(lastWriteNum) + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg) + producer.send(record) + Thread.sleep(0, WRITE_SLEEP_NANOS) + } + } catch { + case ex: InterruptedException => + LOG.error("message producing is stopped by an interrupt") + } + } + }) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala new file mode 100644 index 0000000..1f773d3 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.kafka + +import scala.collection.mutable + +trait ResultVerifier { + def onNext(num: Int): Unit +} + +class MessageLossDetector(totalNum: Int) extends ResultVerifier { + private val bitSets = new mutable.BitSet(totalNum) + var result = List.empty[Int] + + override def onNext(num: Int): Unit = { + bitSets.add(num) + result :+= num + } + + def allReceived: Boolean = { + 1.to(totalNum).forall(bitSets) + } + + def received(num: Int): Boolean = { + bitSets(num) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala new file mode 100644 index 0000000..392ca86 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.kafka + +import scala.util.{Failure, Success} + +import kafka.api.FetchRequestBuilder +import kafka.consumer.SimpleConsumer +import kafka.utils.Utils + +import org.apache.gearpump.streaming.serializer.ChillSerializer + +class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0, + host: String, port: Int) { + + private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "") + private val serializer = new ChillSerializer[Int] + private var offset = 0L + + def read(): Unit = { + val messageSet = consumer.fetch( + new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build() + ).messageSet(topic, partition) + + for (messageAndOffset <- messageSet) { + serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match { + case Success(msg) => + offset = messageAndOffset.nextOffset + verifier.onNext(msg) + case Failure(e) => throw e + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala new file mode 100644 index 0000000..73413da --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.minicluster + +import scala.sys.process._ + +import org.apache.gearpump.integrationtest.Docker + +/** + * A helper to instantiate the base image for different usage. + */ +class BaseContainer(val host: String, command: String, + masterAddrs: List[(String, Int)], + tunnelPorts: Set[Int] = Set.empty) { + + private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher" + private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump" + private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump" + private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack" + private val HOST_LOG_HOME = { + val dir = "/tmp/gearpump" + s"mkdir -p $dir".!! + s"mktemp -p $dir -d".!!.trim + } + + private val CLUSTER_OPTS = { + masterAddrs.zipWithIndex.map { case (hostPort, index) => + s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}" + }.mkString(" ") + } + + def createAndStart(): String = { + Docker.createAndStartContainer(host, IMAGE_NAME, command, + environ = Map("JAVA_OPTS" -> CLUSTER_OPTS), + volumes = Map( + HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME, + HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME), + knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet, + tunnelPorts = tunnelPorts) + } + + def killAndRemove(): Unit = { + Docker.killAndRemoveContainer(host) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala new file mode 100644 index 0000000..884a8d1 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.minicluster + +import org.apache.log4j.Logger + +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.integrationtest.Docker + +/** + * A command-line client to operate a Gearpump cluster + */ +class CommandLineClient(host: String) { + + private val LOG = Logger.getLogger(getClass) + + def listApps(): Array[String] = { + gearCommand(host, "gear info").split("\n").filter( + _.startsWith("application: ") + ) + } + + def listRunningApps(): Array[String] = + listApps().filter( + _.contains(s", status: ${MasterToAppMaster.AppMasterActive}") + ) + + def queryApp(appId: Int): String = try { + listApps().filter( + _.startsWith(s"application: $appId") + ).head + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + "" + } + + def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = { + gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args") + } + + def submitApp(jar: String, args: String = ""): Int = { + LOG.debug(s"|=> Submit Application $jar...") + submitAppUse("gear app", jar, args) + } + + private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try { + gearCommand(host, s"$launcher -jar $jar $args").split("\n") + .filter(_.contains("The application id is ")).head.split(" ").last.toInt + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + -1 + } + + def killApp(appId: Int): Boolean = { + tryGearCommand(host, s"gear kill -appid $appId") + } + + private def gearCommand(container: String, command: String): String = { + LOG.debug(s"|=> Gear command $command in container $container...") + Docker.execute(container, s"/opt/start $command") + } + + private def tryGearCommand(container: String, command: String): Boolean = { + LOG.debug(s"|=> Gear command $command in container $container...") + Docker.executeSilently(container, s"/opt/start $command") + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala new file mode 100644 index 0000000..4d439e8 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.minicluster + +import java.io.IOException +import scala.collection.mutable.ListBuffer + +import org.apache.log4j.Logger + +import org.apache.gearpump.integrationtest.{Docker, Util} + +/** + * This class is a test driver for end-to-end integration test. + */ +class MiniCluster { + + private val LOG = Logger.getLogger(getClass) + private val SUT_HOME = "/opt/gearpump" + + private val REST_SERVICE_PORT = 8090 + private val MASTER_PORT = 3000 + private val MASTER_ADDRS: List[(String, Int)] = { + (0 to 0).map(index => + ("master" + index, MASTER_PORT) + ).toList + } + + lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head) + + lazy val restClient: RestClient = { + val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT) + client + } + + private var workers: ListBuffer[String] = ListBuffer.empty + + def start(workerNum: Int = 2): Unit = { + + // Kill master + MASTER_ADDRS.foreach { case (host, _) => + if (Docker.containerExists(host)) { + Docker.killAndRemoveContainer(host) + } + } + + // Kill existing workers + workers ++= (0 until workerNum).map("worker" + _) + workers.foreach { worker => + if (Docker.containerExists(worker)) { + Docker.killAndRemoveContainer(worker) + } + } + + // Start Masters + MASTER_ADDRS.foreach({ case (host, port) => + addMasterNode(host, port) + }) + + // Start Workers + workers.foreach { worker => + val container = new BaseContainer(worker, "worker", MASTER_ADDRS) + container.createAndStart() + } + + // Check cluster status + expectRestClientAuthenticated() + expectClusterAvailable() + } + + private def addMasterNode(host: String, port: Int): Unit = { + val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS) + container.createAndStart() + } + + def addWorkerNode(host: String): Unit = { + if (workers.find(_ == host).isEmpty) { + val container = new BaseContainer(host, "worker", MASTER_ADDRS) + container.createAndStart() + workers += host + } else { + throw new IOException(s"Cannot add new worker $host, " + + s"as worker with same hostname already exists") + } + } + + /** + * @throws RuntimeException if rest client is not authenticated after N attempts + */ + private def expectRestClientAuthenticated(): Unit = { + Util.retryUntil(() => { + restClient.login() + LOG.info("rest client has been authenticated") + true + }, "login successfully") + } + + /** + * @throws RuntimeException if service is not available after N attempts + */ + private def expectClusterAvailable(): Unit = { + Util.retryUntil(() => { + val response = restClient.queryMaster() + LOG.info(s"cluster is now available with response: $response.") + response.aliveFor > 0 + }, "cluster running") + } + + def isAlive: Boolean = { + getMasterHosts.exists(nodeIsOnline) + } + + def getNetworkGateway: String = { + Docker.getNetworkGateway(MASTER_ADDRS.head._1) + } + + def shutDown(): Unit = { + val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet + .filter(nodeIsOnline).toArray + if (removalHosts.length > 0) { + Docker.killAndRemoveContainer(removalHosts) + } + workers.clear() + } + + def removeMasterNode(host: String): Unit = { + Docker.killAndRemoveContainer(host) + } + + def removeWorkerNode(host: String): Unit = { + workers -= host + Docker.killAndRemoveContainer(host) + } + + def restart(): Unit = { + shutDown() + Util.retryUntil(() => { + !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists) + }, "all docker containers killed") + LOG.info("all docker containers have been killed. restarting...") + start() + } + + def getMastersAddresses: List[(String, Int)] = { + MASTER_ADDRS + } + + def getMasterHosts: List[String] = { + MASTER_ADDRS.map({ case (host, port) => host }) + } + + def getWorkerHosts: List[String] = { + workers.toList + } + + def nodeIsOnline(host: String): Boolean = { + Docker.containerIsRunning(host) + } + + private def builtInJarsUnder(folder: String): Array[String] = { + Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder") + } + + private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = { + builtInJarsUnder(folder).filter(_.contains(subtext)) + } + + def queryBuiltInExampleJars(subtext: String): Seq[String] = { + queryBuiltInJars("examples", subtext) + } + + def queryBuiltInITJars(subtext: String): Seq[String] = { + queryBuiltInJars("integrationtest", subtext) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala new file mode 100644 index 0000000..1b143af --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.minicluster + +import scala.reflect.ClassTag + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.log4j.Logger +import upickle.Js +import upickle.default._ + +import org.apache.gearpump.cluster.AppMasterToMaster.MasterData +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} +import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics +import org.apache.gearpump.cluster.master.MasterSummary +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster} +import org.apache.gearpump.integrationtest.{Docker, Util} +import org.apache.gearpump.services.AppMasterService.Status +import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners} +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.streaming.ProcessorDescription +import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief +import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor} +import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary +import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary +import org.apache.gearpump.util.{Constants, Graph} + +/** + * A REST client to operate a Gearpump cluster + */ +class RestClient(host: String, port: Int) { + + private val LOG = Logger.getLogger(getClass) + + private val cookieFile: String = "cookie.txt" + + implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = + upickle.default.Reader[Graph[Int, String]] { + case Js.Obj(verties, edges) => + val vertexList = upickle.default.readJs[List[Int]](verties._2) + val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) + Graph(vertexList, edgeList) + } + + private def decodeAs[T]( + expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try { + read[T](expr) + } catch { + case ex: Throwable => + LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}") + throw ex + } + + def queryVersion(): String = { + curl("version") + } + + def listWorkers(): Array[WorkerSummary] = { + val resp = callApi("master/workerlist") + decodeAs[List[WorkerSummary]](resp).toArray + } + + def listRunningWorkers(): Array[WorkerSummary] = { + listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive) + } + + def listApps(): Array[AppMasterData] = { + val resp = callApi("master/applist") + decodeAs[AppMastersData](resp).appMasters.toArray + } + + def listRunningApps(): Array[AppMasterData] = { + listApps().filter(_.status == MasterToAppMaster.AppMasterActive) + } + + def getNextAvailableAppId(): Int = { + listApps().length + 1 + } + + def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "") + : Boolean = try { + var endpoint = "master/submitapp" + + var options = Seq(s"jar=@$jar") + if (config.length > 0) { + options :+= s"conf=@$config" + } + + options :+= s"executorcount=$executorNum" + + if (args != null && !args.isEmpty) { + options :+= "args=\"" + args + "\"" + } + + val resp = callApi(endpoint, options.map("-F " + _).mkString(" ")) + val result = decodeAs[AppSubmissionResult](resp) + assert(result.success) + true + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + false + } + + def queryApp(appId: Int): AppMasterData = { + val resp = callApi(s"appmaster/$appId") + decodeAs[AppMasterData](resp) + } + + def queryAppMasterConfig(appId: Int): Config = { + val resp = callApi(s"appmaster/$appId/config") + ConfigFactory.parseString(resp) + } + + def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = { + val resp = callApi(s"appmaster/$appId?detail=true") + decodeAs[StreamAppMasterSummary](resp) + } + + def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*") + : HistoryMetrics = { + val args = if (current) "?readLatest=true" else "" + val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args") + decodeAs[HistoryMetrics](resp) + } + + def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = { + val resp = callApi(s"appmaster/$appId/executor/$executorId") + decodeAs[ExecutorSummary](resp) + } + + def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = { + queryStreamingAppDetail(appId).executors.toArray + } + + def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = { + val args = if (current) "?readLatest=true" else "" + val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args") + decodeAs[HistoryMetrics](resp) + } + + def queryExecutorConfig(appId: Int, executorId: Int): Config = { + val resp = callApi(s"appmaster/$appId/executor/$executorId/config") + ConfigFactory.parseString(resp) + } + + def queryMaster(): MasterSummary = { + val resp = callApi("master") + decodeAs[MasterData](resp).masterDescription + } + + def queryMasterMetrics(current: Boolean): HistoryMetrics = { + val args = if (current) "?readLatest=true" else "" + val resp = callApi(s"master/metrics/master?$args") + decodeAs[HistoryMetrics](resp) + } + + def queryMasterConfig(): Config = { + val resp = callApi("master/config") + ConfigFactory.parseString(resp) + } + + def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = { + val args = if (current) "?readLatest=true" else "" + val workerIdStr = WorkerId.render(workerId) + val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args") + decodeAs[HistoryMetrics](resp) + } + + def queryWorkerConfig(workerId: WorkerId): Config = { + val resp = callApi(s"worker/${WorkerId.render(workerId)}/config") + ConfigFactory.parseString(resp) + } + + def queryBuiltInPartitioners(): Array[String] = { + val resp = callApi("master/partitioners") + decodeAs[BuiltinPartitioners](resp).partitioners + } + + def uploadJar(localFilePath: String): AppJar = { + val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST) + decodeAs[AppJar](resp) + } + + def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = { + replaceStreamingAppProcessor(appId, replaceMe, false) + } + + def replaceStreamingAppProcessor( + appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try { + val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf) + val args = upickle.default.write(replaceOperation) + val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args), + CRUD_POST) + decodeAs[DAGOperationResult](resp) + true + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + false + } + + def killAppMaster(appId: Int): Boolean = { + killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + } + + def killExecutor(appId: Int, executorId: Int): Boolean = try { + val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@") + val pid = jvmInfo(0).toInt + val hostname = jvmInfo(1) + Docker.killProcess(hostname, pid) + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + false + } + + def killApp(appId: Int): Boolean = try { + val resp = callApi(s"appmaster/$appId", CRUD_DELETE) + resp.contains("\"status\":\"success\"") + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + false + } + + def restartApp(appId: Int): Boolean = try { + val resp = callApi(s"appmaster/$appId/restart", CRUD_POST) + decodeAs[Status](resp).success + } catch { + case ex: Throwable => + LOG.warn(s"swallowed an exception: $ex") + false + } + + private val CRUD_POST = "-X POST" + private val CRUD_DELETE = "-X DELETE" + + private def callApi(endpoint: String, option: String = ""): String = { + curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile")) + } + + private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = { + Docker.curl(host, s"http://$host:$port/$endpoint", options) + } + + def login(): Unit = { + curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile", + "--data username=admin", "--data password=admin")) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala new file mode 100644 index 0000000..79adfc4 --- /dev/null +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.integrationtest.storm + +import scala.util.Random + +import backtype.storm.utils.{DRPCClient, Utils} + +import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient} +import org.apache.gearpump.integrationtest.{Docker, Util} + +class StormClient(cluster: MiniCluster, restClient: RestClient) { + + private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses + private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml" + private val DRPC_HOST = "storm0" + private val DRPC_PORT = 3772 + private val DRPC_INVOCATIONS_PORT = 3773 + private val STORM_DRPC = "storm-drpc" + private val NIMBUS_HOST = "storm1" + private val STORM_NIMBUS = "storm nimbus" + private val STORM_APP = "/opt/start storm app" + + private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs, + tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT)) + + private val nimbusContainer = + new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs) + + def start(): Unit = { + nimbusContainer.createAndStart() + ensureNimbusRunning() + + drpcContainer.createAndStart() + ensureDrpcServerRunning() + } + + private def ensureNimbusRunning(): Unit = { + Util.retryUntil(() => { + val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE) + // Parse format nimbus.thrift.port: '39322' + val thriftPort = response.split(" ")(1).replace("'", "").toInt + + Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """) + }, "Nimbus running") + } + + private def ensureDrpcServerRunning(): Unit = { + Util.retryUntil(() => { + Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """) + }, "DRPC running") + } + + def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = { + Util.retryUntil(() => { + Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + + s"-jar $jar $mainClass $args") + restClient.listRunningApps().exists(_.appName == appName) + }, "app running") + restClient.listRunningApps().filter(_.appName == appName).head.appId + } + + def getDRPCClient(drpcServerIp: String): DRPCClient = { + val config = Utils.readDefaultConfig() + new DRPCClient(config, drpcServerIp, DRPC_PORT) + } + + def shutDown(): Unit = { + + // Cleans up the storm.yaml config file + Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ") + drpcContainer.killAndRemove() + nimbusContainer.killAndRemove() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala deleted file mode 100644 index 67a2491..0000000 --- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.storm - -import backtype.storm.topology.base.BaseBasicBolt -import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} -import backtype.storm.tuple.{Fields, Tuple, Values} -import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper - -class Adaptor extends BaseBasicBolt { - private var id = 0L - - override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = { - val bytes = tuple.getBinary(0) - collector.emit(new Values(s"$id".getBytes, bytes)) - id += 1 - } - - override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { - declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, - FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)) - } -}
