http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala deleted file mode 100644 index ebaf354..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.jarstore.dfs - -import java.io.{InputStream, OutputStream} - -import com.typesafe.config.Config -import org.apache.gearpump.jarstore.JarStore -import org.apache.gearpump.util.Constants -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.permission.{FsAction, FsPermission} - -/** - * DFSJarStore store the uploaded jar on HDFS - */ -class DFSJarStore extends JarStore { - private var rootPath: Path = null - override val scheme: String = "hdfs" - - override def init(config: Config): Unit = { - rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)) - val fs = rootPath.getFileSystem(new Configuration()) - if (!fs.exists(rootPath)) { - fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) - } - } - - /** - * Creates the file on JarStore. - * - * @param fileName name of the file to be created on JarStore. - * @return OutputStream returns a stream into which the data can be written. - */ - override def createFile(fileName: String): OutputStream = { - val filePath = new Path(rootPath, fileName) - val fs = filePath.getFileSystem(new Configuration()) - fs.create(filePath) - } - - /** - * Gets the InputStream to read the file - * - * @param fileName name of the file to be read on JarStore. - * @return InputStream returns a stream from which the data can be read. - */ - override def getFile(fileName: String): InputStream = { - val filePath = new Path(rootPath, fileName) - val fs = filePath.getFileSystem(new Configuration()) - fs.open(filePath) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore ---------------------------------------------------------------------- diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore deleted file mode 100644 index e173a8a..0000000 --- a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -org.apache.gearpump.jarstore.local.LocalJarStore -org.apache.gearpump.jarstore.dfs.DFSJarStore \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala deleted file mode 100644 index a6b75cb..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.cluster - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - -import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import akka.pattern.ask -import akka.testkit.TestActorRef -import com.typesafe.config.ConfigValueFactory - -import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers -import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList -import org.apache.gearpump.cluster.master.Master -import org.apache.gearpump.cluster.worker.Worker -import org.apache.gearpump.util.Constants - -class MiniCluster { - private val mockMasterIP = "127.0.0.1" - - implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG. - withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP))) - - val (mockMaster, worker) = { - val master = system.actorOf(Props(classOf[Master]), "master") - val worker = system.actorOf(Props(classOf[Worker], master), "worker") - - // Wait until worker register itself to master - waitUtilWorkerIsRegistered(master) - (master, worker) - } - - def launchActor(props: Props): TestActorRef[Actor] = { - TestActorRef(props) - } - - private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = { - while (!isWorkerRegistered(master)) {} - } - - private def isWorkerRegistered(master: ActorRef): Boolean = { - import scala.concurrent.duration._ - implicit val dispatcher = system.dispatcher - - implicit val futureTimeout = Constants.FUTURE_TIMEOUT - - val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]] - - // Waits until the worker is registered. 
- val workers = Await.result[WorkerList](workerListFuture, 15.seconds) - workers.workers.size > 0 - } - - def shutDown(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala deleted file mode 100644 index 90fdd39..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.cluster.main - -import java.util.Properties - -import akka.testkit.TestProbe -import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered -import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.transport.HostPort - -import scala.concurrent.Future -import scala.util.{Success, Try} - -import com.typesafe.config.{ConfigFactory, Config} -import org.scalatest._ - -import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} -import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} -import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.util.{Constants, LogUtil, Util} - -class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - - private val LOG = LogUtil.getLogger(getClass) - - override def config: Config = TestUtil.DEFAULT_CONFIG - - override def beforeEach(): Unit = { - startActorSystem() - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "Worker" should "register worker address to master when started." 
in { - - val masterReceiver = createMockMaster() - - val tempTestConf = convertTestConf(getHost, getPort) - - val options = Array( - s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}", - s"-D${PREFER_IPV4}=true" - ) ++ getMasterListOption() - - val worker = Util.startProcess(options, - getContextClassPath, - getMainClassName(Worker), - Array.empty) - - try { - masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker) - - tempTestConf.delete() - } finally { - worker.destroy() - } - } - - "Master" should "accept worker RegisterNewWorker when started" in { - val worker = TestProbe()(getActorSystem) - - val host = "127.0.0.1" - val port = Util.findFreePort().get - - val properties = new Properties() - properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port") - properties.put(s"${GEARPUMP_HOSTNAME}", s"$host") - val masterConfig = ConfigFactory.parseProperties(properties) - .withFallback(TestUtil.MASTER_CONFIG) - Future { - Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString)) - } - - val masterProxy = getActorSystem.actorOf( - MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec") - - worker.send(masterProxy, RegisterNewWorker) - worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME) - } - - "Info" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) - masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) - } - - "Kill" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - Kill.main(masterConfig, Array("-appid", "0")) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0)) - masterReceiver.reply(ShutdownApplicationResult(Success(0))) - } - - "Replay" should "be started without exception" in { - - val masterReceiver = 
createMockMaster() - - Future { - Replay.main(masterConfig, Array("-appid", "0")) - } - - masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME) - masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref))) - masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME) - masterReceiver.reply(ReplayApplicationResult(Success(0))) - } - - "Local" should "be started without exception" in { - val port = Util.findFreePort().get - val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port", - s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost", - s"-D${PREFER_IPV4}=true") - - val local = Util.startProcess(options, - getContextClassPath, - getMainClassName(Local), - Array.empty) - - def retry(times: Int)(fn: => Boolean): Boolean = { - - LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..") - - val result = fn - if (result || times <= 0) { - result - } else { - Thread.sleep(1000) - retry(times - 1)(fn) - } - } - - try { - assert(retry(10)(isPortUsed("127.0.0.1", port)), - "local is not started successfully, as port is not used " + port) - } finally { - local.destroy() - } - } - - "Gear" should "support app|info|kill|shell|replay" in { - - val commands = Array("app", "info", "kill", "shell", "replay") - - assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw") - - for (command <- commands) { - assert(Try(Gear.main(Array("-noexist"))).isFailure, - "pass unknown option, throw, command: " + command) - } - - assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ") - - val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist"))) - assert(tryThis.isFailure, "unknown command, throw") - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala deleted file mode 100644 index e1ba8f6..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.cluster.main - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{FlatSpec, Matchers} - -import org.apache.gearpump.cluster.TestUtil - -class MasterWatcherSpec extends FlatSpec with Matchers { - def config: Config = TestUtil.MASTER_CONFIG - - "MasterWatcher" should "kill itself when can not get a quorum" in { - val system = ActorSystem("ForMasterWatcher", config) - - val actorWatcher = TestProbe()(system) - - val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) - actorWatcher watch masterWatcher - actorWatcher.expectTerminated(masterWatcher, 5.seconds) - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala deleted file mode 100644 index 58e3593..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import scala.util.Success - -import akka.actor.{Actor, ActorRef, Props} -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} -import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} -import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} -import org.apache.gearpump.cluster.master.AppManager._ -import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} -import org.apache.gearpump.cluster.{TestUtil, _} -import org.apache.gearpump.util.LogUtil - -class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - var kvService: TestProbe = null - var haService: TestProbe = null - var appLauncher: TestProbe = null - var appManager: ActorRef = null - private val LOG = LogUtil.getLogger(getClass) - - override def config: Config = TestUtil.DEFAULT_CONFIG - - override def beforeEach(): Unit = { - startActorSystem() - kvService = TestProbe()(getActorSystem) - appLauncher = TestProbe()(getActorSystem) - - appManager = getActorSystem.actorOf(Props(new 
AppManager(kvService.ref, - new DummyAppMasterLauncherFactory(appLauncher)))) - kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty))) - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "AppManager" should "handle AppMaster message correctly" in { - val appMaster = TestProbe()(getActorSystem) - val appId = 1 - - val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName")) - appMaster.send(appManager, register) - appMaster.expectMsgType[AppMasterRegistered] - - appMaster.send(appManager, ActivateAppMaster(appId)) - appMaster.expectMsgType[AppMasterActivated] - } - - "DataStoreService" should "support Put and Get" in { - val appMaster = TestProbe()(getActorSystem) - appMaster.send(appManager, SaveAppData(0, "key", 1)) - kvService.expectMsgType[PutKV] - kvService.reply(PutKVSuccess) - appMaster.expectMsg(AppDataSaved) - - appMaster.send(appManager, GetAppData(0, "key")) - kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess("key", 1)) - appMaster.expectMsg(GetAppDataResult("key", 1)) - } - - "AppManager" should "support application submission and shutdown" in { - testClientSubmission(withRecover = false) - } - - "AppManager" should "support application submission and recover if appmaster dies" in { - LOG.info("=================testing recover==============") - testClientSubmission(withRecover = true) - } - - "AppManager" should "handle client message correctly" in { - val mockClient = TestProbe()(getActorSystem) - mockClient.send(appManager, ShutdownApplication(1)) - assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure) - - mockClient.send(appManager, ResolveAppId(1)) - assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) - - mockClient.send(appManager, AppMasterDataRequest(1)) - mockClient.expectMsg(AppMasterData(AppMasterNonExist)) - } - - "AppManager" should 
"reject the application submission if the app name already existed" in { - val app = TestUtil.dummyApp - val submit = SubmitApplication(app, None, "username") - val client = TestProbe()(getActorSystem) - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - val appId = 1 - - client.send(appManager, submit) - - kvService.expectMsgType[PutKV] - appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) - appMaster.expectMsgType[AppMasterRegistered] - - client.send(appManager, submit) - assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) - } - - def testClientSubmission(withRecover: Boolean): Unit = { - val app = TestUtil.dummyApp - val submit = SubmitApplication(app, None, "username") - val client = TestProbe()(getActorSystem) - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - val appId = 1 - - client.send(appManager, submit) - - kvService.expectMsgType[PutKV] - appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) - kvService.expectMsgType[PutKV] - appMaster.expectMsgType[AppMasterRegistered] - - client.send(appManager, ResolveAppId(appId)) - client.expectMsg(ResolveAppIdResult(Success(appMaster.ref))) - - client.send(appManager, AppMastersDataRequest) - client.expectMsgType[AppMastersData] - - client.send(appManager, AppMasterDataRequest(appId, false)) - client.expectMsgType[AppMasterData] - - if (!withRecover) { - client.send(appManager, ShutdownApplication(appId)) - client.expectMsg(ShutdownApplicationResult(Success(appId))) - } else { - // Do recovery - getActorSystem.stop(appMaster.ref) - kvService.expectMsgType[GetKV] - val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) - kvService.reply(GetKVSuccess(APP_STATE, appState)) - 
appLauncher.expectMsg(LauncherStarted(appId)) - } - } -} - -class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { - - override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], - username: String, master: ActorRef, client: Option[ActorRef]): Props = { - Props(new DummyAppMasterLauncher(test, appId)) - } -} - -class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor { - - test.ref ! LauncherStarted(appId) - override def receive: Receive = { - case any: Any => test.ref forward any - } -} - -case class LauncherStarted(appId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala deleted file mode 100644 index 325a484..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import scala.concurrent.duration._ - -import akka.actor.Props -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import org.apache.gearpump.cluster.master.InMemoryKVService._ -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} - -class InMemoryKVServiceSpec - extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - - override def beforeEach(): Unit = { - startActorSystem() - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - override def config: Config = TestUtil.MASTER_CONFIG - - "KVService" should "get, put, delete correctly" in { - val system = getActorSystem - val kvService = system.actorOf(Props(new InMemoryKVService())) - val group = "group" - - val client = TestProbe()(system) - - client.send(kvService, PutKV(group, "key", 1)) - client.expectMsg(PutKVSuccess) - - client.send(kvService, PutKV(group, "key", 2)) - client.expectMsg(PutKVSuccess) - - client.send(kvService, GetKV(group, "key")) - client.expectMsg(GetKVSuccess("key", 2)) - - client.send(kvService, DeleteKVGroup(group)) - - // After DeleteGroup, it no longer accept Get and Put message for this group. 
- client.send(kvService, GetKV(group, "key")) - client.expectNoMsg(3.seconds) - - client.send(kvService, PutKV(group, "key", 3)) - client.expectNoMsg(3.seconds) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala deleted file mode 100644 index e82dff3..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.cluster.scheduler - -import org.apache.gearpump.cluster.worker.WorkerId - -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource -import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} -import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate -import org.apache.gearpump.cluster.master.Master.MasterInfo -import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} -import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished - -class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender - with WordSpecLike with Matchers with BeforeAndAfterAll{ - - def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) - val appId = 0 - val workerId1: WorkerId = WorkerId(1, 0L) - val workerId2: WorkerId = WorkerId(2, 0L) - val mockAppMaster = TestProbe() - val mockWorker1 = TestProbe() - val mockWorker2 = TestProbe() - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "The scheduler" should { - "update resource only when the worker is registered" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)) - expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! 
The worker $workerId1 has not been " + - s"registered into master")) - } - - "drop application's resource requests when the application is removed" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - mockAppMaster.expectNoMsg(5.seconds) - } - } - - def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = { - left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId)) - } - - "The resource request with higher priority" should { - "be handled first" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY) - val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY) - - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))) - 
mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The resource request which delivered earlier" should { - "be handled first if the priorities are the same" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - expect = ResourceAllocated( - 
Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The PriorityScheduler" should { - "handle the resource request with different relaxation" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER) - val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER) - - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - val request3 = ResourceRequest( - Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - - expect = ResourceAllocated(Array( - ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), - ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - // We have to manually update the 
resource on each worker - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER) - scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The PriorityScheduler" should { - "handle the resource request with different executor number" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - // By default, the request requires only one executor - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations2.allocations.length == 1) - assert(allocations2.allocations.head.resource == Resource(20)) - - val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations3.allocations.length == 3) - assert(allocations3.allocations.forall(_.resource == Resource(8))) - - // The total available resource can not satisfy the requirements with executor number - 
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) - scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) - val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations4.allocations.length == 2) - assert(allocations4.allocations.forall(_.resource == Resource(20))) - - // When new resources are available, the remaining request will be satisfied - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) - val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations5.allocations.length == 1) - assert(allocations4.allocations.forall(_.resource == Resource(20))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala deleted file mode 100644 index bf25057..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.worker - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, PoisonPill, Props} -import akka.testkit.TestProbe -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest._ - -import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} -import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} -import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} -import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} -import org.apache.gearpump.cluster.master.Master.MasterInfo -import org.apache.gearpump.cluster.scheduler.Resource -import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} -import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} - -class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - override def config: Config = TestUtil.DEFAULT_CONFIG - - val appId = 1 - val workerId: WorkerId = WorkerId(1, 0L) - val executorId = 1 - var masterProxy: TestProbe = null - var mockMaster: TestProbe = null - var client: TestProbe = null - val workerSlots = 50 - - override def beforeEach(): Unit = { - startActorSystem() - mockMaster = TestProbe()(getActorSystem) - masterProxy = TestProbe()(getActorSystem) - client = TestProbe()(getActorSystem) - } - - override def afterEach(): Unit = { 
- shutdownActorSystem() - } - - "The new started worker" should { - "kill itself if no response from Master after registering" in { - val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) - mockMaster watch worker - mockMaster.expectMsg(RegisterNewWorker) - mockMaster.expectTerminated(worker, 60.seconds) - } - } - - "Worker" should { - "init its resource from the gearpump config" in { - val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots"). - withFallback(TestUtil.DEFAULT_CONFIG) - val workerSystem = ActorSystem("WorkerSystem", config) - val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) - mockMaster watch worker - mockMaster.expectMsg(RegisterNewWorker) - - worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots))) - - worker.tell( - UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref) - mockMaster.expectTerminated(worker, 5.seconds) - workerSystem.terminate() - Await.result(workerSystem.whenTerminated, Duration.Inf) - } - } - - "Worker" should { - "update its remaining resource when launching and shutting down executors" in { - val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref)) - masterProxy.expectMsg(RegisterNewWorker) - - worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) - - val executorName = ActorUtil.actorNameForExecutor(appId, executorId) - // This is an actor path which the ActorSystemBooter will report back to, - // not needed in this test - val reportBack = "dummy" - val executionContext = ExecutorJVMConfig(Array.empty[String], - getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), - classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, - username = 
"user") - - // Test LaunchExecutor - worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), - mockMaster.ref) - mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine")) - - worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95))) - - worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98))) - - // Test terminationWatch - worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) - client.expectMsg(ShutdownExecutorSucceed(1, 1)) - - worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref) - client.expectMsg(ShutdownExecutorFailed( - s"Can not find executor ${executorId + 1} for app $appId")) - - mockMaster.ref ! PoisonPill - masterProxy.expectMsg(RegisterWorker(workerId)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/dev-tools/build ---------------------------------------------------------------------- diff --git a/dev-tools/build b/dev-tools/build new file mode 100755 index 0000000..c43d35f --- /dev/null +++ b/dev-tools/build @@ -0,0 +1,72 @@ +#!/bin/bash +usage() { +cat << EOF +usage: $0 options commands +OPTIONS: + -h Show this message +COMMANDS: + all: normal build of all components + clean: clean all sbt artifacts + publish: publish to local + publish-m2: publish to local .m2 + reset: remove all idea project files + scrub: remove all untracked git files + test: run coverage and test +EOF +} +while getopts âhâ OPTION +do + case "${OPTION}" in + h) + usage + exit 1 + ;; + ?) 
+ usage + exit + ;; + esac +done +shift $((OPTIND-1)) +if [ $# = 0 ]; then + echo sbt assembly pack pack-archive + sbt assembly pack pack-archive +else + while (( "$#" )); do + case "$1" in + all) + echo sbt assembly pack pack-archive + sbt assembly pack pack-archive + ;; + clean) + echo rm -rf $(find . -maxdepth 4 -name target -o -name project|grep -v '^./project'|sed '/project\/project/d'|sed '/project\/target/d') + rm -rf $(find . -maxdepth 4 -name target -o -name project|grep -v '^./project'|sed '/project\/project/d'|sed '/project\/target/d') + rm -rf project/project project/target output + ;; + publish) + echo sbt publish-local + sbt publish-local + ;; + publish-m2) + echo sbt publish-m2 + sbt publish-m2 + ;; + reset) + echo rm -rf $(find . -name .idea -type d) + rm -rf $(find . -name .idea -type d) + ;; + scrub) + git clean -df + ;; + test) + echo sbt coverage test + sbt coverage test + ;; + *) + usage + exit + ;; + esac + shift + done +fi http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/dev-tools/create_apache_bin_release.sh ---------------------------------------------------------------------- diff --git a/dev-tools/create_apache_bin_release.sh b/dev-tools/create_apache_bin_release.sh new file mode 100755 index 0000000..b35f98f --- /dev/null +++ b/dev-tools/create_apache_bin_release.sh @@ -0,0 +1,115 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +usage() { +cat << EOF +usage: $0 options +OPTIONS: + -h Show this message + -k GPG_KEY + -p GPG_PASSPHRASE + -r RAT Tool + -v Verify signed release + -c Clean before building +EOF +} +VERIFY=false +RUN_RAT=false +CLEAN=false +RELEASE_VERSION=$(grep '^version' version.sbt|sed 's/^.*"\(.*\)"$/\1/') +GEARPUMP_ARCHIVE_FOLDER=gearpump-2.11-${RELEASE_VERSION} +GEARPUMP_SCALA_VERSION=gearpump_2.11-${RELEASE_VERSION} +GEARPUMP_RELEASE_VERSION=${GEARPUMP_SCALA_VERSION}-incubating +while getopts âhcrvk:p:â OPTION +do + case "${OPTION}" in + h) + usage + exit 1 + ;; + k) + GPG_KEY=$OPTARG + ;; + p) + GPG_PASSPHRASE=$OPTARG + ;; + r) + RUN_RAT=true + ;; + v) + VERIFY=true + ;; + c) + CLEAN=true + ;; + ?) + usage + exit + ;; + esac +done +shift $((OPTIND-1)) +if [ $VERIFY = "true" ]; then + gpg --import KEYS 2>/dev/null + echo Verifying ${GEARPUMP_RELEASE_VERSION}-bin.tgz.asc + gpg --verify ${GEARPUMP_RELEASE_VERSION}-bin.tgz.asc + exit 0 +fi +if [ $RUN_RAT = "true" ]; then + java -jar ~/rat/trunk/apache-rat/target/apache-rat-0.12-SNAPSHOT.jar -A -f -E ./.rat-excludes -d . | grep '^== File' + exit 0 +fi +if [ -z $GPG_KEY ]; then + echo Missing -k option + usage + exit 1 +fi +if [ -z $GPG_PASSPHRASE ]; then + echo Missing -p option + usage + exit 1 +fi + +if [ $CLEAN = "true" ]; then + dev-tools/build clean reset scrub +fi +dev-tools/build all +PACKED_ARCHIVE=output/target/gearpump-2.11-${RELEASE_VERSION}.tar.gz +if [ ! 
-f $PACKED_ARCHIVE ]; then + echo "missing $PACKED_ARCHIVE" + echo "You must run 'sbt assembly pack pack-archive' first" + exit 1 +fi +mkdir tmp +cd tmp +tar xzf ../$PACKED_ARCHIVE +mv $GEARPUMP_ARCHIVE_FOLDER/* . +rmdir $GEARPUMP_ARCHIVE_FOLDER +cp ../NOTICE ../README.md ../CHANGELOG.md ../DISCLAIMER . +cp ../LICENSE.bin LICENSE +cp -r ../licenses . +rsync -a ../tmp/ $GEARPUMP_RELEASE_VERSION +tar czf ../${GEARPUMP_RELEASE_VERSION}-bin.tgz $GEARPUMP_RELEASE_VERSION +echo Signing ../${GEARPUMP_RELEASE_VERSION}-bin.tgz +echo $GPG_PASSPHRASE | gpg --batch --default-key $GPG_KEY --passphrase-fd 0 --armour --output ../${GEARPUMP_RELEASE_VERSION}-bin.tgz.asc --detach-sig ../${GEARPUMP_RELEASE_VERSION}-bin.tgz +gpg --print-md MD5 ../${GEARPUMP_RELEASE_VERSION}-bin.tgz > ../${GEARPUMP_RELEASE_VERSION}-bin.tgz.md5 +gpg --print-md SHA1 ../${GEARPUMP_RELEASE_VERSION}-bin.tgz > ../${GEARPUMP_RELEASE_VERSION}-bin.tgz.sha +cd .. +rm -rf tmp + + + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/dev-tools/create_apache_source_release.sh ---------------------------------------------------------------------- diff --git a/dev-tools/create_apache_source_release.sh b/dev-tools/create_apache_source_release.sh index 5ef2a9f..8961ee9 100755 --- a/dev-tools/create_apache_source_release.sh +++ b/dev-tools/create_apache_source_release.sh @@ -77,8 +77,13 @@ if [ -z $GPG_PASSPHRASE ]; then exit 1 fi +dev-tools/build clean reset scrub echo .git > exclude-list echo .DS_Store >> exclude-list +for i in $(ls licenses/*|grep -v LICENSE-jquery.txt|grep -v LICENSE-bootstrap.txt); do + echo $i >> exclude-list +done +cat exclude-list rsync -a --exclude-from exclude-list ../incubator-gearpump/ $GEARPUMP_RELEASE_VERSION tar czf ${GEARPUMP_RELEASE_VERSION}-src.tgz $GEARPUMP_RELEASE_VERSION echo Signing ${GEARPUMP_RELEASE_VERSION}-src.tgz http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/dev-tools/dependencies.sh 
---------------------------------------------------------------------- diff --git a/dev-tools/dependencies.sh b/dev-tools/dependencies.sh new file mode 100755 index 0000000..f67ad3a --- /dev/null +++ b/dev-tools/dependencies.sh @@ -0,0 +1,51 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# This will generate a file LICENSE.dependencies which can be used as input to the LICENSE.bin (binary release) +# Some additional work is needed to automate the LICENSE.bin file generation - and it may be difficult to do so. 
+# LICENSE.dependencies does save time by categorizing dependencies under the different licenses +# +sbt dependencyLicenseInfo | tee dependencyInfo +cat dependencyInfo | sed -E "s/"$'\E'"\[([0-9]{1,3}((;[0-9]{1,3})*)?)?[m|K]//g" | grep '^\[info\]' | grep -v '^\[info\] Updating'|grep -v '^\[info\] Resolving'|grep -v '^\[info\] Done'|grep -v '^\[info\] Loading '|grep -v '^\[info\] Set ' > licenses.out +cat licenses.out | grep '\[info\] [A-Z]' | sed 's/^\[info\] //' | sort | uniq > license.types +# add a space after 'No license specified' +sed -n '/^\[info\] No license specified$/ { += +p +}' licenses.out | grep -v '^\[info\]' > lines +cat lines | sort -nr > lines1 +mv lines1 lines +for i in $(<lines);do +echo ex -sc "'"${i}'i|[info] '"'" -cx licenses.out > cmd +sh cmd +done + +rm -f LICENSE.dependencies +touch LICENSE.dependencies +cat license.types | while read LINE; do + echo cat licenses.out \| sed "'"'/^\[info\] '$LINE'$/,/^\[info\] $/!d;//d'"'" \| sort \| uniq > cmd + echo "$LINE" >> LICENSE.dependencies + sh cmd >> LICENSE.dependencies + echo ' ' >> LICENSE.dependencies +done +cat LICENSE.dependencies | sed 's/^\[info\] //' > LICENSE.d +mv LICENSE.d LICENSE.dependencies + +#cleanup +rm -f cmd license.types licenses.out lines dependencyInfo http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/build_doc.sh ---------------------------------------------------------------------- diff --git a/docs/build_doc.sh b/docs/build_doc.sh index da73dc6..a3e70de 100755 --- a/docs/build_doc.sh +++ b/docs/build_doc.sh @@ -1,4 +1,21 @@ #!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + CURDIR=`pwd` CURDIRNAME=`basename $CURDIR` @@ -47,8 +64,10 @@ export BUILD_API=$2 # render file templates echo "Rendering file templates using mustache..." TEMP_DIR="tmp" -rm -rf $TEMP_DIR -copy_dir docs $TEMP_DIR +if [ -d "$TEMP_DIR" ]; then + rm -rf "$TEMP_DIR" +fi +copy_dir contents $TEMP_DIR render_files version.yml "$TEMP_DIR/introduction $TEMP_DIR/dev $TEMP_DIR/deployment $TEMP_DIR/api $TEMP_DIR/index.md" # generate site documents http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/api/java.md ---------------------------------------------------------------------- diff --git a/docs/contents/api/java.md b/docs/contents/api/java.md new file mode 100644 index 0000000..3b94f91 --- /dev/null +++ b/docs/contents/api/java.md @@ -0,0 +1 @@ +Placeholder http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/api/scala.md ---------------------------------------------------------------------- diff --git a/docs/contents/api/scala.md b/docs/contents/api/scala.md new file mode 100644 index 0000000..3b94f91 --- /dev/null +++ b/docs/contents/api/scala.md @@ -0,0 +1 @@ +Placeholder http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-configuration.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-configuration.md b/docs/contents/deployment/deployment-configuration.md new file mode 100644 index 0000000..1dadbd7 --- /dev/null +++ 
b/docs/contents/deployment/deployment-configuration.md @@ -0,0 +1,84 @@ +## Master and Worker configuration + +Master and Worker daemons will only read configuration from `conf/gear.conf`. + +Master reads configuration from section master and gearpump: + + :::bash + master { + } + gearpump{ + } + + +Worker reads configuration from section worker and gearpump: + + :::bash + worker { + } + gearpump{ + } + + +## Configuration for user submitted application job + +For user application job, it will read configuration file `gear.conf` and `application.conf` from classpath, while `application.conf` has higher priority. +The default classpath contains: + +1. `conf/` +2. current working directory. + +For example, you can put a `application.conf` on your working directory, and then it will be effective when you submit a new job application. + +## Logging + +To change the log level, you need to change both `gear.conf`, and `log4j.properties`. + +### To change the log level for master and worker daemon + +Please change `log4j.rootLevel` in `log4j.properties`, `gearpump-master.akka.loglevel` and `gearpump-worker.akka.loglevel` in `gear.conf`. + +### To change the log level for application job + +Please change `log4j.rootLevel` in `log4j.properties`, and `akka.loglevel` in `gear.conf` or `application.conf`. + +## Gearpump Default Configuration + +This is the default configuration for `gear.conf`. + +| config item | default value | description | +| -------------- | -------------- | ---------------- | +| gearpump.hostname | "127.0.0.1" | hostname of current machine. If you are using local mode, then set this to 127.0.0.1. If you are using cluster mode, make sure this hostname can be accessed by other machines. | +| gearpump.cluster.masters | ["127.0.0.1:3000"] | Config to set the master nodes of the cluster. If there are multiple master in the list, then the master nodes runs in HA mode. 
For example, you may start three master, on node1: `bin/master -ip node1 -port 3000`, on node2: `bin/master -ip node2 -port 3000`, on node3: `bin/master -ip node3 -port 3000`, then you need to set `gearpump.cluster.masters = ["node1:3000","node2:3000","node3:3000"]` | +| gearpump.task-dispatcher | "gearpump.shared-thread-pool-dispatcher" | default dispatcher for task actor | +| gearpump.metrics.enabled | true | flag to enable the metrics system | +| gearpump.metrics.sample-rate | 1 | We will take one sample every `gearpump.metrics.sample-rate` data points. Note it may have impact that the statistics on UI portal is not accurate. Change it to 1 if you want accurate metrics in UI | +| gearpump.metrics.report-interval-ms | 15000 | we will report once every 15 seconds | +| gearpump.metrics.reporter | "akka" | available value: "graphite", "akka", "logfile" which write the metrics data to different places. | +| gearpump.retainHistoryData.hours | 72 | max hours of history data to retain, Note: Due to implementation limitation(we store all history in memory), please don't set this to too big which may exhaust memory. | +| gearpump.retainHistoryData.intervalMs | 3600000 | time interval between two data points for history data (unit: ms). Usually this is set to a big value so that we only store coarse-grain data | +| gearpump.retainRecentData.seconds | 300 | max seconds of recent data to retain. This is for the fine-grain data | +| gearpump.retainRecentData.intervalMs | 15000 | time interval between two data points for recent data (unit: ms) | +| gearpump.log.daemon.dir | "logs" | The log directory for daemon processes(relative to current working directory) | +| gearpump.log.application.dir | "logs" | The log directory for applications(relative to current working directory) | +| gearpump.serializers | a map | custom serializer for streaming application, e.g. 
`"scala.Array" = ""` | +| gearpump.worker.slots | 1000 | How many slots each worker contains | +| gearpump.appmaster.vmargs | "-server -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost" | JVM arguments for AppMaster | +| gearpump.appmaster.extraClasspath | "" | JVM default class path for AppMaster | +| gearpump.executor.vmargs | "-server -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost" | JVM arguments for executor | +| gearpump.executor.extraClasspath | "" | JVM default class path for executor | +| gearpump.jarstore.rootpath | "jarstore/" | Define where the submitted jar file will be stored. This path follows the hadoop path schema. For HDFS, use `hdfs://host:port/path/`, and HDFS HA, `hdfs://namespace/path/`; if you want to store on master nodes, then use local directory. `jarstore.rootpath = "jarstore/"` will point to relative directory where master is started. `jarstore.rootpath = "/jarstore/"` will point to absolute directory on master server | +| gearpump.scheduling.scheduler-class |"org.apache.gearpump.cluster.scheduler.PriorityScheduler" | Class to schedule the applications. | +| gearpump.services.host | "127.0.0.1" | dashboard UI host address | +| gearpump.services.port | 8090 | dashboard UI host port | +| gearpump.netty.buffer-size | 5242880 | netty connection buffer size | +| gearpump.netty.max-retries | 30 | maximum number of retries for a netty client to connect to remote host | +| gearpump.netty.base-sleep-ms | 100 | base sleep time for a netty client to retry a connection. 
Actual sleep time is a multiple of this value | +| gearpump.netty.max-sleep-ms | 1000 | maximum sleep time for a netty client to retry a connection | +| gearpump.netty.message-batch-size | 262144 | netty max batch size | +| gearpump.netty.flush-check-interval | 10 | max flush interval for the netty layer, in milliseconds | +| gearpump.netty.dispatcher | "gearpump.shared-thread-pool-dispatcher" | default dispatcher for netty client and server | +| gearpump.shared-thread-pool-dispatcher | default Dispatcher with "fork-join-executor" | default shared thread pool dispatcher | +| gearpump.single-thread-dispatcher | PinnedDispatcher | default single thread dispatcher | +| gearpump.serialization-framework | "org.apache.gearpump.serializer.FastKryoSerializationFramework" | Gearpump has built-in serialization framework using Kryo. Users are allowed to use a different serialization framework, like Protobuf. See `org.apache.gearpump.serializer.FastKryoSerializationFramework` to find how a custom serialization framework can be defined | +| worker.executor-share-same-jvm-as-worker | false | whether the executor actor is started in the same jvm(process) from which running the worker actor, the intention of this setting is for the convenience of single machine debugging, however, the app jar need to be added to the worker's classpath when you set it true and have a 'real' worker in the cluster | \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-docker.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-docker.md b/docs/contents/deployment/deployment-docker.md new file mode 100644 index 0000000..c71ed9d --- /dev/null +++ b/docs/contents/deployment/deployment-docker.md @@ -0,0 +1,5 @@ +## Gearpump Docker Container + +There is pre-built docker container available at [Docker Repo](https://hub.docker.com/r/gearpump/gearpump/) + 
+Check the documents there to find how to launch a Gearpump cluster in one line. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-ha.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-ha.md b/docs/contents/deployment/deployment-ha.md new file mode 100644 index 0000000..9e907c0 --- /dev/null +++ b/docs/contents/deployment/deployment-ha.md @@ -0,0 +1,75 @@ +To support HA, we allow to start master on multiple nodes. They will form a quorum to decide consistency. For example, if we start master on 5 nodes and 2 nodes are down, then the cluster is still consistent and functional. + +Here are the steps to enable the HA mode: + +### 1. Configure. + +#### Select master machines + +Distribute the package to all nodes. Modify `conf/gear.conf` on all nodes. You MUST configure + + :::bash + gearpump.hostname + +to make it point to your hostname(or ip), and + + :::bash + gearpump.cluster.masters + +to a list of master nodes. For example, if I have 3 master nodes (node1, node2, and node3), then the `gearpump.cluster.masters` can be set as + + :::bash + gearpump.cluster { + masters = ["node1:3000", "node2:3000", "node3:3000"] + } + + +#### Configure distributed storage to store application jars. +In `conf/gear.conf`, For entry `gearpump.jarstore.rootpath`, please choose the storage folder for application jars. You need to make sure this jar storage is highly available. We support two storage systems: + + 1). HDFS + + You need to configure the `gearpump.jarstore.rootpath` like this + + :::bash + hdfs://host:port/path/ + + + For HDFS HA, + + :::bash + hdfs://namespace/path/ + + + 2). Shared NFS folder + + First you need to map the NFS directory to local directory(same path) on all machines of master nodes. +Then you need to set the `gearpump.jarstore.rootpath` like this: + + :::bash + file:///your_nfs_mapping_directory + + + 3). 
If you don't set this value, we will use the local directory of master node. + NOTE! There is no HA guarantee in this case, which means we are unable to recover running applications when master goes down. + +### 2. Start Daemon. + +On node1, node2, node3, Start Master + + :::bash + ## on node1 + bin/master -ip node1 -port 3000 + + ## on node2 + bin/master -ip node2 -port 3000 + + ## on node3 + bin/master -ip node3 -port 3000 + + +### 3. Done! + +Now you have a highly available HA cluster. You can kill any node, the master HA will take effect. + +**NOTE**: It can take up to 15 seconds for master node to fail-over. You can change the fail-over timeout time by adding config in `gear.conf` `gearpump-master.akka.cluster.auto-down-unreachable-after=10s` or set it to a smaller value http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-local.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-local.md b/docs/contents/deployment/deployment-local.md new file mode 100644 index 0000000..81e6029 --- /dev/null +++ b/docs/contents/deployment/deployment-local.md @@ -0,0 +1,34 @@ +You can start the Gearpump service in a single JVM(local mode), or in a distributed cluster(cluster mode). To start the cluster in local mode, you can use the local /local.bat helper scripts, it is very useful for developing or troubleshooting. + +Below are the steps to start a Gearpump service in **Local** mode: + +### Step 1: Get your Gearpump binary ready +To get your Gearpump service running in local mode, you first need to have a Gearpump distribution binary ready. +Please follow [this guide](get-gearpump-distribution) to have the binary. + +### Step 2: Start the cluster +You can start a local mode cluster in single line + + :::bash + ## start the master and 2 workers in single JVM. 
The master will listen on 3000 + ## you can Ctrl+C to kill the local cluster after you finish the startup tutorial. + bin/local + + +**NOTE:** You may need to execute `chmod +x bin/*` in shell to make the script file `local` executable. + +**NOTE:** You can change the default port by changing config `gearpump.cluster.masters` in `conf/gear.conf`. + +**NOTE: Change the working directory**. Log files by default will be generated under the current working directory. So, please "cd" to the required working directory before running the shell commands. + +**NOTE: Run as Daemon**. You can run it as a background process. For example, use [nohup](http://linux.die.net/man/1/nohup) on Linux. + +### Step 3: Start the Web UI server +Open another shell, + + :::bash + bin/services + +You can manage the applications in UI [http://127.0.0.1:8090](http://127.0.0.1:8090) or by [Command Line tool](../introduction/commandline). +The default username and password is "admin:admin", you can check +[UI Authentication](../deployment/deployment-ui-authentication) to find how to manage users. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-msg-delivery.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-msg-delivery.md b/docs/contents/deployment/deployment-msg-delivery.md new file mode 100644 index 0000000..53e8c2e --- /dev/null +++ b/docs/contents/deployment/deployment-msg-delivery.md @@ -0,0 +1,60 @@ +## How to deploy for At Least Once Message Delivery? + +As introduced in the [What is At Least Once Message Delivery](../introduction/message-delivery#what-is-at-least-once-message-delivery), Gearpump has a built-in KafkaSource. To get at least once message delivery, users should deploy a Kafka cluster as the offset store along with the Gearpump cluster. + +Here's an example to deploy a local Kafka cluster. + +1.
download the latest Kafka from the official website and extract to a local directory (`$KAFKA_HOME`) + +2. Boot up the single-node Zookeeper instance packaged with Kafka. + + :::bash + $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties + + +3. Start a Kafka broker + + :::bash + $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kafka.properties + + +4. When creating a offset store for `KafkaSource`, set the zookeeper connect string to `localhost:2181` and broker list to `localhost:9092` in `KafkaStorageFactory`. + + :::scala + val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092") + val source = new KafkaSource("topic1", "localhost:2181", offsetStorageFactory) + + +## How to deploy for Exactly Once Message Delivery? + +Exactly Once Message Delivery requires both an offset store and a checkpoint store. For the offset store, a Kafka cluster should be deployed as in the previous section. As for the checkpoint store, Gearpump has built-in support for Hadoop file systems, like HDFS. Hence, users should deploy a HDFS cluster alongside the Gearpump cluster. + +Here's an example to deploy a local HDFS cluster. + +1. download Hadoop 2.6 from the official website and extracts it to a local directory `HADOOP_HOME` + +2. add following configuration to `$HADOOP_HOME/etc/core-site.xml` + + :::xml + <configuration> + <property> + <name>fs.defaultFS</name> + <value>hdfs://localhost:9000</value> + </property> + </configuration> + + +3. start HDFS + + :::bash + $HADOOP_HOME/sbin/start-dfs.sh + + +4. 
When creating a `HadoopCheckpointStore`, set the hadoop configuration as in the `core-site.xml` + + :::scala + val hadoopConfig = new Configuration + hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000") + val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, new FileSizeRotation(1000)) + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-resource-isolation.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-resource-isolation.md b/docs/contents/deployment/deployment-resource-isolation.md new file mode 100644 index 0000000..ee47802 --- /dev/null +++ b/docs/contents/deployment/deployment-resource-isolation.md @@ -0,0 +1,112 @@ +CGroup (abbreviated from control groups) is a Linux kernel feature to limit, account, and isolate resource usage (CPU, memory, disk I/O, etc.) of process groups.In Gearpump, we use cgroup to manage CPU resources. + +## Start CGroup Service + +CGroup feature is only supported by Linux whose kernel version is larger than 2.6.18. Please also make sure the SELinux is disabled before start CGroup. + +The following steps are supposed to be executed by root user. + +1. Check `/etc/cgconfig.conf` exist or not. If not exists, please `yum install libcgroup`. + +2. Run following command to see whether the **cpu** subsystem is already mounted to the file system. + + :::bash + lssubsys -m + + Each subsystem in CGroup will have a corresponding mount file path in local file system. For example, the following output shows that **cpu** subsystem is mounted to file path `/sys/fs/cgroup/cpu` + + :::bash + cpu /sys/fs/cgroup/cpu + net_cls /sys/fs/cgroup/net_cls + blkio /sys/fs/cgroup/blkio + perf_event /sys/fs/cgroup/perf_event + + +3. 
If you want to assign permission to user **gear** to launch Gearpump Worker and applications with resource isolation enabled, you need to check gear's uid and gid in `/etc/passwd` file, let's take **500** for example. + +4. Add the following content to `/etc/cgconfig.conf` + + + # The mount point of cpu subsystem. + # If your system already mounted it, this segment should be eliminated. + mount { + cpu = /cgroup/cpu; + } + + # Here the group name "gearpump" represents a node in CGroup's hierarchy tree. + # When the CGroup service is started, there will be a folder generated under the mount point of cpu subsystem, + # whose name is "gearpump". + + group gearpump { + perm { + task { + uid = 500; + gid = 500; + } + admin { + uid = 500; + gid = 500; + } + } + cpu { + } + } + + + Please note that if the output of step 2 shows that **cpu** subsystem is already mounted, then the `mount` segment should not be included. + +5. Then start the cgroup service + + :::bash + sudo service cgconfig restart + + +6. There should be a folder **gearpump** generated under the mount point of cpu subsystem and its owner is **gear:gear**. + +7. Repeat the above-mentioned steps on each machine where you want to launch Gearpump. + +## Enable Cgroups in Gearpump +1. Login into the machine which has CGroup prepared with user **gear**. + + :::bash + ssh gear@node + + +2. Enter into Gearpump's home folder, edit gear.conf under folder `${GEARPUMP_HOME}/conf/` + + :::bash + gearpump.worker.executor-process-launcher = "org.apache.gearpump.cluster.worker.CGroupProcessLauncher" + + gearpump.cgroup.root = "gearpump" + + + Please note the gearpump.cgroup.root **gearpump** must be consistent with the group name in /etc/cgconfig.conf. + +3. Repeat the above-mentioned steps on each machine where you want to launch Gearpump + +4. Start the Gearpump cluster, please refer to [Deploy Gearpump in Standalone Mode](deployment-standalone) + +## Launch Application From Command Line +1.
Login into the machine which has Gearpump distribution. + +2. Enter into Gearpump's home folder, edit gear.conf under folder `${GEARPUMP_HOME}/conf/` + + :::bash + gearpump.cgroup.cpu-core-limit-per-executor = ${your_preferred_int_num} + + + Here the configuration is the number of CPU cores per executor can use and -1 means no limitation + +3. Submit application + + :::bash + bin/gear app -jar examples/sol-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}-assembly.jar -streamProducer 10 -streamProcessor 10 + + +4. Then you can run command `top` to monitor the cpu usage. + +## Launch Application From Dashboard +If you want to submit the application from dashboard, by default the `gearpump.cgroup.cpu-core-limit-per-executor` is inherited from Worker's configuration. You can provide your own conf file to override it. + +## Limitations +Windows and Mac OS X don't support CGroup, so the resource isolation will not work even if you turn it on. There will not be any limitation for single executor's cpu usage. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-security.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-security.md b/docs/contents/deployment/deployment-security.md new file mode 100644 index 0000000..e20fc67 --- /dev/null +++ b/docs/contents/deployment/deployment-security.md @@ -0,0 +1,80 @@ +Until now Gearpump supports deployment in a secured Yarn cluster and writing to secured HBase, where "secured" means Kerberos enabled. +Further security related feature is in progress. + +## How to launch Gearpump in a secured Yarn cluster +Suppose user `gear` will launch gearpump on YARN, then the corresponding principal `gear` should be created in KDC server. + +1. 
Create Kerberos principal for user `gear`, on the KDC machine + + :::bash + sudo kadmin.local + + In the kadmin.local or kadmin shell, create the principal + + :::bash + kadmin: addprinc gear/[email protected] + + Remember that user `gear` must exist on every node of Yarn. + +2. Upload the gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip to remote HDFS Folder, suggest to put it under `/usr/lib/gearpump/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip` + +3. Create HDFS folder /user/gear/, make sure all read-write rights are granted for user `gear` + + :::bash + drwxr-xr-x - gear gear 0 2015-11-27 14:03 /user/gear + + +4. Put the YARN configurations under classpath. + Before calling `yarnclient launch`, make sure you have put all yarn configuration files under classpath. Typically, you can just copy all files under `$HADOOP_HOME/etc/hadoop` from one of the YARN cluster machine to `conf/yarnconf` of gearpump. `$HADOOP_HOME` points to the Hadoop installation directory. + +5. Get Kerberos credentials to submit the job: + + :::bash + kinit gear/[email protected] + + + Here you can log in with a keytab or password. Please refer to Kerberos's documentation for details. + + :::bash + yarnclient launch -package /usr/lib/gearpump/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip + + +## How to write to secured HBase +When the remote HBase is security enabled, a kerberos keytab and the corresponding principal name need to be +provided for the gearpump-hbase connector. Specifically, the `UserConfig` object passed into the HBaseSink should contain +`{("gearpump.keytab.file", "\\$keytab"), ("gearpump.kerberos.principal", "\\$principal")}`.
example code of writing to secured HBase: + + :::scala + val principal = "gearpump/[email protected]" + val keytabContent = Files.toByteArray(new File("path_to_keytab_file")) + val appConfig = UserConfig.empty + .withString("gearpump.kerberos.principal", principal) + .withBytes("gearpump.keytab.file", keytabContent) + val sink = new HBaseSink(appConfig, "$tableName") + val sinkProcessor = DataSinkProcessor(sink, "$sinkNum") + val split = Processor[Split]("$splitNum") + val computation = split ~> sinkProcessor + val application = StreamApplication("HBase", Graph(computation), UserConfig.empty) + + +Note here the keytab file set into config should be a byte array. + +## Future Plan + +### More external components support +1. HDFS +2. Kafka + +### Authentication (Kerberos) +Since Gearpump's Master-Worker structure is similar to HDFS's NameNode-DataNode and Yarn's ResourceManager-NodeManager, we may follow the approach they use. + +1. User creates Kerberos principal and keytab for Gearpump. +2. Deploy the keytab files to all the cluster nodes. +3. Configure Gearpump's conf file, specify Kerberos principal and local keytab file location. +4. Start Master and Worker. + +Every application has a submitter/user. We will separate the applications from different users, like different log folders for different applications. +Only authenticated users can submit the application to Gearpump's Master. + +### Authorization +Hopefully more on this soon. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-standalone.md ---------------------------------------------------------------------- diff --git a/docs/contents/deployment/deployment-standalone.md b/docs/contents/deployment/deployment-standalone.md new file mode 100644 index 0000000..c9d5549 --- /dev/null +++ b/docs/contents/deployment/deployment-standalone.md @@ -0,0 +1,59 @@ +Standalone mode is a distributed cluster mode.
That is, Gearpump runs as a service without the help from other services (e.g. YARN). + +To deploy Gearpump in cluster mode, please first check that the [Pre-requisites](hardware-requirement) are met. + +### How to Install +You need to have Gearpump binary at hand. Please refer to [How to get gearpump distribution](get-gearpump-distribution) to get the Gearpump binary. + +You are suggested to unzip the package to the same directory path on every machine you planned to install Gearpump. +To install Gearpump, you at least need to change the configuration in `conf/gear.conf`. + +Config | Default value | Description +------------ | ---------------|------------ +gearpump.hostname | "127.0.0.1" | Host or IP address of current machine. The ip/host needs to be reachable from other machines in the cluster. +gearpump.cluster.masters | ["127.0.0.1:3000"] | List of all master nodes, with each item representing host and port of one master. +gearpump.worker.slots | 1000 | how many slots this worker has + +Besides this, there are other optional configurations related with logs, metrics, transports, ui. You can refer to [Configuration Guide](deployment-configuration) for more details. + +### Start the Cluster Daemons in Standalone mode +In Standalone mode, you can start master and worker in different JVMs. + +##### To start master: + + :::bash + bin/master -ip xx -port xx + +The ip and port will be checked against settings under `conf/gear.conf`, so you need to make sure they are consistent. + +**NOTE:** You may need to execute `chmod +x bin/*` in shell to make the script file `master` executable. + +**NOTE**: for high availability, please check [Master HA Guide](deployment-ha) + +##### To start worker: + + :::bash + bin/worker + +### Start UI + + :::bash + bin/services + + +After UI is started, you can browse to `http://{web_ui_host}:8090` to view the cluster status.
+The default username and password is "admin:admin", you can check +[UI Authentication](deployment-ui-authentication) to find how to manage users. + + + +**NOTE:** The UI port can be configured in `gear.conf`. Check [Configuration Guide](deployment-configuration) for information. + +### Bash tool to start cluster + +There is a bash tool `bin/start-cluster.sh` can launch the cluster conveniently. You need to change the file `conf/masters`, `conf/workers` and `conf/dashboard` to specify the corresponding machines. +Before running the bash tool, please make sure the Gearpump package is already unzipped to the same directory path on every machine. +`bin/stop-cluster.sh` is used to stop the whole cluster of course. + +The bash tool is able to launch the cluster without changing the `conf/gear.conf` on every machine. The bash sets the `gearpump.cluster.masters` and other configurations using JAVA_OPTS. +However, please note when you log into any these unconfigured machine and try to launch the dashboard or submit the application, you still need to modify `conf/gear.conf` manually because the JAVA_OPTS is missing.
