Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 7edaf4bd4 -> 32f150717


fix GEARPUMP-64, only allow LocalJarStore for ComposeDAG

Author: manuzhang <[email protected]>

Closes #27 from manuzhang/GEARPUMP-64.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/32f15071
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/32f15071
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/32f15071

Branch: refs/heads/master
Commit: 32f1507171eb4398312deb323f7872eb0bb429f5
Parents: 7edaf4b
Author: manuzhang <[email protected]>
Authored: Tue May 31 10:10:21 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue May 31 10:10:21 2016 +0800

----------------------------------------------------------------------
 .../worker/DefaultExecutorProcessLauncher.scala       |  2 +-
 .../org/apache/gearpump/cluster/worker/Worker.scala   |  1 -
 .../gearpump/jarstore/dfs/DFSJarStoreService.scala    |  4 ++--
 .../cluster/worker/CGroupProcessLauncher.scala        |  2 +-
 .../org/apache/gearpump/services/RestServices.scala   | 14 ++++++++++++--
 5 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
index 2f1e23d..b4e6f9e 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -33,7 +33,7 @@ class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcess
       appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
       classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
 
-    LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
+    LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}")
     Util.startProcess(options, classPath, mainClass, arguments)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
index 5ec8644..bc60694 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -465,7 +465,6 @@ private[cluster] object Worker {
         val options = ctx.jvmArguments ++ username ++
           logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
 
-        LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
         val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
           options, classPath, ctx.mainClass, ctx.arguments)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
index d34f5c2..7a60019 100644
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
@@ -53,9 +53,9 @@ class DFSJarStoreService extends JarStoreService {
    * @param remotePath The remote file path from JarStore
    */
   override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
-    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}")
     val filePath = new Path(rootPath, remotePath.path)
     val fs = filePath.getFileSystem(new Configuration())
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${filePath.toString}")
     val target = new Path(localFile.toURI().toString)
     fs.copyToLocalFile(filePath, target)
   }
@@ -67,9 +67,9 @@ class DFSJarStoreService extends JarStoreService {
    */
   override def copyFromLocal(localFile: File): FilePath = {
     val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString)
-    LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}")
     val filePath = new Path(rootPath, remotePath.path)
     val fs = filePath.getFileSystem(new Configuration())
+    LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${filePath.toString}")
     fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
     remotePath
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
index dc2eabd..e6e25c3 100644
--- a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
+++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
@@ -48,7 +48,7 @@ class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher
       cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,
         executorId)).getOrElse(List.empty)
     } else List.empty
-    LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " +
+    LOG.info(s"Launch executor $executorId with CGroup ${cgroupCommand.mkString(" ")}, " +
       s"classpath: ${classPath.mkString(File.pathSeparator)}")
 
     val java = System.getProperty("java.home") + "/bin/java"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/32f15071/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
index 87f9e34..7d67f60 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.services
 
+import org.apache.gearpump.jarstore.local.LocalJarStoreService
+
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
@@ -38,14 +40,22 @@ import org.apache.gearpump.services.util.UpickleUtil._
 class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem)
   extends RouteService {
 
+  private val LOG = LogUtil.getLogger(getClass)
+
   implicit val timeout = Constants.FUTURE_TIMEOUT
 
   private val config = system.settings.config
 
-  private val jarStoreService = JarStoreService.get(config)
+  // only LocalJarStoreService is supported now for "Compose DAG"
+  // since DFSJarStoreService requires HDFS to be on the classpath.
+  // Note this won't affect users  "Submit Gearpump Application" through
+  // dashboard with "jarstore.rootpath" set to HDFS.
+  if (!JarStoreService.get(config).isInstanceOf[LocalJarStoreService]) {
+    LOG.warn("only local jar store is supported for Compose DAG")
+  }
+  private val jarStoreService = new LocalJarStoreService
   jarStoreService.init(config, system)
 
-  private val LOG = LogUtil.getLogger(getClass)
 
   private val securityEnabled = config.getBoolean(
     Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)

Reply via email to