roadan closed pull request #2: Amaterasu 13
URL: https://github.com/apache/incubator-amaterasu/pull/2
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
index d34be92..7a7fd0d 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
@@ -17,15 +17,17 @@
package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
import java.io.File
+
import org.apache.amaterasu.common.execution.actions.Notifier
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.executor.runtime.AmaContext
import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{SparkSession, SaveMode,DataFrame}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* Amaterasu currently supports JSON and PARQUET as data sources.
- * CSV data source support will be provided in the later versions.
+ * CSV data source support will be provided in later versions.
*/
class SparkSqlRunner extends Logging {
var env: Environment = _
@@ -34,33 +36,80 @@ class SparkSqlRunner extends Logging {
var actionName: String = _
var spark: SparkSession = _
- def executeQuery(sparkSqlTempTable: String,
- dataSource: String,
- query: String) = {
-
- notifier.info(s"================= started action $actionName =================")
- val file: File = new File(dataSource)
- notifier.info(s"================= auto-detecting file type of data source =================")
- val loadData: DataFrame = file match {
- case _ if file.isFile => FilenameUtils.getExtension(file.toString) match {
- case "json" => spark.read.json(dataSource)
- case "parquet" => spark.read.parquet(dataSource)
- }
- case _ if file.isDirectory => {
- val extensions = findFileType(file)
- extensions match {
- case _ if extensions.contains("json") => spark.read.json(dataSource)
- case _ if extensions.contains("parquet") => spark.read.parquet(dataSource)
- }
- }
- }
+ /*
+ Method: executeQuery
+ Description: when the user specifies a query in Amaterasu format, this method parses and executes the query.
+ If the query is not in Amaterasu format, it is executed directly
+ @Params: query string
+ */
+ def executeQuery(query: String): Unit = {
- loadData.createOrReplaceTempView(sparkSqlTempTable)
notifier.info(s"================= executing the SQL query =================")
if (!query.isEmpty) {
- val sqlDf = spark.sql(query)
- println(s"${env.workingDir}/$jobId/$actionName")
- sqlDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/$jobId/$actionName")
+
+ if (query.toLowerCase.contains("amacontext")) {
+
+ //Parse the incoming query
+ notifier.info(s"================= parsing the SQL query
=================")
+
+ val parser: List[String] = query.toLowerCase.split(" ").toList
+ var sqlPart1: String = ""
+ var sqlPart2: String = ""
+ var queryTempLen: Int = 0
+
+ //get only the sql part of the query
+ for (i <- 0 to parser.indexOf("from")) {
+ sqlPart1 += parser(i) + " "
+ }
+
+ if (parser.indexOf("readas") == -1) {
+ queryTempLen = parser.length - 1
+ }
+ else
+ queryTempLen = parser.length - 3
+
+ for (i <- parser.indexOf("from") + 1 to queryTempLen) {
+ if (!parser(i).contains("amacontext"))
+ sqlPart2 += " " + parser(i)
+ }
+
+ //If no read format is specified by the user, use PARQUET as the default file format to load data
+ var fileFormat: String = null
+ //if there is no index for the "readas" keyword, then set PARQUET as the default read format
+ if (parser.indexOf("readas") == -1) {
+ fileFormat = "parquet"
+ }
+ else
+ fileFormat = parser(parser.indexOf("readas") + 1)
+
+
+ val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
+ val directories = locationPath.split("_")
+ val actionName = directories(1)
+ val dfName = directories(2)
+ val parsedQuery = sqlPart1 + locationPath + sqlPart2
+
+ //Load the dataframe from previous action
+ val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
+ loadData.createOrReplaceTempView(locationPath)
+
+ notifier.info("Executing SparkSql on: "+parsedQuery)
+ val sqlDf = spark.sql(parsedQuery)
+ //@TODO: outputFileFormat should be read from YAML file instead of input fileformat
+ writeDf(sqlDf, fileFormat, env.workingDir, jobId, actionName)
+
+ notifier.info(s"================= finished action $actionName
=================")
+ }
+ else {
+
+ notifier.info("Executing SparkSql on: "+query)
+
+ val fildDf = spark.sql(query)
+ //@TODO: outputFileFormat should be read from YAML file instead of output fileFormat being empty
+ writeDf(fildDf, "", env.workingDir, jobId, actionName)
+
+ notifier.info(s"================= finished action $actionName
=================")
+ }
}
notifier.info(s"================= finished action $actionName
=================")
@@ -79,6 +128,26 @@ class SparkSqlRunner extends Logging {
extensions
}
+ /*
+ Method to write dataframes to a specified format
+ @Params
+ df: Dataframe to be written
+ outputFileFormat: currently the same as the input file format
+ workingDir: temp directory
+ jobId, actionName: As specified by the user
+ */
+ def writeDf(df: DataFrame, outputFileFormat: String, workingDir: String, jobId: String, actionName: String): Unit = {
+ outputFileFormat.toLowerCase match {
+ case "parquet" => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ case "json" => df.write.mode(SaveMode.Overwrite).json(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ case "csv" => df.write.mode(SaveMode.Overwrite).csv(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ case "orc" => df.write.mode(SaveMode.Overwrite).orc(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ case "text" => df.write.mode(SaveMode.Overwrite).text(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ //case "jdbc" => df.write.mode(SaveMode.Overwrite).jdbc(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ case _ => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+ }
+ }
+
}
object SparkSqlRunner {
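
As a reading aid for the SparkSqlRunner changes above (not part of the merged diff): a short worked example, in Scala comments, of how the new executeQuery decomposes an amacontext query; the action and dataframe names are illustrative only.

// Input query, after the runner lowercases it:
//   "select name from amacontext_action1_usersdf where age=22 readas parquet"
// sqlPart1     -> "select name from "          (tokens up to and including "from")
// locationPath -> "amacontext_action1_usersdf" (the token containing "amacontext")
// actionName   -> "action1"                    (locationPath.split("_")(1))
// dfName       -> "usersdf"                    (locationPath.split("_")(2))
// sqlPart2     -> " where age=22"              (remaining tokens, minus "readas parquet")
// fileFormat   -> "parquet"                    (token after "readas"; parquet when "readas" is absent)
// parsedQuery  -> sqlPart1 + locationPath + sqlPart2
// The dataframe is then loaded via AmaContext.getDataFrame(actionName, dfName, fileFormat),
// registered as a temp view named locationPath, and parsedQuery is executed against it.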
diff --git a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
new file mode 100644
index 0000000..5f0ce0e
--- /dev/null
+++ b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
@@ -0,0 +1,4 @@
+name,age
+sampath,22
+kirupa,30
+dev,19
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
index ce3e0d5..d297f1f 100644
--- a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
+++ b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
@@ -1,3 +1,3 @@
-{"name":"Michael","age":22}
-{"name":"Andy", "age":30}
-{"name":"Justin", "age":19}]
\ No newline at end of file
+{"name":"Sampath","age":22}
+{"name":"Kirupa", "age":30}
+{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
index cbb7b2a..0312dad 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
@@ -47,12 +47,12 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
val env = new Environment()
env.workingDir = "file:/tmp/"
spark = SparkSession.builder()
- .appName("sql_job")
+ .appName("sql-job")
.master("local[*]")
.config("spark.local.ip", "127.0.0.1")
.getOrCreate()
- AmaContext.init(spark, "sql_job", env)
+ AmaContext.init(spark, "sql-job", env)
super.beforeAll()
}
@@ -64,26 +64,104 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
/*
- Test whether the parquet data is successfully loaded and processed by SparkSQL
+ Test whether parquet is used as the default file format to load data from previous actions
*/
- "SparkSql" should "load PARQUET data and persist the Data in working directory" in {
+ "SparkSql" should "load data as parquet if no input format is specified" in {
+
+ val defaultParquetEnv = new Environment()
+ defaultParquetEnv.workingDir = "file:/tmp/"
+ AmaContext.init(spark, "sparkSqlDefaultParquetJob", defaultParquetEnv)
+ //Prepare test dataset
+ val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+ inputDf.write.mode(SaveMode.Overwrite).parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionTempDf")
+ val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", "sparkSqlDefaultParquetJobAction", notifier, spark)
+ sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22")
+ val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionDf")
+ println("Output Default Parquet: "+inputDf.count + "," + outputDf.first().getString(1))
+ outputDf.first().getString(1) shouldEqual("Michael")
+ }
- val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "spark-sql-parquet", "spark-sql-parquet-action", notifier, spark)
- sparkSql.executeQuery("temptable", getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable")
+ /*
+ Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
+ */
+ "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
+
+ val tempParquetEnv = new Environment()
+ tempParquetEnv.workingDir = "file:/tmp/"
+ AmaContext.init(spark, "sparkSqlParquetJob", tempParquetEnv)
+ //Prepare test dataset
+ val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+ inputDf.write.mode(SaveMode.Overwrite).parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionTempDf")
+ val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", "sparkSqlParquetJobAction", notifier, spark)
+ sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet")
+ val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionDf")
+ println("Output Parquet: "+inputDf.count + "," + outputDf.count)
+ inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
}
/*
- Test whether the JSON data is successfully loaded by SparkSQL
- */
+ Test whether the JSON data is successfully parsed and loaded by SparkSQL
+ */
+
+ "SparkSql" should "load JSON data directly from previous action's dataframe
and persist the Data in working directory" in {
+
+ val tempJsonEnv = new Environment()
+ tempJsonEnv.workingDir = "file:/tmp/"
+ AmaContext.init(spark, "sparkSqlJsonJob", tempJsonEnv)
+ //Prepare test dataset
+ val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
+ inputDf.write.mode(SaveMode.Overwrite).json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionTempDf")
+ val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", "sparkSqlJsonJobAction", notifier, spark)
+ sparkSql.executeQuery("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf where age='30' READAS json")
+ val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionDf")
+ println("Output JSON: "+inputDf.count + "," + outputDf.count)
+ outputDf.first().getString(1) shouldEqual("Kirupa")
+
+ }
- "SparkSql" should "load JSON data and persist the Data in working directory"
in {
+ /*
+ Test whether the CSV data is successfully parsed, loaded by SparkSQL
+ */
- val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json",
"spark-sql-json-action", notifier, spark)
- sparkSqlJson.executeQuery("temptable",
getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select *
from temptable")
+ "SparkSql" should "load CSV data directly from previous action's dataframe
and persist the Data in working directory" in {
+
+ val tempCsvEnv = new Environment()
+ tempCsvEnv.workingDir = "file:/tmp/"
+ AmaContext.init(spark, "sparkSqlCsvJob", tempCsvEnv)
+ //Prepare test dataset
+ val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
+ inputDf.write.mode(SaveMode.Overwrite).csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionTempDf")
+ val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", "sparkSqlCsvJobAction", notifier, spark)
+ sparkSql.executeQuery("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv")
+ val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionDf")
+ println("Output CSV: "+inputDf.count + "," + outputDf.count)
+ inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
+ }
+ /*
+ Test whether the data can be directly read from a file and executed by SparkSql
+ */
+
+ "SparkSql" should "load data directly from a file and persist the Data in
working directory" in {
+
+ val tempFileEnv = new Environment()
+ tempFileEnv.workingDir = "file:/tmp/"
+ AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
+
+ val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", "sparkSqlFileJobAction", notifier, spark)
+ sparkSql.executeQuery("SELECT * FROM parquet.`"+getClass.getResource("/SparkSql/parquet").getPath+"`")
+ val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
+ println("Output Parquet dataframe: "+ outputParquetDf.show)
+ outputParquetDf.first().getString(1) shouldEqual("Michael")
+ sparkSql.executeQuery("SELECT * FROM json.`"+getClass.getResource("/SparkSql/json").getPath+"`")
+ //@TODO: change the below read.parquet to read.outputFileFormat specified in the yaml file
+ val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
+ println("Output Json dataframe: "+ outputJsonDf.show)
+ outputJsonDf.first().getString(1) shouldEqual("Sampath")
}
+
}
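
For completeness, a minimal usage sketch of the new single-argument executeQuery, modelled on the tests above (not part of the merged diff). The job, action, and dataframe names are illustrative, and a SparkSession (spark), a Notifier (notifier), and the imports used in SparkSqlRunnerTests are assumed to be in scope.

// Minimal sketch modelled on SparkSqlRunnerTests; names below are illustrative only.
val env = new Environment()
env.workingDir = "file:/tmp/"
AmaContext.init(spark, "exampleJob", env)
val runner: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "exampleJob", "exampleAction", notifier, spark)
// A dataframe persisted by a previous action is referenced as amacontext_<actionName>_<dfName>;
// the optional trailing "READAS <format>" selects the input format (parquet is the default).
runner.executeQuery("select * FROM amacontext_previousAction_previousActionDf where age='30' READAS json")
// A query with no amacontext reference is executed as-is:
runner.executeQuery("SELECT * FROM parquet.`/path/to/parquet`")
// In both cases the result is written under ${env.workingDir}/<jobId>/<actionName>/ by writeDf.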
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services