Repository: incubator-gearpump Updated Branches: refs/heads/master f634aab41 -> e9956325d
[GEARPUMP-366] Create JarStore dir if not exists on job submit Author: manuzhang <[email protected]> Closes #239 from manuzhang/fix_job_submit. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e9956325 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e9956325 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e9956325 Branch: refs/heads/master Commit: e9956325d4f55abc5babb6ea79227e105413d744 Parents: f634aab Author: manuzhang <[email protected]> Authored: Fri Mar 16 13:28:05 2018 +0800 Committer: manuzhang <[email protected]> Committed: Fri Mar 16 13:28:12 2018 +0800 ---------------------------------------------------------------------- .../gearpump/jarstore/local/LocalJarStore.scala | 9 ++++++++- .../apache/gearpump/jarstore/dfs/DFSJarStore.scala | 15 ++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9956325/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala index 0a11c03..d14677b 100644 --- a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala +++ b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala @@ -39,7 +39,7 @@ class LocalJarStore extends JarStore { override def init(config: Config): Unit = { rootPath = Util.asSubDirOfGearpumpHome( config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)) - FileUtils.forceMkdir(rootPath) + createDirIfNotExists(rootPath) } /** @@ -49,6 +49,7 @@ class LocalJarStore extends JarStore { * @return OutputStream returns a stream into which the data can be written. */ override def createFile(fileName: String): OutputStream = { + createDirIfNotExists(rootPath) val localFile = new File(rootPath, fileName) new FileOutputStream(localFile) } @@ -70,4 +71,10 @@ class LocalJarStore extends JarStore { } is } + + private def createDirIfNotExists(file: File): Unit = { + if (!file.exists()) { + FileUtils.forceMkdir(file) + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9956325/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala ---------------------------------------------------------------------- diff --git a/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala b/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala index ae4cf46..cd1a13f 100644 --- a/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala +++ b/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala @@ -29,15 +29,12 @@ import org.apache.hadoop.fs.permission.{FsAction, FsPermission} * DFSJarStore store the uploaded jar on HDFS */ class DFSJarStore extends JarStore { - private var rootPath: Path = null + private var rootPath: Path = _ override val scheme: String = "hdfs" override def init(config: Config): Unit = { rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)) - val fs = rootPath.getFileSystem(new Configuration()) - if (!fs.exists(rootPath)) { - fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) - } + createDirIfNotExists(rootPath) } /** @@ -47,6 +44,7 @@ class DFSJarStore extends JarStore { * @return OutputStream returns a stream into which the data can be written. */ override def createFile(fileName: String): OutputStream = { + createDirIfNotExists(rootPath) val filePath = new Path(rootPath, fileName) val fs = filePath.getFileSystem(new Configuration()) fs.create(filePath) @@ -63,4 +61,11 @@ class DFSJarStore extends JarStore { val fs = filePath.getFileSystem(new Configuration()) fs.open(filePath) } + + private def createDirIfNotExists(path: Path): Unit = { + val fs = path.getFileSystem(new Configuration()) + if (!fs.exists(path)) { + fs.mkdirs(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + } }
