Repository: incubator-gearpump Updated Branches: refs/heads/master 7edaf4bd4 -> 32f150717
fix GEARPUMP-64, only allow LocalJarStore for ComposeDAG Author: manuzhang <[email protected]> Closes #27 from manuzhang/GEARPUMP-64. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/32f15071 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/32f15071 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/32f15071 Branch: refs/heads/master Commit: 32f1507171eb4398312deb323f7872eb0bb429f5 Parents: 7edaf4b Author: manuzhang <[email protected]> Authored: Tue May 31 10:10:21 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue May 31 10:10:21 2016 +0800 ---------------------------------------------------------------------- .../worker/DefaultExecutorProcessLauncher.scala | 2 +- .../org/apache/gearpump/cluster/worker/Worker.scala | 1 - .../gearpump/jarstore/dfs/DFSJarStoreService.scala | 4 ++-- .../cluster/worker/CGroupProcessLauncher.scala | 2 +- .../org/apache/gearpump/services/RestServices.scala | 14 ++++++++++++-- 5 files changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala index 2f1e23d..b4e6f9e 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala @@ -33,7 +33,7 @@ class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcess appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { - LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}") + LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}") Util.startProcess(options, classPath, mainClass, arguments) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala index 5ec8644..bc60694 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala @@ -465,7 +465,6 @@ private[cluster] object Worker { val options = ctx.jvmArguments ++ username ++ logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs - LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}") val process = procLauncher.createProcess(appId, executorId, resource, executorConfig, options, classPath, ctx.mainClass, ctx.arguments) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala index d34f5c2..7a60019 100644 --- a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala +++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala @@ -53,9 +53,9 @@ class DFSJarStoreService extends JarStoreService { * @param remotePath The remote file path from JarStore */ override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = { - LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}") val filePath = new Path(rootPath, remotePath.path) val fs = filePath.getFileSystem(new Configuration()) + LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${filePath.toString}") val target = new Path(localFile.toURI().toString) fs.copyToLocalFile(filePath, target) } @@ -67,9 +67,9 @@ class DFSJarStoreService extends JarStoreService { */ override def copyFromLocal(localFile: File): FilePath = { val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString) - LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}") val filePath = new Path(rootPath, remotePath.path) val fs = filePath.getFileSystem(new Configuration()) + LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${filePath.toString}") fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath) remotePath } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala index dc2eabd..e6e25c3 100644 --- a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala +++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala @@ -48,7 +48,7 @@ class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, executorId)).getOrElse(List.empty) } else List.empty - LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " + + LOG.info(s"Launch executor $executorId with CGroup ${cgroupCommand.mkString(" ")}, " + s"classpath: ${classPath.mkString(File.pathSeparator)}") val java = System.getProperty("java.home") + "/bin/java" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala index 87f9e34..7d67f60 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.services +import org.apache.gearpump.jarstore.local.LocalJarStoreService + import scala.concurrent.Await import scala.concurrent.duration._ @@ -38,14 +40,22 @@ import org.apache.gearpump.services.util.UpickleUtil._ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem) extends RouteService { + private val LOG = LogUtil.getLogger(getClass) + implicit val timeout = Constants.FUTURE_TIMEOUT private val config = system.settings.config - private val jarStoreService = JarStoreService.get(config) + // only LocalJarStoreService is supported now for "Compose DAG" + // since DFSJarStoreService requires HDFS to be on the classpath. + // Note this won't affect users "Submit Gearpump Application" through + // dashboard with "jarstore.rootpath" set to HDFS. + if (!JarStoreService.get(config).isInstanceOf[LocalJarStoreService]) { + LOG.warn("only local jar store is supported for Compose DAG") + } + private val jarStoreService = new LocalJarStoreService jarStoreService.init(config, system) - private val LOG = LogUtil.getLogger(getClass) private val securityEnabled = config.getBoolean( Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
