Repository: incubator-gearpump Updated Branches: refs/heads/master 6852b56e9 -> f8f916645
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala deleted file mode 100644 index 7a60019..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.jarstore.dfs - -import java.io.File - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import org.slf4j.Logger - -import org.apache.gearpump.jarstore.{FilePath, JarStoreService} -import org.apache.gearpump.util.{Constants, LogUtil} - -/** - * DFSJarStoreService store the uploaded jar on HDFS - */ -class DFSJarStoreService extends JarStoreService { - private val LOG: Logger = LogUtil.getLogger(getClass) - private var rootPath: Path = null - - override val scheme: String = "hdfs" - - override def init(config: Config, actorRefFactory: ActorSystem): Unit = { - rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)) - val fs = rootPath.getFileSystem(new Configuration()) - if (!fs.exists(rootPath)) { - fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) - } - } - - /** - * This function will copy the remote file to local file system, called from client side. - * - * @param localFile The destination of file path - * @param remotePath The remote file path from JarStore - */ - override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = { - val filePath = new Path(rootPath, remotePath.path) - val fs = filePath.getFileSystem(new Configuration()) - LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${filePath.toString}") - val target = new Path(localFile.toURI().toString) - fs.copyToLocalFile(filePath, target) - } - - /** - * This function will copy the local file to the remote JarStore, called from client side. - * - * @param localFile The local file - */ - override def copyFromLocal(localFile: File): FilePath = { - val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString) - val filePath = new Path(rootPath, remotePath.path) - val fs = filePath.getFileSystem(new Configuration()) - LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${filePath.toString}") - fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath) - remotePath - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala deleted file mode 100644 index 9bd7071..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.jarstore.local - -import java.io.File - -import akka.actor.{Actor, Stash} -import akka.pattern.pipe -import org.slf4j.Logger - -import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} -import org.apache.gearpump.util._ - -/** - * LocalJarStore store the uploaded jar on local disk. - */ -class LocalJarStore(rootDirPath: String) extends Actor with Stash { - private val LOG: Logger = LogUtil.getLogger(getClass) - - val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME) - val rootDirectory = new File(rootDirPath) - - FileUtils.forceMkdir(rootDirectory) - - val server = new FileServer(context.system, host, 0, rootDirectory) - - implicit val timeout = Constants.FUTURE_TIMEOUT - implicit val executionContext = context.dispatcher - - server.start pipeTo self - - def receive: Receive = { - case FileServer.Port(port) => - context.become(listen(port)) - unstashAll() - case _ => - stash() - } - - def listen(port: Int): Receive = { - case GetJarStoreServer => - sender ! JarStoreServerAddress(s"http://$host:$port/") - } - - override def postStop(): Unit = { - server.stop - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala deleted file mode 100644 index 1ab103f..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.jarstore.local - -import java.io.File -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} - -import akka.actor.{ActorRef, ActorSystem} -import akka.pattern.ask -import com.typesafe.config.Config -import org.slf4j.Logger - -import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} -import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.jarstore.{FilePath, JarStoreService} -import org.apache.gearpump.util._ - -/** - * LocalJarStoreService store the uploaded jar on local disk. - */ -class LocalJarStoreService extends JarStoreService { - private def LOG: Logger = LogUtil.getLogger(getClass) - private implicit val timeout = Constants.FUTURE_TIMEOUT - private var system: akka.actor.ActorSystem = null - private var master: ActorRef = null - private implicit def dispatcher: ExecutionContext = system.dispatcher - - override val scheme: String = "file" - - override def init(config: Config, system: ActorSystem): Unit = { - this.system = system - val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) - .asScala.flatMap(Util.parseHostList) - master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}") - } - - private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]] - .map { address => - val client = new FileServer.Client(system, address.url) - client - } - - /** - * This function will copy the remote file to local file system, called from client side. - * - * @param localFile The destination of file path - * @param remotePath The remote file path from JarStore - */ - override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = { - LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath") - val future = client.flatMap(_.download(remotePath, localFile)) - Await.ready(future, Duration(60, TimeUnit.SECONDS)) - } - - /** - * This function will copy the local file to the remote JarStore, called from client side. - * @param localFile The local file - */ - override def copyFromLocal(localFile: File): FilePath = { - val future = client.flatMap(_.upload(localFile)) - Await.result(future, Duration(60, TimeUnit.SECONDS)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala deleted file mode 100644 index 66bb9ba..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.util - -import java.io.File -import scala.concurrent.{ExecutionContext, Future} - -import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import akka.stream.Materializer -import akka.stream.scaladsl.FileIO -import akka.util.ByteString - -/** - * FileDirective is a set of Akka-http directive to upload/download - * huge binary files to/from Akka-Http server. - */ -object FileDirective { - - // Form field name - type Name = String - - val CHUNK_SIZE = 262144 - - /** - * File information after a file is uploaded to server. - * - * @param originFileName original file name when user upload it in browser. - * @param file file name after the file is saved to server. - * @param length the length of the file - */ - case class FileInfo(originFileName: String, file: File, length: Long) - - class Form(val fields: Map[Name, FormField]) { - def getFile(fieldName: String): Option[FileInfo] = { - fields.get(fieldName).flatMap { - case Left(file) => Option(file) - case Right(_) => None - } - } - - def getValue(fieldName: String): Option[String] = { - fields.get(fieldName).flatMap { - case Left(_) => None - case Right(value) => Option(value) - } - } - } - - type FormField = Either[FileInfo, String] - - /** - * directive to uploadFile, it store the uploaded files - * to temporary directory, and return a Map from form field name - * to FileInfo. - */ - def uploadFile: Directive1[Form] = { - uploadFileTo(null) - } - - /** - * Store the uploaded files to specific rootDirectory. - * - * @param rootDirectory directory to store the files. - * @return - */ - def uploadFileTo(rootDirectory: File): Directive1[Form] = { - Directive[Tuple1[Form]] { inner => - extractMaterializer {implicit mat => - extractExecutionContext {implicit ec => - uploadFileImpl(rootDirectory)(mat, ec) { filesFuture => - ctx => { - filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx)) - } - } - } - } - } - } - - // Downloads file from server - def downloadFile(file: File): Route = { - val responseEntity = HttpEntity( - MediaTypes.`application/octet-stream`, - file.length, - FileIO.fromFile(file, CHUNK_SIZE)) - complete(responseEntity) - } - - private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext) - : Directive1[Future[Form]] = { - Directive[Tuple1[Future[Form]]] { inner => - entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) => - val form = formdata.parts.mapAsync(1) { p => - if (p.filename.isDefined) { - - // Reserve the suffix - val targetPath = File.createTempFile(s"userfile_${p.name}_", - s"${p.filename.getOrElse("")}", rootDirectory) - val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath)) - written.map(written => - if (written.count > 0) { - Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count))) - } else { - Map.empty[Name, FormField] - }) - } else { - val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) => - total ++ input - } - valueFuture.map{value => - Map(p.name -> Right(value.utf8String)) - } - } - }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) => - new Form(set.fields ++ value) - } - - inner(Tuple1(form)) - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala deleted file mode 100644 index 3a0faad..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.util - -import java.io.File -import scala.concurrent.{ExecutionContext, Future} - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.Http.ServerBinding -import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model.Uri.{Path, Query} -import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{FileIO, Sink, Source} -import spray.json.DefaultJsonProtocol._ -import spray.json.JsonFormat - -import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.util.FileDirective._ -import org.apache.gearpump.util.FileServer.Port - -/** - * A simple file server implemented with akka-http to store/fetch large - * binary files. - */ -class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) { - import system.dispatcher - implicit val actorSystem = system - implicit val materializer = ActorMaterializer() - implicit def ec: ExecutionContext = system.dispatcher - - val route: Route = { - path("upload") { - uploadFileTo(rootDirectory) { form => - val fileName = form.fields.headOption.flatMap { pair => - val (_, fileInfo) = pair - fileInfo match { - case Left(file) => Option(file.file).map(_.getName) - case Right(_) => None - } - } - - if (fileName.isDefined) { - complete(fileName.get) - } else { - failWith(new Exception("File not found in the uploaded form")) - } - } - } ~ - path("download") { - parameters("file") { file: String => - downloadFile(new File(rootDirectory, file)) - } - } ~ - pathEndOrSingleSlash { - extractUri { uri => - val upload = uri.withPath(Uri.Path("/upload")).toString() - val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, - s""" - | - |<h2>Please specify a file to upload:</h2> - |<form action="$upload" enctype="multipart/form-data" method="post"> - |<input type="file" name="datafile" size="40"> - |</p> - |<div> - |<input type="submit" value="Submit"> - |</div> - |</form> - """.stripMargin) - complete(entity) - } - } - } - - private var connection: Future[ServerBinding] = null - - def start: Future[Port] = { - connection = Http().bindAndHandle(Route.handlerFlow(route), host, port) - connection.map(address => Port(address.localAddress.getPort)) - } - - def stop: Future[Unit] = { - connection.flatMap(_.unbind()) - } -} - -object FileServer { - - implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply) - - case class Port(port: Int) - - /** - * Client of [[org.apache.gearpump.util.FileServer]] - */ - class Client(system: ActorSystem, host: String, port: Int) { - - def this(system: ActorSystem, url: String) = { - this(system, Uri(url).authority.host.address(), Uri(url).authority.port) - } - - private implicit val actorSystem = system - private implicit val materializer = ActorMaterializer() - private implicit val ec = system.dispatcher - - val server = Uri(s"http://$host:$port") - val httpClient = Http(system).outgoingConnection(server.authority.host.address(), - server.authority.port) - - def upload(file: File): Future[FilePath] = { - val target = server.withPath(Path("/upload")) - - val request = entity(file).map { entity => - HttpRequest(HttpMethods.POST, uri = target, entity = entity) - } - - val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head) - response.flatMap { some => - Unmarshal(some).to[String] - }.map { path => - FilePath(path) - } - } - - def download(remoteFile: FilePath, saveAs: File): Future[Unit] = { - val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path)) - // Download file to local - val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head) - val downloaded = response.flatMap { response => - response.entity.dataBytes.runWith(FileIO.toFile(saveAs)) - } - downloaded.map(written => Unit) - } - - private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { - val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), - FileIO.fromFile(file, chunkSize = 100000)) - val body = Source.single( - Multipart.FormData.BodyPart( - "uploadfile", - entity, - Map("filename" -> file.getName))) - val form = Multipart.FormData(body) - - Marshal(form).to[RequestEntity] - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore ---------------------------------------------------------------------- diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore new file mode 100644 index 0000000..e173a8a --- /dev/null +++ b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.gearpump.jarstore.local.LocalJarStore +org.apache.gearpump.jarstore.dfs.DFSJarStore \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService deleted file mode 100644 index bf37316..0000000 --- a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.gearpump.jarstore.local.LocalJarStoreService -org.apache.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala deleted file mode 100644 index 4b17951..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.util - -import java.io.File -import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.google.common.io.Files -import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.util.FileServer._ - -class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { - - implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS) - val host = "localhost" - private val LOG = LogUtil.getLogger(getClass) - - var system: ActorSystem = null - - override def afterAll { - if (null != system) { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } - - override def beforeAll { - val config = TestUtil.DEFAULT_CONFIG - system = ActorSystem("FileServerSpec", config) - } - - private def save(client: Client, data: Array[Byte]): FilePath = { - val file = File.createTempFile("fileserverspec", "test") - FileUtils.writeByteArrayToFile(file, data) - val future = client.upload(file) - import scala.concurrent.duration._ - val path = Await.result(future, 30.seconds) - file.delete() - path - } - - private def get(client: Client, remote: FilePath): Array[Byte] = { - val file = File.createTempFile("fileserverspec", "test") - val future = client.download(remote, file) - import scala.concurrent.duration._ - val data = Await.result(future, 10.seconds) - - val bytes = FileUtils.readFileToByteArray(file) - file.delete() - bytes - } - - "The file server" should { - "serve the data previously stored" in { - - val rootDir = Files.createTempDir() - - val server = new FileServer(system, host, 0, rootDir) - val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS)) - - LOG.info("start test web server on port " + port) - - val sizes = List(1, 100, 1000000, 50000000) - val client = new Client(system, host, port.port) - - sizes.foreach { size => - val bytes = randomBytes(size) - val url = s"http://$host:${port.port}/$size" - val remote = save(client, bytes) - val fetchedBytes = get(client, remote) - assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, $url, $rootDir") - } - server.stop - rootDir.delete() - } - } - - "The file server" should { - "handle missed file" in { - - val rootDir = Files.createTempDir() - - val server = new FileServer(system, host, 0, rootDir) - val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS)) - - val client = new Client(system, host, port.port) - val fetchedBytes = get(client, FilePath("noexist")) - assert(fetchedBytes.length == 0) - rootDir.delete() - } - } - - private def randomBytes(size: Int): Array[Byte] = { - val bytes = new Array[Byte](size) - (new java.util.Random()).nextBytes(bytes) - bytes - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 6b415bb..4ce3053 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -144,8 +144,6 @@ object Build extends sbt.Build { libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster" % akkaVersion, "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, - "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion, - "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion, "commons-logging" % "commons-logging" % commonsLoggingVersion, "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided" @@ -184,6 +182,8 @@ object Build extends sbt.Build { "com.typesafe.akka" %% "akka-agent" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-kernel" % akkaVersion, + "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion, "org.scala-lang" % "scala-reflect" % scalaVersionNumber, "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4", "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", @@ -310,8 +310,7 @@ object Build extends sbt.Build { lazy val services: Project = services_full.jvm. settings(serviceJvmSettings: _*) .settings(compile in Compile <<= (compile in Compile)) - .dependsOn(streaming % "test->test;compile->compile", - daemon % "test->test;compile->compile;provided") + .dependsOn(streaming % "test->test;compile->compile") lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( @@ -445,7 +444,7 @@ object Build extends sbt.Build { "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided" ) )) - .dependsOn(services % "test->test;compile->compile", core % "provided") + .dependsOn(services % "test->test;compile->compile", daemon % "provided", core % "provided") .disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_hbase = Project( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 4efb854..1c87653 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -29,9 +29,10 @@ object Pack extends sbt.Build { "${PROG_HOME}/lib/yarn/*" ) - val applicationClassPath = daemonClassPath ++ Seq( + val applicationClassPath = Seq( // Current working directory - "." + ".", + "${PROG_HOME}/conf" ) val serviceClassPath = Seq( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala index 1ca2306..b217363 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala @@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.ClientToMaster._ import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} import org.apache.gearpump.cluster.MasterToClient._ -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective} import org.apache.gearpump.services.AppMasterService.Status // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ @@ -42,14 +42,14 @@ import org.apache.gearpump.streaming.appmaster.DagManager._ import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster} -import org.apache.gearpump.util.FileDirective._ +import FileDirective._ import org.apache.gearpump.util.{Constants, Util} /** * Management service for AppMaster */ class AppMasterService(val master: ActorRef, - val jarStore: JarStoreService, override val system: ActorSystem) + val jarStoreClient: JarStoreClient, override val system: ActorSystem) extends BasicService { private val systemConfig = system.settings.config @@ -71,24 +71,24 @@ class AppMasterService(val master: ActorRef, val msg = java.net.URLDecoder.decode(args, "UTF-8") val dagOperation = read[DAGOperation](msg) (post & entity(as[Multipart.FormData])) { _ => - uploadFile { form => - val jar = form.getFile("jar").map(_.file) + uploadFile { form => + val jar = form.getFileInfo("jar").map(_.file) - if (jar.nonEmpty) { - dagOperation match { - case replace: ReplaceProcessor => - val description = replace.newProcessorDescription.copy(jar = - Util.uploadJar(jar.get, jarStore)) - val dagOperationWithJar = replace.copy(newProcessorDescription = description) - replaceProcessor(dagOperationWithJar) + if (jar.nonEmpty) { + dagOperation match { + case replace: ReplaceProcessor => + val description = replace.newProcessorDescription.copy(jar = + Util.uploadJar(jar.get, jarStoreClient)) + val dagOperationWithJar = replace.copy(newProcessorDescription = description) + replaceProcessor(dagOperationWithJar) + } + } else { + replaceProcessor(dagOperation) } - } else { - replaceProcessor(dagOperation) } + } ~ (post & entity(as[FormData])) { _ => + replaceProcessor(dagOperation) } - } ~ (post & entity(as[FormData])) { _ => - replaceProcessor(dagOperation) - } } } ~ path("stallingtasks") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index 32c9f08..0b8409f 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -40,19 +40,19 @@ import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.worker.WorkerSummary import org.apache.gearpump.cluster.{ClusterConfig, UserConfig} -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer} import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication} import org.apache.gearpump.util.ActorUtil._ -import org.apache.gearpump.util.FileDirective._ +import FileDirective._ import org.apache.gearpump.util.{Constants, Graph, Util} /** Manages service for master node */ class MasterService(val master: ActorRef, - val jarStore: JarStoreService, override val system: ActorSystem) + val jarStoreClient: JarStoreClient, override val system: ActorSystem) extends BasicService { import upickle.default.{read, write} @@ -116,8 +116,8 @@ class MasterService(val master: ActorRef, path("submitapp") { post { uploadFile { form => - val jar = form.getFile("jar").map(_.file) - val configFile = form.getFile("configfile").map(_.file) + val jar = form.getFileInfo("jar").map(_.file) + val configFile = form.getFileInfo("configfile").map(_.file) val configString = form.getValue("configstring").getOrElse("") val executorCount = form.getValue("executorcount").getOrElse("1").toInt val args = form.getValue("args").getOrElse("") @@ -139,8 +139,8 @@ class MasterService(val master: ActorRef, path("submitstormapp") { post { uploadFile { form => - val jar = form.getFile("jar").map(_.file) - val configFile = form.getFile("configfile").map(_.file) + val jar = form.getFileInfo("jar").map(_.file) + val configFile = form.getFileInfo("configfile").map(_.file) val args = form.getValue("args").getOrElse("") onComplete(Future( MasterService.submitStormApp(jar, configFile, args, systemConfig) @@ -180,12 +180,12 @@ class MasterService(val master: ActorRef, } ~ path("uploadjar") { uploadFile { form => - val jar = form.getFile("jar").map(_.file) + val jar = form.getFileInfo("jar").map(_.file) if (jar.isEmpty) { complete(write( MasterService.Status(success = false, reason = "Jar file not found"))) } else { - val jarFile = Util.uploadJar(jar.get, jarStore) + val jarFile = Util.uploadJar(jar.get, jarStoreClient) complete(write(jarFile)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala index 7d67f60..d92972b 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala @@ -18,8 +18,6 @@ package org.apache.gearpump.services -import org.apache.gearpump.jarstore.local.LocalJarStoreService - import scala.concurrent.Await import scala.concurrent.duration._ @@ -31,7 +29,7 @@ import akka.stream.ActorMaterializer import akka.util.Timeout import org.apache.commons.lang.exception.ExceptionUtils -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.JarStoreClient import org.apache.gearpump.util.{Constants, LogUtil} // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ @@ -46,16 +44,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem private val config = system.settings.config - // only LocalJarStoreService is supported now for "Compose DAG" - // since DFSJarStoreService requires HDFS to be on the classpath. - // Note this won't affect users "Submit Gearpump Application" through - // dashboard with "jarstore.rootpath" set to HDFS. - if (!JarStoreService.get(config).isInstanceOf[LocalJarStoreService]) { - LOG.warn("only local jar store is supported for Compose DAG") - } - private val jarStoreService = new LocalJarStoreService - jarStoreService.init(config, system) - + private val jarStoreClient = new JarStoreClient(config, system) private val securityEnabled = config.getBoolean( Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED) @@ -101,9 +90,9 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem private def services: RouteService = { val admin = new AdminService(system) - val masterService = new MasterService(master, jarStoreService, system) + val masterService = new MasterService(master, jarStoreClient, system) val worker = new WorkerService(master, system) - val app = new AppMasterService(master, jarStoreService, system) + val app = new AppMasterService(master, jarStoreClient, system) val sup = new SupervisorService(master, supervisor, system) new RouteService { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala index 2ece554..cec7367 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala @@ -36,7 +36,7 @@ import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMaste import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} import org.apache.gearpump.cluster.MasterToClient._ import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import org.apache.gearpump.util.LogUtil // NOTE: This cannot be removed!!! @@ -47,19 +47,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest override def testConfig: Config = TestUtil.UI_CONFIG - private val LOG: Logger = LogUtil.getLogger(getClass) - private def actorRefFactory = system - val mockAppMaster = TestProbe() val failure = LastFailure(System.currentTimeMillis(), "Some error") - - lazy val jarStoreService = JarStoreService.get(system.settings.config) - - def jarStore: JarStoreService = jarStoreService + val jarStoreClient = new JarStoreClient(system.settings.config, system) private def master = mockMaster.ref - private def appMasterRoute = new AppMasterService(master, jarStore, system).route + private def appMasterRoute = new AppMasterService(master, jarStoreClient, system).route mockAppMaster.setAutoPilot { new AutoPilot { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala index e365e9f..39c0de0 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala @@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersD import org.apache.gearpump.cluster.MasterToClient._ import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ @@ -53,17 +53,13 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest override def testConfig: Config = TestUtil.UI_CONFIG - private def actorRefFactory = system val workerId = 0 val mockWorker = TestProbe() - lazy val jarStoreService = JarStoreService.get(system.settings.config) - + val jarStoreClient = new JarStoreClient(system.settings.config, system) private def master = mockMaster.ref - def jarStore: JarStoreService = jarStoreService - - private def masterRoute = new MasterService(master, jarStore, system).route + private def masterRoute = new MasterService(master, jarStoreClient, system).route mockWorker.setAutoPilot { new AutoPilot { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala index 4658c98..b0e2101 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala @@ -34,7 +34,6 @@ import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWor import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig} import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import org.apache.gearpump.jarstore.JarStoreService // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ @@ -49,8 +48,6 @@ class WorkerServiceSpec protected def master = mockMaster.ref - lazy val jarStoreService = JarStoreService.get(system.settings.config) - protected def workerRoute = new WorkerService(master, system).route mockWorker.setAutoPilot {
