fix GEARPUMP-205 remove hdfs dependency from gear's classpath raise the pr to use travis UT
Author: huafengw <[email protected]> Closes #81 from huafengw/blob. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f8f91664 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f8f91664 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f8f91664 Branch: refs/heads/master Commit: f8f916645cfb9b83767c9d3c3912d04825f38636 Parents: 6852b56 Author: huafengw <[email protected]> Authored: Wed Sep 7 12:17:52 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Sep 7 12:17:52 2016 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/client/ClientContext.scala | 8 +- .../gearpump/cluster/main/AppSubmitter.scala | 105 +++++++++++ .../org/apache/gearpump/cluster/main/Gear.scala | 80 +++++++++ .../org/apache/gearpump/cluster/main/Info.scala | 52 ++++++ .../org/apache/gearpump/cluster/main/Kill.scala | 49 ++++++ .../gearpump/cluster/main/MainRunner.scala | 42 +++++ .../apache/gearpump/cluster/main/Replay.scala | 47 +++++ .../gearpump/jarstore/FileDirective.scala | 172 +++++++++++++++++++ .../apache/gearpump/jarstore/FileServer.scala | 160 +++++++++++++++++ .../org/apache/gearpump/jarstore/JarStore.scala | 82 +++++++++ .../gearpump/jarstore/JarStoreClient.scala | 73 ++++++++ .../gearpump/jarstore/JarStoreServer.scala | 52 ++++++ .../gearpump/jarstore/JarStoreService.scala | 86 ---------- .../gearpump/jarstore/local/LocalJarStore.scala | 72 ++++++++ .../scala/org/apache/gearpump/util/Util.scala | 6 +- .../gearpump/jarstore/FileServerSpec.scala | 129 ++++++++++++++ .../org.apache.gearpump.jarstore.JarStore | 20 +++ ...org.apache.gearpump.jarstore.JarStoreService | 20 --- .../gearpump/cluster/main/AppSubmitter.scala | 106 ------------ .../org/apache/gearpump/cluster/main/Gear.scala | 81 --------- .../org/apache/gearpump/cluster/main/Info.scala | 53 ------ .../org/apache/gearpump/cluster/main/Kill.scala | 50 ------ .../gearpump/cluster/main/MainRunner.scala | 43 ----- .../apache/gearpump/cluster/main/Replay.scala | 48 ------ .../apache/gearpump/cluster/master/Master.scala | 10 +- .../apache/gearpump/cluster/worker/Worker.scala | 11 +- .../gearpump/jarstore/dfs/DFSJarStore.scala | 67 ++++++++ .../jarstore/dfs/DFSJarStoreService.scala | 76 -------- .../gearpump/jarstore/local/LocalJarStore.scala | 64 ------- .../jarstore/local/LocalJarStoreService.scala | 81 --------- .../apache/gearpump/util/FileDirective.scala | 140 --------------- .../org/apache/gearpump/util/FileServer.scala | 167 ------------------ .../org.apache.gearpump.jarstore.JarStore | 20 +++ ...org.apache.gearpump.jarstore.JarStoreService | 20 --- .../apache/gearpump/util/FileServerSpec.scala | 120 ------------- project/Build.scala | 9 +- project/Pack.scala | 5 +- .../gearpump/services/AppMasterService.scala | 34 ++-- .../gearpump/services/MasterService.scala | 18 +- .../apache/gearpump/services/RestServices.scala | 19 +- .../services/AppMasterServiceSpec.scala | 12 +- .../gearpump/services/MasterServiceSpec.scala | 10 +- .../gearpump/services/WorkerServiceSpec.scala | 3 - 43 files changed, 1279 insertions(+), 1243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala index 245f1bc..0cba079 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala @@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFrom import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult import org.apache.gearpump.cluster._ import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util} @@ -59,8 +59,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { LOG.info(s"Starting system ${system.name}") val shouldCleanupSystem = Option(sys).isEmpty - private val jarStoreService = JarStoreService.get(config) - jarStoreService.init(config, system) + private val jarStoreClient = new JarStoreClient(config, system) private lazy val master: ActorRef = { val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala @@ -140,8 +139,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { private def loadFile(jarPath: String): AppJar = { val jarFile = new java.io.File(jarPath) - val path = jarStoreService.copyFromLocal(jarFile) - AppJar(jarFile.getName, path) + Util.uploadJar(jarFile, jarStoreClient) } private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala new file mode 100644 index 0000000..b2eef7d --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import java.io.File +import java.net.{URL, URLClassLoader} +import java.util.jar.JarFile + +import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util} +import org.slf4j.Logger + +/** Tool to submit an application jar to cluster */ +object AppSubmitter extends AkkaApp with ArgumentsParser { + val LOG: Logger = LogUtil.getLogger(getClass) + + override val ignoreUnknownArgument = true + + override val description = "Submit an application to Master by providing a jar" + + override val options: Array[(String, CLIOption[Any])] = Array( + "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, + defaultValue = Some("")), + "jar" -> CLIOption("<application>.jar", required = true), + "executors" -> CLIOption[Int]("number of executor to launch", required = false, + defaultValue = Some(1)), + "verbose" -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false)), + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + def main(akkaConf: Config, args: Array[String]): Unit = { + + val config = parse(args) + if (null != config) { + + val verbose = config.getBoolean("verbose") + if (verbose) { + LogUtil.verboseLogToConsole() + } + + val jar = config.getString("jar") + + // Set jar path to be submitted to cluster + System.setProperty(Constants.GEARPUMP_APP_JAR, jar) + System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) + + val namePrefix = config.getString("namePrefix") + if (namePrefix.nonEmpty) { + if (!Util.validApplicationName(namePrefix)) { + throw new Exception(s"$namePrefix is not a valid prefix for an application name") + } + System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix) + } + + val jarFile = new java.io.File(jar) + + // Start main class + if (!jarFile.exists()) { + throw new Exception(s"jar $jar does not exist") + } + + val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + + jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader) + val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader) + + // Set to context classLoader. ActorSystem pick context classLoader in preference + Thread.currentThread().setContextClassLoader(classLoader) + val clazz = classLoader.loadClass(main) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + mainMethod.invoke(null, arguments) + } + } + + private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader) + : (String, Array[String]) = { + val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes. + getValue("Main-Class")).getOrElse("") + + if (remainArgs.length > 0) { + classLoader.loadClass(remainArgs(0)) + (remainArgs(0), remainArgs.drop(1)) + } else if (mainInManifest.nonEmpty) { + (mainInManifest, remainArgs) + } else { + throw new Exception("No main class specified") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala new file mode 100644 index 0000000..1511469 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import org.apache.gearpump.util.{Constants, LogUtil} +import org.slf4j.Logger + +object Gear { + + val OPTION_CONFIG = "conf" + + private val LOG: Logger = LogUtil.getLogger(getClass) + + val commands = Map("app" -> AppSubmitter, "kill" -> Kill, + "info" -> Info, "replay" -> Replay, "main" -> MainRunner) + + def usage(): Unit = { + val keys = commands.keys.toList.sorted + // scalastyle:off println + Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") + // scalastyle:on println + } + + private def executeCommand(command: String, commandArgs: Array[String]) = { + commands.get(command).map(_.main(commandArgs)) + if (!commands.contains(command)) { + val allArgs = (command +: commandArgs.toList).toArray + MainRunner.main(allArgs) + } + } + + def main(inputArgs: Array[String]): Unit = { + val (configFile, args) = extractConfig(inputArgs) + if (configFile != null) { + // Sets custom config file... + System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile) + } + + if (args.length == 0) { + usage() + } else { + val command = args(0) + val commandArgs = args.drop(1) + executeCommand(command, commandArgs) + } + } + + private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = { + var index = 0 + + var result = List.empty[String] + var configFile: String = null + while (index < inputArgs.length) { + val item = inputArgs(index) + if (item == s"-$OPTION_CONFIG") { + index += 1 + configFile = inputArgs(index) + } else { + result = result :+ item + } + index += 1 + } + (configFile, result.toArray) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala new file mode 100644 index 0000000..e1fe291 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.util.{AkkaApp, LogUtil} +import org.slf4j.Logger + +/** Tool to query master info */ +object Info extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + override val description = "Query the Application list" + + // scalastyle:off println + def main(akkaConf: Config, args: Array[String]): Unit = { + val client = ClientContext(akkaConf) + + val AppMastersData(appMasters) = client.listApps + Console.println("== Application Information ==") + Console.println("====================================") + appMasters.foreach { appData => + Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " + + s"status: ${appData.status}, worker: ${appData.workerPath}") + } + client.close() + } + // scalastyle:on println +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala new file mode 100644 index 0000000..8ecaf85 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.util.{AkkaApp, LogUtil} +import org.slf4j.Logger + +/** Tool to kill an App */ +object Kill extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "appid" -> CLIOption("<application id>", required = true), + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + override val description = "Kill an application with application Id" + + def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + if (null != config) { + val client = ClientContext(akkaConf) + LOG.info("Client ") + client.shutdown(config.getInt("appid")) + client.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala new file mode 100644 index 0000000..8664232 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import org.apache.gearpump.util.{AkkaApp, LogUtil} +import org.slf4j.Logger + +/** Tool to run any main class by providing a jar */ +object MainRunner extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + def main(akkaConf: Config, args: Array[String]): Unit = { + val mainClazz = args(0) + val commandArgs = args.drop(1) + + val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + mainMethod.invoke(null, commandArgs) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala new file mode 100644 index 0000000..e648d61 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.util.{AkkaApp, LogUtil} +import org.slf4j.Logger + +// Internal tool to restart an application +object Replay extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "appid" -> CLIOption("<application id>", required = true), + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + override val description = "Replay the application from current min clock(low watermark)" + + def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + if (null != config) { + val client = ClientContext(akkaConf) + client.replayFromTimestampWindowTrailingEdge(config.getInt("appid")) + client.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala new file mode 100644 index 0000000..969da04 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.jarstore + +import java.io.File +import java.time.Instant +import scala.concurrent.{ExecutionContext, Future} + +import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ +import akka.stream.Materializer +import akka.stream.scaladsl.{StreamConverters, FileIO} +import akka.util.ByteString + + +/** + * FileDirective is a set of Akka-http directive to upload/download + * huge binary files to/from Akka-Http server. + */ +object FileDirective { + + // Form field name + type Name = String + + val CHUNK_SIZE = 262144 + + /** + * File information after a file is uploaded to server. + * + * @param originFileName original file name when user upload it in browser. + * @param file file name after the file is saved to server. + * @param length the length of the file + */ + case class FileInfo(originFileName: String, file: File, length: Long) + + class Form(val fields: Map[Name, FormField]) { + def getFileInfo(fieldName: String): Option[FileInfo] = { + fields.get(fieldName).flatMap { + case Left(file) => Option(file) + case Right(_) => None + } + } + + def getValue(fieldName: String): Option[String] = { + fields.get(fieldName).flatMap { + case Left(_) => None + case Right(value) => Option(value) + } + } + } + + type FormField = Either[FileInfo, String] + + /** + * Store the uploaded files to temporary directory, and return a Map from form field name + * to FileInfo. + */ + def uploadFile: Directive1[Form] = { + Directive[Tuple1[Form]] { inner => + extractMaterializer {implicit mat => + extractExecutionContext {implicit ec => + uploadFileImpl(mat, ec) { formFuture => + ctx => { + formFuture.map(form => inner(Tuple1(form))).flatMap(route => route(ctx)) + } + } + } + } + } + } + + /** + * Store the uploaded files to JarStore, and return a Map from form field name + * to FilePath in JatStore. + */ + def uploadFileTo(jarStore: JarStore): Directive1[Map[Name, FilePath]] = { + Directive[Tuple1[Map[Name, FilePath]]] { inner => + extractMaterializer {implicit mat => + extractExecutionContext {implicit ec => + uploadFileImpl(jarStore)(mat, ec) { filesFuture => + ctx => { + filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx)) + } + } + } + } + } + } + + // Downloads file from server + def downloadFileFrom(jarStore: JarStore, filePath: String): Route = { + val responseEntity = HttpEntity( + MediaTypes.`application/octet-stream`, + StreamConverters.fromInputStream( + () => jarStore.getFile(filePath), CHUNK_SIZE + )) + complete(responseEntity) + } + + private def uploadFileImpl(jarStore: JarStore) + (implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Map[Name, FilePath]]] = { + Directive[Tuple1[Future[Map[Name, FilePath]]]] { inner => + entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) => + val fileNameMap = formdata.parts.mapAsync(1) { p => + if (p.filename.isDefined) { + val path = Instant.now().toEpochMilli + p.filename.get + val sink = StreamConverters.fromOutputStream(() => jarStore.createFile(path), + autoFlush = true) + p.entity.dataBytes.runWith(sink).map(written => + if (written.count > 0) { + Map(p.name -> FilePath(path)) + } else { + Map.empty[Name, FilePath] + }) + } else { + Future(Map.empty[Name, FilePath]) + } + }.runFold(Map.empty[Name, FilePath])((set, value) => set ++ value) + inner(Tuple1(fileNameMap)) + } + } + } + + private def uploadFileImpl(implicit mat: Materializer, ec: ExecutionContext) + : Directive1[Future[Form]] = { + Directive[Tuple1[Future[Form]]] { inner => + entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) => + val form = formdata.parts.mapAsync(1) { p => + if (p.filename.isDefined) { + val targetPath = File.createTempFile(s"userfile_${p.name}_", + s"${p.filename.getOrElse("")}") + val writtenFuture = p.entity.dataBytes.runWith(FileIO.toFile(targetPath)) + writtenFuture.map(written => + if (written.count > 0) { + Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count))) + } else { + Map.empty[Name, FormField] + }) + } else { + val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) => + total ++ input + } + valueFuture.map{value => + Map(p.name -> Right(value.utf8String)) + } + } + }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) => + new Form(set.fields ++ value) + } + + inner(Tuple1(form)) + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala new file mode 100644 index 0000000..4ce8f2d --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.jarstore + +import java.io.File +import scala.concurrent.{ExecutionContext, Future} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.ServerBinding +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{FileIO, Sink, Source} +import spray.json.DefaultJsonProtocol._ +import spray.json.JsonFormat + +import org.apache.gearpump.jarstore.FileDirective._ +import org.apache.gearpump.jarstore.FileServer.Port + +/** + * A simple file server implemented with akka-http to store/fetch large + * binary files. + */ +class FileServer(system: ActorSystem, host: String, port: Int = 0, jarStore: JarStore) { + import system.dispatcher + implicit val actorSystem = system + implicit val materializer = ActorMaterializer() + implicit def ec: ExecutionContext = system.dispatcher + + val route: Route = { + path("upload") { + uploadFileTo(jarStore) { form => + val uploadedFilePath = form.headOption.map(_._2) + + if (uploadedFilePath.isDefined) { + complete(uploadedFilePath.get.path) + } else { + failWith(new Exception("File not found in the uploaded form")) + } + } + } ~ + path("download") { + parameters("file") { file: String => + downloadFileFrom(jarStore, file) + } + } ~ + pathEndOrSingleSlash { + extractUri { uri => + val upload = uri.withPath(Uri.Path("/upload")).toString() + val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, + s""" + | + |<h2>Please specify a file to upload:</h2> + |<form action="$upload" enctype="multipart/form-data" method="post"> + |<input type="file" name="datafile" size="40"> + |</p> + |<div> + |<input type="submit" value="Submit"> + |</div> + |</form> + """.stripMargin) + complete(entity) + } + } + } + + private var connection: Future[ServerBinding] = null + + def start: Future[Port] = { + connection = Http().bindAndHandle(Route.handlerFlow(route), host, port) + connection.map(address => Port(address.localAddress.getPort)) + } + + def stop: Future[Unit] = { + connection.flatMap(_.unbind()) + } +} + +object FileServer { + + implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply) + + case class Port(port: Int) + + /** + * Client of [[org.apache.gearpump.jarstore.FileServer]] + */ + class Client(system: ActorSystem, host: String, port: Int) { + + def this(system: ActorSystem, url: String) = { + this(system, Uri(url).authority.host.address(), Uri(url).authority.port) + } + + private implicit val actorSystem = system + private implicit val materializer = ActorMaterializer() + private implicit val ec = system.dispatcher + + val server = Uri(s"http://$host:$port") + val httpClient = Http(system).outgoingConnection(server.authority.host.address(), + server.authority.port) + + def upload(file: File): Future[FilePath] = { + val target = server.withPath(Path("/upload")) + + val request = entity(file).map { entity => + HttpRequest(HttpMethods.POST, uri = target, entity = entity) + } + + val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head) + response.flatMap { some => + Unmarshal(some).to[String] + }.map { path => + FilePath(path) + } + } + + def download(remoteFile: FilePath, saveAs: File): Future[Unit] = { + val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path)) + // Download file to local + val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head) + val downloaded = response.flatMap { response => + response.entity.dataBytes.runWith(FileIO.toFile(saveAs)) + } + downloaded.map(written => Unit) + } + + private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { + val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), + FileIO.fromFile(file, chunkSize = 100000)) + val body = Source.single( + Multipart.FormData.BodyPart( + "uploadfile", + entity, + Map("filename" -> file.getName))) + val form = Multipart.FormData(body) + + Marshal(form).to[RequestEntity] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala new file mode 100644 index 0000000..a4a411f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.jarstore + +import java.io.{InputStream, OutputStream} +import java.net.URI +import java.util.ServiceLoader + +import com.typesafe.config.Config +import org.apache.gearpump.util.Util + +import scala.collection.JavaConverters._ + +case class FilePath(path: String) + +/** + * JarStore is used to manage the upload/download of binary files, + * like user submitted application jar. + */ +trait JarStore { + /** + * The scheme of the JarStore. + * Like "hdfs" for HDFS file system, and "file" for a local + * file system. + */ + val scheme: String + + /** + * Init the Jar Store. + */ + def init(config: Config) + + /** + * Creates the file on JarStore. + * + * @param fileName name of the file to be created on JarStore. + * @return OutputStream returns a stream into which the data can be written. + */ + def createFile(fileName: String): OutputStream + + /** + * Gets the InputStream to read the file + * + * @param fileName name of the file to be read on JarStore. + * @return InputStream returns a stream from which the data can be read. + */ + def getFile(fileName: String): InputStream +} + +object JarStore { + + /** + * Get a active JarStore by specifying a scheme. + * + * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for + * more information. + */ + private lazy val jarstores: List[JarStore] = { + ServiceLoader.load(classOf[JarStore]).asScala.toList + } + + def get(rootPath: String): JarStore = { + val scheme = new URI(Util.resolvePath(rootPath)).getScheme + jarstores.find(_.scheme == scheme).get + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala new file mode 100644 index 0000000..59cc405 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.jarstore + +import java.io.File +import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.Await + +import akka.pattern.ask +import akka.actor.{ActorSystem, ActorRef} +import com.typesafe.config.Config +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.util.{Util, Constants, LogUtil} +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} +import scala.concurrent.{Future, ExecutionContext} + +class JarStoreClient(config: Config, system: ActorSystem) { + private def LOG: Logger = LogUtil.getLogger(getClass) + private implicit val timeout = Constants.FUTURE_TIMEOUT + private implicit def dispatcher: ExecutionContext = system.dispatcher + + private val master: ActorRef = { + val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) + .asScala.flatMap(Util.parseHostList) + system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}") + } + + private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]] + .map { address => + val client = new FileServer.Client(system, address.url) + client + } + + /** + * This function will copy the remote file to local file system, called from client side. + * + * @param localFile The destination of file path + * @param remotePath The remote file path from JarStore + */ + def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = { + LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath") + val future = client.flatMap(_.download(remotePath, localFile)) + Await.ready(future, Duration(60, TimeUnit.SECONDS)) + } + + /** + * This function will copy the local file to the remote JarStore, called from client side. + * @param localFile The local file + */ + def copyFromLocal(localFile: File): FilePath = { + val future = client.flatMap(_.upload(localFile)) + Await.result(future, Duration(60, TimeUnit.SECONDS)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala new file mode 100644 index 0000000..1fb0de5 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.jarstore + +import akka.actor.{Actor, Stash} +import akka.pattern.pipe + +import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} +import org.apache.gearpump.util._ + +class JarStoreServer(jarStoreRootPath: String) extends Actor with Stash { + private val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME) + private val jarStore = JarStore.get(jarStoreRootPath) + jarStore.init(context.system.settings.config) + private val server = new FileServer(context.system, host, 0, jarStore) + implicit val timeout = Constants.FUTURE_TIMEOUT + implicit val executionContext = context.dispatcher + + server.start pipeTo self + + def receive: Receive = { + case FileServer.Port(port) => + context.become(listen(port)) + unstashAll() + case _ => + stash() + } + + def listen(port: Int): Receive = { + case GetJarStoreServer => + sender ! JarStoreServerAddress(s"http://$host:$port/") + } + + override def postStop(): Unit = { + server.stop + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala deleted file mode 100644 index 0ba9558..0000000 --- a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.jarstore - -import java.io.File -import java.net.URI -import java.util.ServiceLoader -import scala.collection.JavaConverters._ - -import akka.actor.ActorSystem -import com.typesafe.config.Config - -import org.apache.gearpump.util.{Constants, Util} - -case class FilePath(path: String) - -/** - * JarStoreService is used to manage the upload/download of binary files, - * like user submitted application jar. - */ -trait JarStoreService { - /** - * The scheme of the JarStoreService. - * Like "hdfs" for HDFS file system, and "file" for a local - * file system. - */ - val scheme: String - - /** - * Init the Jar Store. - */ - def init(config: Config, system: ActorSystem) - - /** - * This function will copy the local file to the remote JarStore, called from client side. - * @param localFile The local file - */ - def copyFromLocal(localFile: File): FilePath - - /** - * This function will copy the remote file to local file system, called from client side. - * - * @param localFile The destination of file path - * @param remotePath The remote file path from JarStore - */ - def copyToLocalFile(localFile: File, remotePath: FilePath) -} - -object JarStoreService { - - /** - * Get a active JarStoreService by specifying a scheme. - * - * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for - * more information. - */ - def get(config: Config): JarStoreService = { - val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) - get(jarStoreRootPath) - } - - private lazy val jarstoreServices: List[JarStoreService] = { - ServiceLoader.load(classOf[JarStoreService]).asScala.toList - } - - private def get(rootPath: String): JarStoreService = { - val scheme = new URI(Util.resolvePath(rootPath)).getScheme - jarstoreServices.find(_.scheme == scheme).get - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala new file mode 100644 index 0000000..c15a9be --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.jarstore.local + +import java.io._ + +import com.typesafe.config.Config +import org.apache.gearpump.jarstore.JarStore +import org.apache.gearpump.util.{LogUtil, FileUtils, Constants} +import org.slf4j.Logger + +/** + * LocalJarStore store the uploaded jar on local disk. + */ +class LocalJarStore extends JarStore { + private val LOG: Logger = LogUtil.getLogger(getClass) + private var rootPath: String = null + override val scheme: String = "file" + + class ClosedInputStream extends InputStream { + override def read(): Int = -1 + } + + override def init(config: Config): Unit = { + rootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) + FileUtils.forceMkdir(new File(rootPath)) + } + + /** + * Creates the file on JarStore. + * + * @param fileName name of the file to be created on JarStore. + * @return OutputStream returns a stream into which the data can be written. + */ + override def createFile(fileName: String): OutputStream = { + val localFile = new File(rootPath, fileName) + new FileOutputStream(localFile) + } + + /** + * Gets the InputStream to read the file + * + * @param fileName name of the file to be read on JarStore. + * @return InputStream returns a stream from which the data can be read. + */ + override def getFile(fileName: String): InputStream = { + val localFile = new File(rootPath, fileName) + val is = try { + new FileInputStream(localFile) + } catch { + case ex: Exception => + LOG.error(s"Fetch file $fileName failed: ${ex.getStackTrace}") + new ClosedInputStream + } + is + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/util/Util.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala index 19bd5a8..0faa46a 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Util.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala @@ -27,7 +27,7 @@ import scala.util.{Failure, Success, Try} import com.typesafe.config.{Config, ConfigFactory} import org.apache.gearpump.cluster.AppJar -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} import org.apache.gearpump.transport.HostPort object Util { @@ -123,8 +123,8 @@ object Util { } } - def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = { - val remotePath = jarStoreService.copyFromLocal(jarFile) + def uploadJar(jarFile: File, jarStoreClient: JarStoreClient): AppJar = { + val remotePath = jarStoreClient.copyFromLocal(jarFile) AppJar(jarFile.getName, remotePath) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala new file mode 100644 index 0000000..c99a031 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.jarstore + +import java.io.File +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import com.typesafe.config.{ConfigValueFactory, ConfigValue} +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.google.common.io.Files +import org.apache.gearpump.jarstore.local.LocalJarStore +import org.apache.gearpump.util.{FileUtils, LogUtil} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import org.apache.gearpump.jarstore.FileServer._ + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { + + implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS) + val host = "localhost" + private val LOG = LogUtil.getLogger(getClass) + + var system: ActorSystem = null + + override def afterAll { + if (null != system) { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } + + override def beforeAll { + system = ActorSystem("FileServerSpec", TestUtil.DEFAULT_CONFIG) + } + + private def save(client: Client, data: Array[Byte]): FilePath = { + val file = File.createTempFile("fileserverspec", "test") + FileUtils.writeByteArrayToFile(file, data) + val future = client.upload(file) + import scala.concurrent.duration._ + val path = Await.result(future, 30.seconds) + file.delete() + path + } + + private def get(client: Client, remote: FilePath): Array[Byte] = { + val file = File.createTempFile("fileserverspec", "test") + val future = client.download(remote, file) + import scala.concurrent.duration._ + val data = Await.result(future, 10.seconds) + + val bytes = FileUtils.readFileToByteArray(file) + file.delete() + bytes + } + + "The file server" should { + "serve the data previously stored" in { + + val rootDir = Files.createTempDir() + val localJarStore: JarStore = new LocalJarStore + val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath", + ConfigValueFactory.fromAnyRef(rootDir.getAbsolutePath)) + localJarStore.init(conf) + + val server = new FileServer(system, host, 0, localJarStore) + val port = Await.result(server.start, Duration(25, TimeUnit.SECONDS)) + + LOG.info("start test web server on port " + port) + + val sizes = List(1, 100, 1000000, 50000000) + val client = new Client(system, host, port.port) + + sizes.foreach { size => + val bytes = randomBytes(size) + val url = s"http://$host:${port.port}/$size" + val remote = save(client, bytes) + val fetchedBytes = get(client, remote) + assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, $url, $rootDir") + } + server.stop + rootDir.delete() + } + } + + "The file server" should { + "handle missed file" in { + + val rootDir = Files.createTempDir() + val localJarStore: JarStore = new LocalJarStore + val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath", + ConfigValueFactory.fromAnyRef(rootDir.getAbsolutePath)) + localJarStore.init(conf) + + val server = new FileServer(system, host, 0, localJarStore) + val port = Await.result(server.start, Duration(25, TimeUnit.SECONDS)) + + val client = new Client(system, host, port.port) + val fetchedBytes = get(client, FilePath("noexist")) + assert(fetchedBytes.length == 0) + rootDir.delete() + } + } + + private def randomBytes(size: Int): Array[Byte] = { + val bytes = new Array[Byte](size) + new java.util.Random().nextBytes(bytes) + bytes + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore ---------------------------------------------------------------------- diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore new file mode 100644 index 0000000..e173a8a --- /dev/null +++ b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.gearpump.jarstore.local.LocalJarStore +org.apache.gearpump.jarstore.dfs.DFSJarStore \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService deleted file mode 100644 index bf37316..0000000 --- a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.gearpump.jarstore.local.LocalJarStoreService -org.apache.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala deleted file mode 100644 index d0de51a..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.main - -import java.io.File -import java.net.{URL, URLClassLoader} -import java.util.jar.JarFile - -import org.slf4j.Logger - -import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util} - -/** Tool to submit an application jar to cluster */ -object AppSubmitter extends AkkaApp with ArgumentsParser { - val LOG: Logger = LogUtil.getLogger(getClass) - - override val ignoreUnknownArgument = true - - override val description = "Submit an application to Master by providing a jar" - - override val options: Array[(String, CLIOption[Any])] = Array( - "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, - defaultValue = Some("")), - "jar" -> CLIOption("<application>.jar", required = true), - "executors" -> CLIOption[Int]("number of executor to launch", required = false, - defaultValue = Some(1)), - "verbose" -> CLIOption("<print verbose log on console>", required = false, - defaultValue = Some(false)), - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - def main(akkaConf: Config, args: Array[String]): Unit = { - - val config = parse(args) - if (null != config) { - - val verbose = config.getBoolean("verbose") - if (verbose) { - LogUtil.verboseLogToConsole() - } - - val jar = config.getString("jar") - - // Set jar path to be submitted to cluster - System.setProperty(Constants.GEARPUMP_APP_JAR, jar) - System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) - - val namePrefix = config.getString("namePrefix") - if (namePrefix.nonEmpty) { - if (!Util.validApplicationName(namePrefix)) { - throw new Exception(s"$namePrefix is not a valid prefix for an application name") - } - System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix) - } - - val jarFile = new java.io.File(jar) - - // Start main class - if (!jarFile.exists()) { - throw new Exception(s"jar $jar does not exist") - } - - val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + - jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader) - val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader) - - // Set to context classLoader. ActorSystem pick context classLoader in preference - Thread.currentThread().setContextClassLoader(classLoader) - val clazz = classLoader.loadClass(main) - val mainMethod = clazz.getMethod("main", classOf[Array[String]]) - mainMethod.invoke(null, arguments) - } - } - - private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader) - : (String, Array[String]) = { - val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes. - getValue("Main-Class")).getOrElse("") - - if (remainArgs.length > 0) { - classLoader.loadClass(remainArgs(0)) - (remainArgs(0), remainArgs.drop(1)) - } else if (mainInManifest.nonEmpty) { - (mainInManifest, remainArgs) - } else { - throw new Exception("No main class specified") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala deleted file mode 100644 index 672fee6..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.main - -import org.slf4j.Logger - -import org.apache.gearpump.util.{Constants, LogUtil} - -object Gear { - - val OPTION_CONFIG = "conf" - - private val LOG: Logger = LogUtil.getLogger(getClass) - - val commands = Map("app" -> AppSubmitter, "kill" -> Kill, - "info" -> Info, "replay" -> Replay, "main" -> MainRunner) - - def usage(): Unit = { - val keys = commands.keys.toList.sorted - // scalastyle:off println - Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") - // scalastyle:on println - } - - private def executeCommand(command: String, commandArgs: Array[String]) = { - commands.get(command).map(_.main(commandArgs)) - if (!commands.contains(command)) { - val allArgs = (command +: commandArgs.toList).toArray - MainRunner.main(allArgs) - } - } - - def main(inputArgs: Array[String]): Unit = { - val (configFile, args) = extractConfig(inputArgs) - if (configFile != null) { - // Sets custom config file... - System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile) - } - - if (args.length == 0) { - usage() - } else { - val command = args(0) - val commandArgs = args.drop(1) - executeCommand(command, commandArgs) - } - } - - private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = { - var index = 0 - - var result = List.empty[String] - var configFile: String = null - while (index < inputArgs.length) { - val item = inputArgs(index) - if (item == s"-$OPTION_CONFIG") { - index += 1 - configFile = inputArgs(index) - } else { - result = result :+ item - } - index += 1 - } - (configFile, result.toArray) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala deleted file mode 100644 index bf444a3..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.main - -import org.slf4j.Logger - -import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to query master info */ -object Info extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - override val description = "Query the Application list" - - // scalastyle:off println - def main(akkaConf: Config, args: Array[String]): Unit = { - val client = ClientContext(akkaConf) - - val AppMastersData(appMasters) = client.listApps - Console.println("== Application Information ==") - Console.println("====================================") - appMasters.foreach { appData => - Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " + - s"status: ${appData.status}, worker: ${appData.workerPath}") - } - client.close() - } - // scalastyle:on println -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala deleted file mode 100644 index 17f6214..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.main - -import org.slf4j.Logger - -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to kill an App */ -object Kill extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "appid" -> CLIOption("<application id>", required = true), - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - override val description = "Kill an application with application Id" - - def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - if (null != config) { - val client = ClientContext(akkaConf) - LOG.info("Client ") - client.shutdown(config.getInt("appid")) - client.close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala deleted file mode 100644 index c6c9f10..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.main - -import org.slf4j.Logger - -import org.apache.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to run any main class by providing a jar */ -object MainRunner extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - def main(akkaConf: Config, args: Array[String]): Unit = { - val mainClazz = args(0) - val commandArgs = args.drop(1) - - val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz) - val mainMethod = clazz.getMethod("main", classOf[Array[String]]) - mainMethod.invoke(null, commandArgs) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala deleted file mode 100644 index d721832..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster.main - -import org.slf4j.Logger - -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.util.{AkkaApp, LogUtil} - -// Internal tool to restart an application -object Replay extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "appid" -> CLIOption("<application id>", required = true), - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - override val description = "Replay the application from current min clock(low watermark)" - - def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - if (null != config) { - val client = ClientContext(akkaConf) - client.replayFromTimestampWindowTrailingEdge(config.getInt("appid")) - client.close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala index 762cf27..6b4df07 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala @@ -20,6 +20,7 @@ package org.apache.gearpump.cluster.master import java.lang.management.ManagementFactory import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.jarstore.JarStoreServer import scala.collection.JavaConverters._ import scala.collection.immutable @@ -40,7 +41,6 @@ import org.apache.gearpump.cluster.WorkerToMaster._ import org.apache.gearpump.cluster.master.InMemoryKVService._ import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _} import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished -import org.apache.gearpump.jarstore.local.LocalJarStore import org.apache.gearpump.metrics.Metrics.ReportMetrics import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} import org.apache.gearpump.transport.HostPort @@ -79,11 +79,7 @@ private[cluster] class Master extends Actor with Stash { val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) - private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) { - Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath))) - } else { - None - } + private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath)) private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort) @@ -162,7 +158,7 @@ private[cluster] class Master extends Actor with Stash { def jarStoreService: Receive = { case GetJarStoreServer => - jarStore.foreach(_ forward GetJarStoreServer) + jarStore forward GetJarStoreServer } def kvServiceMsgHandler: Receive = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala index ff74368..1b52e5d 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala @@ -43,7 +43,7 @@ import org.apache.gearpump.cluster.WorkerToMaster._ import org.apache.gearpump.cluster.master.Master.MasterInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig} -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} import org.apache.gearpump.metrics.Metrics.ReportMetrics import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} import org.apache.gearpump.util.ActorSystemBooter.Daemon @@ -69,8 +69,7 @@ private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutS private var masterInfo: MasterInfo = null private var executorNameToActor = Map.empty[String, ActorRef] private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher() - private val jarStoreService = JarStoreService.get(systemConfig) - jarStoreService.init(systemConfig, context.system) + private val jarStoreClient = new JarStoreClient(systemConfig, context.system) private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) private val resourceUpdateTimeoutMs = 30000 // Milliseconds @@ -171,7 +170,7 @@ private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutS val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId) val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool, - jarStoreService, executorProcLauncher)) + jarStoreClient, executorProcLauncher)) executorNameToActor += actorName -> executor resource = resource - launch.resource @@ -339,7 +338,7 @@ private[cluster] object Worker { launch: LaunchExecutor, masterInfo: MasterInfo, ioPool: ExecutionContext, - jarStoreService: JarStoreService, + jarStoreClient: JarStoreClient, procLauncher: ExecutorProcessLauncher) extends Actor { import launch.{appId, executorId, resource} @@ -407,7 +406,7 @@ private[cluster] object Worker { val process = Future { val jarPath = ctx.jar.map { appJar => val tempFile = File.createTempFile(appJar.name, ".jar") - jarStoreService.copyToLocalFile(tempFile, appJar.filePath) + jarStoreClient.copyToLocalFile(tempFile, appJar.filePath) val file = new URL("file:" + tempFile) file.getFile } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala new file mode 100644 index 0000000..ebaf354 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.jarstore.dfs + +import java.io.{InputStream, OutputStream} + +import com.typesafe.config.Config +import org.apache.gearpump.jarstore.JarStore +import org.apache.gearpump.util.Constants +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} + +/** + * DFSJarStore store the uploaded jar on HDFS + */ +class DFSJarStore extends JarStore { + private var rootPath: Path = null + override val scheme: String = "hdfs" + + override def init(config: Config): Unit = { + rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)) + val fs = rootPath.getFileSystem(new Configuration()) + if (!fs.exists(rootPath)) { + fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + } + + /** + * Creates the file on JarStore. + * + * @param fileName name of the file to be created on JarStore. + * @return OutputStream returns a stream into which the data can be written. + */ + override def createFile(fileName: String): OutputStream = { + val filePath = new Path(rootPath, fileName) + val fs = filePath.getFileSystem(new Configuration()) + fs.create(filePath) + } + + /** + * Gets the InputStream to read the file + * + * @param fileName name of the file to be read on JarStore. + * @return InputStream returns a stream from which the data can be read. + */ + override def getFile(fileName: String): InputStream = { + val filePath = new Path(rootPath, fileName) + val fs = filePath.getFileSystem(new Configuration()) + fs.open(filePath) + } +}
