roadan closed pull request #1: Jetty Web Server and SparkSqlRunner upgraded to Gradle, Spark 2.0 and code refactored to use Apache in package names URL: https://github.com/apache/incubator-amaterasu/pull/1
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala index 4be6a66..3096077 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala @@ -17,13 +17,11 @@ package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql import java.io.File - import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment import org.apache.commons.io.FilenameUtils -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession} +import org.apache.spark.sql.{SparkSession, SaveMode,DataFrame} /** * Created by kirupa on 11/12/16. 
@@ -35,12 +33,11 @@ class SparkSqlRunner extends Logging { var notifier: Notifier = _ var jobId: String = _ var actionName: String = _ - var sc: SparkContext = _ var spark: SparkSession = _ def executeQuery(sparkSqlTempTable: String, - dataSource: String, - query: String) = { + dataSource: String, + query: String) = { notifier.info(s"================= started action $actionName =================") val file: File = new File(dataSource) @@ -91,7 +88,7 @@ object SparkSqlRunner { jobId: String, actionName: String, notifier: Notifier, - sc: SparkContext): SparkSqlRunner = { + spark: SparkSession): SparkSqlRunner = { val sparkSqlRunnerObj = new SparkSqlRunner @@ -99,8 +96,7 @@ object SparkSqlRunner { sparkSqlRunnerObj.jobId = jobId sparkSqlRunnerObj.actionName = actionName sparkSqlRunnerObj.notifier = notifier - sparkSqlRunnerObj.sc = sc - sparkSqlRunnerObj.spark = SparkSession.builder().config(sc.getConf).enableHiveSupport().getOrCreate() + sparkSqlRunnerObj.spark = spark sparkSqlRunnerObj } } diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala index 5f76bd0..0e7b141 100644 --- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala @@ -1,71 +1,87 @@ -//package org.apache.amaterasu.spark -// -//import org.apache.amaterasu.common.execution.actions.runners.spark.SparkSqlRunner -//import org.apache.amaterasu.common.runtime.Environment -//import org.apache.amaterasu.executor.runtime.AmaContext -//import org.apache.amaterasu.utilities.TestNotifier -//import org.apache.spark.{SparkConf, SparkContext, SparkEnv} -//import org.apache.log4j.Logger -//import org.apache.log4j.Level -//import org.apache.spark.sql.{SQLContext, SaveMode} -//import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -// -//import scala.io.Source -// -///** -// * Created by kirupa 
on 10/12/16. -// */ -//class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll { -// -// Logger.getLogger("org").setLevel(Level.OFF) -// Logger.getLogger("akka").setLevel(Level.OFF) -// Logger.getLogger("spark").setLevel(Level.OFF) -// Logger.getLogger("jetty").setLevel(Level.OFF) -// Logger.getRootLogger.setLevel(Level.OFF) -// -// -// val notifier = new TestNotifier() -// -// var sc: SparkContext = _ -// -// override protected def beforeAll(): Unit = { -// -// val env = new Environment() -// env.workingDir = "file:///tmp/" -// -// val conf = new SparkConf(true) -// .setMaster("local[*]") -// .setAppName("sql_job") -// .set("spark.local.ip", "127.0.0.1") -// -// sc = new SparkContext("local[*]", "job_5", conf) -// // sc = new SparkContext(conf) -// sc.setLogLevel("ERROR") -// -// println("11111111") -// println(sc) -// AmaContext.init(sc, new SQLContext(sc), "sql_job", env) -// super.beforeAll() -// } -// -// override protected def afterAll(): Unit = { -// sc.stop() -// -// super.afterAll() -// } -// -// "SparkSql" should "load PARQUET data and persist the Data in working directory" in { -// -// val sparkSql = SparkSqlRunner(AmaContext.env, "spark-sql-parquet", "spark-sql-parquet-action", notifier, sc) -// sparkSql.executeQuery("temptable", getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable") -// -// } -// -// "SparkSql" should "load JSON data and persist the Data in working directory" in { -// -// val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json", "spark-sql-json-action", notifier, sc) -// sparkSqlJson.executeQuery("temptable", getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select * from temptable") -// -// } -// -//} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.amaterasu.spark +import org.apache.amaterasu.common.runtime.Environment +import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner +import org.apache.amaterasu.executor.runtime.AmaContext +import org.apache.amaterasu.utilities.TestNotifier +import org.apache.log4j.Logger +import org.apache.log4j.Level +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.io.Source + +/** + * Created by kirupa on 10/12/16. 
+ */ +class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll { + + Logger.getLogger("org").setLevel(Level.OFF) + Logger.getLogger("akka").setLevel(Level.OFF) + Logger.getLogger("spark").setLevel(Level.OFF) + Logger.getLogger("jetty").setLevel(Level.OFF) + Logger.getRootLogger.setLevel(Level.OFF) + + + val notifier = new TestNotifier() + + var spark: SparkSession = _ + + override protected def beforeAll(): Unit = { + + val env = new Environment() + env.workingDir = "file:/tmp/" + spark = SparkSession.builder() + .appName("sql_job") + .master("local[*]") + .config("spark.local.ip", "127.0.0.1") + .getOrCreate() + + AmaContext.init(spark,"sql_job",env) + super.beforeAll() + } + + override protected def afterAll(): Unit = { + this.spark.sparkContext.stop() + super.afterAll() + } + + + /* + Test whether the parquet data is successfully loaded and processed by SparkSQL + */ + "SparkSql" should "load PARQUET data and persist the Data in working directory" in { + + val sparkSql:SparkSqlRunner = SparkSqlRunner(AmaContext.env, "spark-sql-parquet", "spark-sql-parquet-action", notifier, spark) + sparkSql.executeQuery("temptable", getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable") + + } + + + /* + Test whether the JSON data is successfully loaded by SparkSQL + */ + + "SparkSql" should "load JSON data and persist the Data in working directory" in { + + val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json", "spark-sql-json-action", notifier, spark) + sparkSqlJson.executeQuery("temptable", getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select * from temptable") + + } + +} diff --git a/leader/build.gradle b/leader/build.gradle index 301682d..3fab00b 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -46,8 +46,10 @@ dependencies { compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: 
'9.2.19.v20160908' compile group: 'javax.servlet', name: 'javax.servlet-api', version: '2.5.0' + compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0' compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r' compile group: 'org.yaml', name: 'snakeyaml', version: '1.18' + compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2' testCompile project(':common') testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala index ce9fea5..655a455 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala @@ -16,19 +16,30 @@ */ package org.apache.amaterasu.leader.utilities +//import org.apache.amaterasu.Logging import org.apache.amaterasu.common.logging.Logging import org.apache.log4j.{BasicConfigurator, Level, Logger} -import org.eclipse.jetty.server.handler.ErrorHandler import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} +import org.eclipse.jetty.toolchain.test.MavenTestingUtils +import org.eclipse.jetty.util.thread.QueuedThreadPool import org.eclipse.jetty.util.log.StdErrLog +import org.eclipse.jetty.util.resource.Resource +import org.jsoup.Jsoup +import org.jsoup.select.Elements + +import scala.collection.JavaConverters._ +import scala.io.{BufferedSource, Source} +import scala.text.Document /** + * Created by kirupa * Implementation of Jetty Web server to server Amaterasu libraries and other distribution files */ object HttpServer extends Logging { - //private val logger = Logger.getLogger(HttpServer.getClass) - var server: Server = _ + val logger = 
Logger.getLogger(HttpServer.getClass) + var server: Server = null def start(port: String, serverRoot: String): Unit = { @@ -40,14 +51,25 @@ object HttpServer extends Logging { val connector = new ServerConnector(server) connector.setPort(port.toInt) server.addConnector(connector) + val rh0 = new ResourceHandler() + rh0.setDirectoriesListed(true) + rh0.setResourceBase(serverRoot) + val context0 = new ContextHandler() + context0.setContextPath("/*") + //context0.setContextPath("/") + //val dir0 = MavenTestingUtils.getTestResourceDir("dist") + //context0.setBaseResource(Resource.newResource(dir0)) + context0.setHandler(rh0) val context = new ServletContextHandler(ServletContextHandler.SESSIONS) context.setResourceBase(serverRoot) context.setContextPath("/") - server.setHandler(context) context.setErrorHandler(new ErrorHandler()) context.setInitParameter("dirAllowed", "true") context.setInitParameter("pathInfoOnly", "true") context.addServlet(new ServletHolder(new DefaultServlet()), "/") + val contexts = new ContextHandlerCollection() + contexts.setHandlers(Array(context0, context)) + server.setHandler(contexts) server.start() } @@ -63,4 +85,19 @@ object HttpServer extends Logging { Logger.getLogger("org.eclipse.jetty").setLevel(Level.OFF) Logger.getLogger("org.eclipse.jetty.websocket").setLevel(Level.OFF) } + + /* + Method: getFilesInDirectory + Description: provides a list of files in the given directory URL. + @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: destination directory to fetch files + Note: Should the files in URL root be fetched, provide an empty value to directory. 
+ */ + def getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array[String] = { + val html: BufferedSource = Source.fromURL("http://" + amaNode + ":" + port + "/" + directory) + val htmlDoc = Jsoup.parse(html.mkString) + val htmlElement: Elements = htmlDoc.body().select("a") + val files = htmlElement.asScala + val fileNames = files.map(url => url.attr("href")).filter(file => (!file.contains(".."))).map(name => name.replace("/", "")).toArray + fileNames + } } diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt b/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt new file mode 100644 index 0000000..3a09fa7 --- /dev/null +++ b/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt @@ -0,0 +1 @@ +This is a test file 1 to download from Jetty webserver \ No newline at end of file diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt b/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt new file mode 100644 index 0000000..971a33e --- /dev/null +++ b/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt @@ -0,0 +1 @@ +This is a test file 2 to download from Jetty webserver \ No newline at end of file diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala index 27e44f3..6d426c5 100644 --- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala @@ -16,11 +16,15 @@ */ package org.apache.amaterasu.utilities + import java.io.File import org.apache.amaterasu.leader.utilities.HttpServer +import org.jsoup.Jsoup +import org.jsoup.select.Elements import org.scalatest.{FlatSpec, Matchers} +import scala.collection.JavaConverters._ import scala.io.Source /** @@ -30,12 +34,13 @@ class HttpServerTests extends FlatSpec with Matchers { // this is an ugly hack, 
getClass.getResource("/").getPath should have worked but // stopped working when we moved to gradle :( - private val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent + "Jetty Web server" should "start HTTP server, serve content and stop successfully" in { + val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent var data = "" try { - HttpServer.start("8000", resources) + HttpServer.start("8000",resources) val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt") data = html.mkString } @@ -44,4 +49,36 @@ class HttpServerTests extends FlatSpec with Matchers { } data should equal("This is a test file to download from Jetty webserver") } + "Jetty File server with '/' as root" should "start HTTP server, serve content and stop successfully" in { + var data = "" + val resources = new File(getClass.getResource("/dist").getPath).getParent + var urlCount:Int = 0 + println("resource location"+resources) + try { + HttpServer.start("8000",resources) + val urls = HttpServer.getFilesInDirectory("localhost","8000","dist") + urls.foreach(println) + urlCount = urls.length + } + finally { + HttpServer.stop() + } + urlCount should equal(2) + } + "Jetty File server with 'dist' as root" should "start HTTP server, serve content and stop successfully" in { + var data = "" + val resources = new File(getClass.getResource("/dist").getPath).getParent + var urlCount:Int = 0 + println("resource location"+resources) + try { + HttpServer.start("8000",resources+"/dist") + val urls = HttpServer.getFilesInDirectory("localhost","8000","") + urls.foreach(println) + urlCount = urls.length + } + finally { + HttpServer.stop() + } + urlCount should equal(2) + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services