roadan closed pull request #2: Amaterasu 13
URL: https://github.com/apache/incubator-amaterasu/pull/2
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
index d34be92..7a7fd0d 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
@@ -17,15 +17,17 @@
 package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
 
 import java.io.File
+
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.executor.runtime.AmaContext
 import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{SparkSession, SaveMode,DataFrame}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
 /**
   * Amaterasu currently supports JSON and PARQUET as data sources.
-  * CSV data source support will be provided in the later versions.
+  * CSV data source support will be provided in later versions.
   */
 class SparkSqlRunner extends Logging {
   var env: Environment = _
@@ -34,33 +36,80 @@ class SparkSqlRunner extends Logging {
   var actionName: String = _
   var spark: SparkSession = _
 
-  def executeQuery(sparkSqlTempTable: String,
-                   dataSource: String,
-                   query: String) = {
-
-    notifier.info(s"================= started action $actionName 
=================")
-    val file: File = new File(dataSource)
-    notifier.info(s"================= auto-detecting file type of data source 
=================")
-    val loadData: DataFrame = file match {
-      case _ if file.isFile => FilenameUtils.getExtension(file.toString) match {
-        case "json" => spark.read.json(dataSource)
-        case "parquet" => spark.read.parquet(dataSource)
-      }
-      case _ if file.isDirectory => {
-        val extensions = findFileType(file)
-        extensions match {
-          case _ if extensions.contains("json") => spark.read.json(dataSource)
-          case _ if extensions.contains("parquet") => 
spark.read.parquet(dataSource)
-        }
-      }
-    }
+  /*
+  Method: executeQuery
+  Description: when the user specifies a query in Amaterasu format, this method parses and executes the query.
+               If the query is not in Amaterasu format, it is executed directly.
+  @Params: query string
+   */
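+  // Example (illustrative; the names below only mirror the tests in this PR): a query in Amaterasu
+  // format references a dataframe written by a previous action and may end with READAS <format>, e.g.
+  //   executeQuery("select * from amacontext_previousAction_previousDf where age=22 readas parquet")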
+  def executeQuery(query: String): Unit = {
 
-    loadData.createOrReplaceTempView(sparkSqlTempTable)
     notifier.info(s"================= executing the SQL query 
=================")
     if (!query.isEmpty) {
-      val sqlDf = spark.sql(query)
-      println(s"${env.workingDir}/$jobId/$actionName")
-      sqlDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/$jobId/$actionName")
+
+      if (query.toLowerCase.contains("amacontext")) {
+
+        //Parse the incoming query
+        notifier.info(s"================= parsing the SQL query 
=================")
+
+        val parser: List[String] = query.toLowerCase.split(" ").toList
+        var sqlPart1: String = ""
+        var sqlPart2: String = ""
+        var queryTempLen: Int = 0
+
+        //get only the sql part of the query
+        for (i <- 0 to parser.indexOf("from")) {
+          sqlPart1 += parser(i) + " "
+        }
+
+        if (parser.indexOf("readas") == -1) {
+          queryTempLen = parser.length - 1
+        }
+        else
+          queryTempLen = parser.length - 3
+
+        for (i <- parser.indexOf("from") + 1 to queryTempLen) {
+          if (!parser(i).contains("amacontext"))
+            sqlPart2 += " " + parser(i)
+        }
+
+        //If no read format is specified by the user, use PARQUET as the default file format to load data
+        var fileFormat: String = null
+        //if the "readas" keyword is absent, set PARQUET as the default read format
+        if (parser.indexOf("readas") == -1) {
+          fileFormat = "parquet"
+        }
+        else
+          fileFormat = parser(parser.indexOf("readas") + 1)
+
+
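+        // The amacontext token follows the pattern amacontext_<previousActionName>_<dataframeName>
+        // (see the tests below), so splitting it on "_" yields the action and dataframe to load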
+        val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
+        val directories = locationPath.split("_")
+        val actionName = directories(1)
+        val dfName = directories(2)
+        val parsedQuery = sqlPart1 + locationPath + sqlPart2
+
+        //Load the dataframe from previous action
+        val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
+        loadData.createOrReplaceTempView(locationPath)
+
+        notifier.info("Executing SparkSql on: "+parsedQuery)
+        val sqlDf = spark.sql(parsedQuery)
+        //@TODO: outputFileFormat should be read from YAML file instead of input fileformat
+        writeDf(sqlDf, fileFormat, env.workingDir, jobId, actionName)
+
+        notifier.info(s"================= finished action $actionName 
=================")
+      }
+      else {
+
+        notifier.info("Executing SparkSql on: "+query)
+
+        val fildDf = spark.sql(query)
+        //@TODO: outputFileFormat should be read from YAML file instead of output fileFormat being empty
+        writeDf(fildDf, "", env.workingDir, jobId, actionName)
+
+        notifier.info(s"================= finished action $actionName 
=================")
+      }
     }
 
     notifier.info(s"================= finished action $actionName 
=================")
@@ -79,6 +128,26 @@ class SparkSqlRunner extends Logging {
     extensions
   }
 
+  /*
+  Method to write dataframes to a specified format
+  @Params
+  df: Dataframe to be written
+  fileFormat: same as input file format
+  workingDir: temp directory
+  jobId, actionName: As specified by the user
+  */
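+  // Illustrative call (argument values assumed, matching the call site above):
+  //   writeDf(sqlDf, "json", env.workingDir, jobId, actionName)
+  // writes the dataframe to <workingDir>/<jobId>/<actionName>/<actionName>Df in the requested format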
+  def writeDf(df: DataFrame, outputFileFormat: String, workingDir: String, jobId: String, actionName: String): Unit = {
+    outputFileFormat.toLowerCase match {
+      case "parquet" => 
df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+      case "json" => 
df.write.mode(SaveMode.Overwrite).json(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+      case "csv" => 
df.write.mode(SaveMode.Overwrite).csv(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+      case "orc" => 
df.write.mode(SaveMode.Overwrite).orc(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+      case "text" => 
df.write.mode(SaveMode.Overwrite).text(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+      //case "jdbc" => 
df.write.mode(SaveMode.Overwrite).jdbc(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+      case _ => 
df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + 
actionName + "Df")
+    }
+  }
+
 }
 
 object SparkSqlRunner {
diff --git a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
new file mode 100644
index 0000000..5f0ce0e
--- /dev/null
+++ b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
@@ -0,0 +1,4 @@
+name,age
+sampath,22
+kirupa,30
+dev,19
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
index ce3e0d5..d297f1f 100644
--- a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
+++ b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
@@ -1,3 +1,3 @@
-{"name":"Michael","age":22}
-{"name":"Andy", "age":30}
-{"name":"Justin", "age":19}]
\ No newline at end of file
+{"name":"Sampath","age":22}
+{"name":"Kirupa", "age":30}
+{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
index cbb7b2a..0312dad 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
@@ -47,12 +47,12 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
     val env = new Environment()
     env.workingDir = "file:/tmp/"
     spark = SparkSession.builder()
-      .appName("sql_job")
+      .appName("sql-job")
       .master("local[*]")
       .config("spark.local.ip", "127.0.0.1")
       .getOrCreate()
 
-    AmaContext.init(spark, "sql_job", env)
+    AmaContext.init(spark, "sql-job", env)
 
     super.beforeAll()
   }
@@ -64,26 +64,104 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
 
   /*
-  Test whether the parquet data is successfully loaded and processed by SparkSQL
+  Test whether parquet is used as the default file format to load data from previous actions
    */
-  "SparkSql" should "load PARQUET data and persist the Data in working 
directory" in {
 
+  "SparkSql" should "load data as parquet if no input foramt is specified" in {
+
+    val defaultParquetEnv = new Environment()
+    defaultParquetEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlDefaultParquetJob", defaultParquetEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", "sparkSqlDefaultParquetJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22")
+    val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionDf")
+    println("Output Default Parquet: "+inputDf.count + "," + outputDf.first().getString(1))
+    outputDf.first().getString(1) shouldEqual("Michael")
+  }
 
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "spark-sql-parquet", "spark-sql-parquet-action", notifier, spark)
-    sparkSql.executeQuery("temptable", getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable")
+  /*
+  Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
+   */
 
+  "SparkSql" should "load PARQUET data directly from previous action's 
dataframe and persist the Data in working directory" in {
+
+    val tempParquetEnv = new Environment()
+    tempParquetEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlParquetJob", tempParquetEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", "sparkSqlParquetJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet")
+    val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionDf")
+    println("Output Parquet: "+inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
   }
 
 
   /*
-  Test whether the JSON data is successfully loaded by SparkSQL
-   */
+  Test whether the JSON data is successfully parsed and loaded by SparkSQL
+  */
+
+  "SparkSql" should "load JSON data directly from previous action's dataframe 
and persist the Data in working directory" in {
+
+    val tempJsonEnv = new Environment()
+    tempJsonEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlJsonJob", tempJsonEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", "sparkSqlJsonJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf  where age='30' READAS json")
+    val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionDf")
+    println("Output JSON: "+inputDf.count + "," + outputDf.count)
+    outputDf.first().getString(1) shouldEqual("Kirupa")
+
+  }
 
-  "SparkSql" should "load JSON data and persist the Data in working directory" 
in {
+  /*
+  Test whether the CSV data is successfully parsed and loaded by SparkSQL
+  */
 
-    val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json", "spark-sql-json-action", notifier, spark)
-    sparkSqlJson.executeQuery("temptable", getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select * from temptable")
+  "SparkSql" should "load CSV data directly from previous action's dataframe 
and persist the Data in working directory" in {
+
+    val tempCsvEnv = new Environment()
+    tempCsvEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlCsvJob", tempCsvEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", "sparkSqlCsvJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv")
+    val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionDf")
+    println("Output CSV: "+inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
+  }
+  /*
+  Test whether the data can be directly read from a file and executed by sparkSql
+  */
+
+  "SparkSql" should "load data directly from a file and persist the Data in 
working directory" in {
+
+    val tempFileEnv = new Environment()
+    tempFileEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
+
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", "sparkSqlFileJobAction", notifier, spark)
+    sparkSql.executeQuery("SELECT * FROM parquet.`"+getClass.getResource("/SparkSql/parquet").getPath+"`")
+    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
+    println("Output Parquet dataframe: "+ outputParquetDf.show)
+    outputParquetDf.first().getString(1) shouldEqual("Michael")
+    sparkSql.executeQuery("SELECT * FROM json.`"+getClass.getResource("/SparkSql/json").getPath+"`")
+    //@TODO: change the below read.parquet to read.outputFileFormat specified in the yaml file
+    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
+    println("Output Json dataframe: "+ outputJsonDf.show)
+    outputJsonDf.first().getString(1) shouldEqual("Sampath")
 
   }
 
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
