roadan closed pull request #1: Jetty Web Server and SparkSqlRunner upgraded to 
gradle,Spark 2.0 and code refactored to use apache in package names
URL: https://github.com/apache/incubator-amaterasu/pull/1
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
 
b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
index 4be6a66..3096077 100644
--- 
a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ 
b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
@@ -17,13 +17,11 @@
 package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
 
 import java.io.File
-
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.commons.io.FilenameUtils
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.{SparkSession, SaveMode,DataFrame}
 
 /**
   * Created by kirupa on 11/12/16.
@@ -35,12 +33,11 @@ class SparkSqlRunner extends Logging {
   var notifier: Notifier = _
   var jobId: String = _
   var actionName: String = _
-  var sc: SparkContext = _
   var spark: SparkSession = _
 
   def executeQuery(sparkSqlTempTable: String,
-                    dataSource: String,
-                    query: String) = {
+                   dataSource: String,
+                   query: String) = {
 
     notifier.info(s"================= started action $actionName 
=================")
     val file: File = new File(dataSource)
@@ -91,7 +88,7 @@ object SparkSqlRunner {
             jobId: String,
             actionName: String,
             notifier: Notifier,
-            sc: SparkContext): SparkSqlRunner = {
+            spark: SparkSession): SparkSqlRunner = {
 
     val sparkSqlRunnerObj = new SparkSqlRunner
 
@@ -99,8 +96,7 @@ object SparkSqlRunner {
     sparkSqlRunnerObj.jobId = jobId
     sparkSqlRunnerObj.actionName = actionName
     sparkSqlRunnerObj.notifier = notifier
-    sparkSqlRunnerObj.sc = sc
-    sparkSqlRunnerObj.spark = 
SparkSession.builder().config(sc.getConf).enableHiveSupport().getOrCreate()
+    sparkSqlRunnerObj.spark = spark
     sparkSqlRunnerObj
   }
 }
diff --git 
a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala 
b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
index 5f76bd0..0e7b141 100644
--- 
a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
+++ 
b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
@@ -1,71 +1,87 @@
-//package org.apache.amaterasu.spark
-//
-//import 
org.apache.amaterasu.common.execution.actions.runners.spark.SparkSqlRunner
-//import org.apache.amaterasu.common.runtime.Environment
-//import org.apache.amaterasu.executor.runtime.AmaContext
-//import org.apache.amaterasu.utilities.TestNotifier
-//import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
-//import org.apache.log4j.Logger
-//import org.apache.log4j.Level
-//import org.apache.spark.sql.{SQLContext, SaveMode}
-//import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-//
-//import scala.io.Source
-//
-///**
-//  * Created by kirupa on 10/12/16.
-//  */
-//class SparkSqlRunnerTests extends FlatSpec with Matchers with 
BeforeAndAfterAll {
-//
-//  Logger.getLogger("org").setLevel(Level.OFF)
-//  Logger.getLogger("akka").setLevel(Level.OFF)
-//  Logger.getLogger("spark").setLevel(Level.OFF)
-//  Logger.getLogger("jetty").setLevel(Level.OFF)
-//  Logger.getRootLogger.setLevel(Level.OFF)
-//
-//
-//  val notifier = new TestNotifier()
-//
-//  var sc: SparkContext = _
-//
-//  override protected def beforeAll(): Unit = {
-//
-//    val env = new Environment()
-//    env.workingDir = "file:///tmp/"
-//
-//    val conf = new SparkConf(true)
-//      .setMaster("local[*]")
-//      .setAppName("sql_job")
-//      .set("spark.local.ip", "127.0.0.1")
-//
-//    sc = new SparkContext("local[*]", "job_5", conf)
-// //   sc = new SparkContext(conf)
-//    sc.setLogLevel("ERROR")
-//
-//    println("11111111")
-//    println(sc)
-//    AmaContext.init(sc, new SQLContext(sc), "sql_job", env)
-//    super.beforeAll()
-//  }
-//
-//  override protected def afterAll(): Unit = {
-//    sc.stop()
-//
-//    super.afterAll()
-//  }
-//
-//  "SparkSql" should "load PARQUET data and persist the Data in working 
directory" in {
-//
-//    val sparkSql = SparkSqlRunner(AmaContext.env, "spark-sql-parquet", 
"spark-sql-parquet-action", notifier, sc)
-//    sparkSql.executeQuery("temptable", 
getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable")
-//
-//  }
-//
-//  "SparkSql" should "load JSON data and persist the Data in working 
directory" in {
-//
-//    val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json", 
"spark-sql-json-action", notifier, sc)
-//    sparkSqlJson.executeQuery("temptable", 
getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select * 
from temptable")
-//
-//  }
-//
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.spark
+import org.apache.amaterasu.common.runtime.Environment
+import 
org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
+import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.amaterasu.utilities.TestNotifier
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.io.Source
+
+/**
+  * Created by kirupa on 10/12/16.
+  */
+class SparkSqlRunnerTests extends FlatSpec with Matchers with 
BeforeAndAfterAll {
+
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  Logger.getLogger("spark").setLevel(Level.OFF)
+  Logger.getLogger("jetty").setLevel(Level.OFF)
+  Logger.getRootLogger.setLevel(Level.OFF)
+
+
+  val notifier = new TestNotifier()
+
+  var spark: SparkSession = _
+
+  override protected def beforeAll(): Unit = {
+
+    val env = new Environment()
+    env.workingDir = "file:/tmp/"
+    spark = SparkSession.builder()
+      .appName("sql_job")
+      .master("local[*]")
+      .config("spark.local.ip", "127.0.0.1")
+      .getOrCreate()
+
+    AmaContext.init(spark,"sql_job",env)
+    super.beforeAll()
+  }
+
+  override protected def afterAll(): Unit = {
+    this.spark.sparkContext.stop()
+    super.afterAll()
+  }
+
+
+  /*
+  Test whether the parquet data is successfully loaded and processed by 
SparkSQL
+   */
+  "SparkSql" should "load PARQUET data and persist the Data in working 
directory" in {
+
+    val sparkSql:SparkSqlRunner = SparkSqlRunner(AmaContext.env, 
"spark-sql-parquet", "spark-sql-parquet-action", notifier, spark)
+    sparkSql.executeQuery("temptable", 
getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable")
+
+  }
+
+
+  /*
+  Test whether the JSON data is successfully loaded by SparkSQL
+   */
+
+  "SparkSql" should "load JSON data and persist the Data in working directory" 
in {
+
+    val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json", 
"spark-sql-json-action", notifier, spark)
+    sparkSqlJson.executeQuery("temptable", 
getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select * 
from temptable")
+
+  }
+
+}
diff --git a/leader/build.gradle b/leader/build.gradle
index 301682d..3fab00b 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -46,8 +46,10 @@ dependencies {
     compile group: 'org.eclipse.jetty', name: 'jetty-io', version: 
'9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: 
'9.2.19.v20160908'
     compile group: 'javax.servlet', name: 'javax.servlet-api', version: '2.5.0'
+    compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', 
version: '4.0'
     compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: 
'4.2.0.201601211800-r'
     compile group: 'org.yaml', name: 'snakeyaml', version: '1.18'
+    compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
 
     testCompile project(':common')
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
diff --git 
a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala 
b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
index ce9fea5..655a455 100644
--- 
a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
+++ 
b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
@@ -16,19 +16,30 @@
  */
 package org.apache.amaterasu.leader.utilities
 
+//import org.apache.amaterasu.Logging
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.log4j.{BasicConfigurator, Level, Logger}
-import org.eclipse.jetty.server.handler.ErrorHandler
 import org.eclipse.jetty.server.{Server, ServerConnector}
+import org.eclipse.jetty.server.handler._
 import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, 
ServletHolder}
+import org.eclipse.jetty.toolchain.test.MavenTestingUtils
+import org.eclipse.jetty.util.thread.QueuedThreadPool
 import org.eclipse.jetty.util.log.StdErrLog
+import org.eclipse.jetty.util.resource.Resource
+import org.jsoup.Jsoup
+import org.jsoup.select.Elements
+
+import scala.collection.JavaConverters._
+import scala.io.{BufferedSource, Source}
+import scala.text.Document
 
 /**
+  * Created by kirupa
   * Implementation of Jetty Web server to server Amaterasu libraries and other 
distribution files
   */
 object HttpServer extends Logging {
-  //private val logger = Logger.getLogger(HttpServer.getClass)
-  var server: Server = _
+  val logger = Logger.getLogger(HttpServer.getClass)
+  var server: Server = null
 
   def start(port: String, serverRoot: String): Unit = {
 
@@ -40,14 +51,25 @@ object HttpServer extends Logging {
     val connector = new ServerConnector(server)
     connector.setPort(port.toInt)
     server.addConnector(connector)
+    val rh0 = new ResourceHandler()
+    rh0.setDirectoriesListed(true)
+    rh0.setResourceBase(serverRoot)
+    val context0 = new ContextHandler()
+    context0.setContextPath("/*")
+    //context0.setContextPath("/")
+    //val dir0 = MavenTestingUtils.getTestResourceDir("dist")
+    //context0.setBaseResource(Resource.newResource(dir0))
+    context0.setHandler(rh0)
     val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
     context.setResourceBase(serverRoot)
     context.setContextPath("/")
-    server.setHandler(context)
     context.setErrorHandler(new ErrorHandler())
     context.setInitParameter("dirAllowed", "true")
     context.setInitParameter("pathInfoOnly", "true")
     context.addServlet(new ServletHolder(new DefaultServlet()), "/")
+    val contexts = new ContextHandlerCollection()
+    contexts.setHandlers(Array(context0, context))
+    server.setHandler(contexts)
     server.start()
   }
 
@@ -63,4 +85,19 @@ object HttpServer extends Logging {
     Logger.getLogger("org.eclipse.jetty").setLevel(Level.OFF)
     Logger.getLogger("org.eclipse.jetty.websocket").setLevel(Level.OFF)
   }
+
+  /*
+  Method: getFilesInDirectory
+  Description: provides a list of files in the given directory URL.
+  @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: 
destination directory to fetch files
+  Note: Should the files in URL root be fetched, provide an empty value to 
directory.
+   */
+  def getFilesInDirectory(amaNode: String, port: String, directory: String = 
""): Array[String] = {
+    val html: BufferedSource = Source.fromURL("http://"; + amaNode + ":" + port 
+ "/" + directory)
+    val htmlDoc = Jsoup.parse(html.mkString)
+    val htmlElement: Elements = htmlDoc.body().select("a")
+    val files = htmlElement.asScala
+    val fileNames = files.map(url => url.attr("href")).filter(file => 
(!file.contains(".."))).map(name => name.replace("/", "")).toArray
+    fileNames
+  }
 }
diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt 
b/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt
new file mode 100644
index 0000000..3a09fa7
--- /dev/null
+++ b/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt
@@ -0,0 +1 @@
+This is a test file 1 to download from Jetty webserver
\ No newline at end of file
diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt 
b/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt
new file mode 100644
index 0000000..971a33e
--- /dev/null
+++ b/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt
@@ -0,0 +1 @@
+This is a test file 2 to download from Jetty webserver
\ No newline at end of file
diff --git 
a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala 
b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
index 27e44f3..6d426c5 100644
--- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
@@ -16,11 +16,15 @@
  */
 package org.apache.amaterasu.utilities
 
+
 import java.io.File
 
 import org.apache.amaterasu.leader.utilities.HttpServer
+import org.jsoup.Jsoup
+import org.jsoup.select.Elements
 import org.scalatest.{FlatSpec, Matchers}
 
+import scala.collection.JavaConverters._
 import scala.io.Source
 
 /**
@@ -30,12 +34,13 @@ class HttpServerTests extends FlatSpec with Matchers {
 
   // this is an ugly hack, getClass.getResource("/").getPath should have 
worked but
   // stopped working when we moved to gradle :(
-  private val resources = new 
File(getClass.getResource("/simple-maki.yml").getPath).getParent
+
 
   "Jetty Web server" should "start HTTP server, serve content and stop 
successfully" in {
+    val resources = new 
File(getClass.getResource("/simple-maki.yml").getPath).getParent
     var data = ""
     try {
-      HttpServer.start("8000", resources)
+      HttpServer.start("8000",resources)
       val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt";)
       data = html.mkString
     }
@@ -44,4 +49,36 @@ class HttpServerTests extends FlatSpec with Matchers {
     }
     data should equal("This is a test file to download from Jetty webserver")
   }
+  "Jetty File server with '/' as root" should "start HTTP server, serve 
content and stop successfully" in {
+    var data = ""
+    val resources = new File(getClass.getResource("/dist").getPath).getParent
+    var urlCount:Int = 0
+    println("resource location"+resources)
+    try {
+      HttpServer.start("8000",resources)
+      val urls = HttpServer.getFilesInDirectory("localhost","8000","dist")
+      urls.foreach(println)
+      urlCount = urls.length
+    }
+    finally {
+      HttpServer.stop()
+    }
+    urlCount should equal(2)
+  }
+  "Jetty File server with 'dist' as root" should "start HTTP server, serve 
content and stop successfully" in {
+    var data = ""
+    val resources = new File(getClass.getResource("/dist").getPath).getParent
+    var urlCount:Int = 0
+    println("resource location"+resources)
+    try {
+      HttpServer.start("8000",resources+"/dist")
+      val urls = HttpServer.getFilesInDirectory("localhost","8000","")
+      urls.foreach(println)
+      urlCount = urls.length
+    }
+    finally {
+      HttpServer.stop()
+    }
+    urlCount should equal(2)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to