roadan closed pull request #2: Amaterasu 13
URL: https://github.com/apache/incubator-amaterasu/pull/2
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
index d34be92..7a7fd0d 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
@@ -17,15 +17,17 @@ package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
 
 import java.io.File
+
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.executor.runtime.AmaContext
 import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{SparkSession, SaveMode,DataFrame}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
 /**
   * Amaterasu currently supports JSON and PARQUET as data sources.
-  * CSV data source support will be provided in the later versions.
+  * CSV data source support will be provided in later versions.
   */
 class SparkSqlRunner extends Logging {
   var env: Environment = _
@@ -34,33 +36,80 @@ class SparkSqlRunner extends Logging {
   var actionName: String = _
   var spark: SparkSession = _
 
-  def executeQuery(sparkSqlTempTable: String,
-                   dataSource: String,
-                   query: String) = {
-
-    notifier.info(s"================= started action $actionName =================")
-    val file: File = new File(dataSource)
-    notifier.info(s"================= auto-detecting file type of data source =================")
-    val loadData: DataFrame = file match {
-      case _ if file.isFile => FilenameUtils.getExtension(file.toString) match {
-        case "json" => spark.read.json(dataSource)
-        case "parquet" => spark.read.parquet(dataSource)
-      }
-      case _ if file.isDirectory => {
-        val extensions = findFileType(file)
-        extensions match {
-          case _ if extensions.contains("json") => spark.read.json(dataSource)
-          case _ if extensions.contains("parquet") => spark.read.parquet(dataSource)
-        }
-      }
-    }
+  /*
+  Method: executeQuery
+  Description: when user specifies query in amaterasu format, this method parse and executes the query.
+               If not in Amaterasu format, then directly executes the query
+  @Params: query string
+   */
+  def executeQuery(query: String): Unit = {
 
-    loadData.createOrReplaceTempView(sparkSqlTempTable)
     notifier.info(s"================= executing the SQL query =================")
     if (!query.isEmpty) {
-      val sqlDf = spark.sql(query)
-      println(s"${env.workingDir}/$jobId/$actionName")
-      sqlDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/$jobId/$actionName")
+
+      if (query.toLowerCase.contains("amacontext")) {
+
+        //Parse the incoming query
+        notifier.info(s"================= parsing the SQL query =================")
+
+        val parser: List[String] = query.toLowerCase.split(" ").toList
+        var sqlPart1: String = ""
+        var sqlPart2: String = ""
+        var queryTempLen: Int = 0
+
+        //get only the sql part of the query
+        for (i <- 0 to parser.indexOf("from")) {
+          sqlPart1 += parser(i) + " "
+        }
+
+        if (parser.indexOf("readas") == -1) {
+          queryTempLen = parser.length - 1
+        }
+        else
+          queryTempLen = parser.length - 3
+
+        for (i <- parser.indexOf("from") + 1 to queryTempLen) {
+          if (!parser(i).contains("amacontext"))
+            sqlPart2 += " " + parser(i)
+        }
+
+        //If no read format is speicified by the user, use PARQUET as default file format to load data
+        var fileFormat: String = null
+        //if there is no index for "readas" keyword, then set PARQUET as default read format
+        if (parser.indexOf("readas") == -1) {
+          fileFormat = "parquet"
+        }
+        else
+          fileFormat = parser(parser.indexOf("readas") + 1)
+
+
+        val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
+        val directories = locationPath.split("_")
+        val actionName = directories(1)
+        val dfName = directories(2)
+        val parsedQuery = sqlPart1 + locationPath + sqlPart2
+
+        //Load the dataframe from previous action
+        val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
+        loadData.createOrReplaceTempView(locationPath)
+
+        notifier.info("Executing SparkSql on: "+parsedQuery)
+        val sqlDf = spark.sql(parsedQuery)
+        //@TODO: outputFileFormat should be read from YAML file instead of input fileformat
+        writeDf(sqlDf, fileFormat, env.workingDir, jobId, actionName)
+
+        notifier.info(s"================= finished action $actionName =================")
+      }
+      else {
+
+        notifier.info("Executing SparkSql on: "+query)
+
+        val fildDf = spark.sql(query)
+        //@TODO: outputFileFormat should be read from YAML file instead of output fileFormat being empty
+        writeDf(fildDf, "", env.workingDir, jobId, actionName)
+
+        notifier.info(s"================= finished action $actionName =================")
+      }
     }
 
     notifier.info(s"================= finished action $actionName =================")
@@ -79,6 +128,26 @@ class SparkSqlRunner extends Logging {
     extensions
   }
 
+  /*
+  Method to write dataframes to a specified format
+  @Params
+  df: Dataframe to be written
+  fileFormat: same as input file format
+  workingDir: temp directory
+  jobId, actionName: As specified by the user
+   */
+  def writeDf(df: DataFrame, outputFileFormat: String, workingDir: String, jobId: String, actionName: String): Unit = {
+    outputFileFormat.toLowerCase match {
+      case "parquet" => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+      case "json" => df.write.mode(SaveMode.Overwrite).json(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+      case "csv" => df.write.mode(SaveMode.Overwrite).csv(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+      case "orc" => df.write.mode(SaveMode.Overwrite).orc(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+      case "text" => df.write.mode(SaveMode.Overwrite).text(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+      //case "jdbc" => df.write.mode(SaveMode.Overwrite).jdbc(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+      case _ => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
+    }
+  }
+
 }
 
 object SparkSqlRunner {
diff --git a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
new file mode 100644
index 0000000..5f0ce0e
--- /dev/null
+++ b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
@@ -0,0 +1,4 @@
+name,age
+sampath,22
+kirupa,30
+dev,19
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
index ce3e0d5..d297f1f 100644
--- a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
+++ b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
@@ -1,3 +1,3 @@
-{"name":"Michael","age":22}
-{"name":"Andy", "age":30}
-{"name":"Justin", "age":19}]
\ No newline at end of file
+{"name":"Sampath","age":22}
+{"name":"Kirupa", "age":30}
+{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
index cbb7b2a..0312dad 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
@@ -47,12 +47,12 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
     val env = new Environment()
     env.workingDir = "file:/tmp/"
     spark = SparkSession.builder()
-      .appName("sql_job")
+      .appName("sql-job")
       .master("local[*]")
       .config("spark.local.ip", "127.0.0.1")
       .getOrCreate()
 
-    AmaContext.init(spark, "sql_job", env)
+    AmaContext.init(spark, "sql-job", env)
 
     super.beforeAll()
   }
@@ -64,26 +64,104 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
   /*
-  Test whether the parquet data is successfully loaded and processed by SparkSQL
+  Test whether parquet is used as default file format to load data from previous actions
   */
-  "SparkSql" should "load PARQUET data and persist the Data in working directory" in {
+  "SparkSql" should "load data as parquet if no input foramt is specified" in {
+
+    val defaultParquetEnv = new Environment()
+    defaultParquetEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlDefaultParquetJob", defaultParquetEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", "sparkSqlDefaultParquetJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22")
+    val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionDf")
+    println("Output Default Parquet: "+inputDf.count + "," + outputDf.first().getString(1))
+    outputDf.first().getString(1) shouldEqual("Michael")
+  }
 
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "spark-sql-parquet", "spark-sql-parquet-action", notifier, spark)
-    sparkSql.executeQuery("temptable", getClass.getResource("/SparkSql/parquet").getPath, "select * from temptable")
+  /*
+  Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
+  */
+  "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
+
+    val tempParquetEnv = new Environment()
+    tempParquetEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlParquetJob", tempParquetEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", "sparkSqlParquetJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet")
+    val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionDf")
+    println("Output Parquet: "+inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
 
   }
 
   /*
-  Test whether the JSON data is successfully loaded by SparkSQL
-  */
+  Test whether the JSON data is successfully parsed, loaded by SparkSQL
+  */
+
+  "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
+
+    val tempJsonEnv = new Environment()
+    tempJsonEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlJsonJob", tempJsonEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", "sparkSqlJsonJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf where age='30' READAS json")
+    val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionDf")
+    println("Output JSON: "+inputDf.count + "," + outputDf.count)
+    outputDf.first().getString(1) shouldEqual("Kirupa")
+
+  }
 
-  "SparkSql" should "load JSON data and persist the Data in working directory" in {
+  /*
+  Test whether the CSV data is successfully parsed, loaded by SparkSQL
+  */
 
-    val sparkSqlJson = SparkSqlRunner(AmaContext.env, "spark-sql-json", "spark-sql-json-action", notifier, spark)
-    sparkSqlJson.executeQuery("temptable", getClass.getResource("/SparkSql/json/SparkSqlTestData.json").getPath, "select * from temptable")
+  "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
+
+    val tempCsvEnv = new Environment()
+    tempCsvEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlCsvJob", tempCsvEnv)
+    //Prepare test dataset
+    val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionTempDf")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", "sparkSqlCsvJobAction", notifier, spark)
+    sparkSql.executeQuery("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv")
+    val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionDf")
+    println("Output CSV: "+inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
 
   }
 
+  /*
+  Test whether the data can be directly read from a file and executed by sparkSql
+  */
+
+  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
+
+    val tempFileEnv = new Environment()
+    tempFileEnv.workingDir = "file:/tmp/"
+    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
+
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", "sparkSqlFileJobAction", notifier, spark)
+    sparkSql.executeQuery("SELECT * FROM parquet.`"+getClass.getResource("/SparkSql/parquet").getPath+"`")
+    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
+    println("Output Parquet dataframe: "+ outputParquetDf.show)
+    outputParquetDf.first().getString(1) shouldEqual("Michael")
+    sparkSql.executeQuery("SELECT * FROM json.`"+getClass.getResource("/SparkSql/json").getPath+"`")
+    //@TODO: change the below read.parquet to read.outputFileFormat specified in the yaml file
+    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
+    println("Output Json dataframe: "+ outputJsonDf.show)
+    outputJsonDf.first().getString(1) shouldEqual("Sampath")
 
   }
+
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
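For context, a minimal usage sketch of the query convention exercised by the tests above (illustrative only, not part of the diff: the job, action, and dataframe names are made up, and `spark`/`notifier` are assumed to be supplied by the surrounding Amaterasu job, exactly as in the tests):

    // Assumes spark: SparkSession and notifier: Notifier are provided by the job runtime.
    val env = new Environment()
    env.workingDir = "file:/tmp/"
    AmaContext.init(spark, "exampleJob", env)

    val runner = SparkSqlRunner(AmaContext.env, "exampleJob", "exampleAction", notifier, spark)

    // amacontext_<previousAction>_<dfName> refers to a dataframe persisted by an earlier action;
    // READAS selects the input format and defaults to parquet when omitted.
    runner.executeQuery("select * FROM amacontext_prevAction_prevActionDf where age=22 READAS json")

    // Queries without an amacontext token are executed as-is, e.g. directly against a file:
    runner.executeQuery("SELECT * FROM parquet.`/path/to/data`")

In both cases the result is written back under the job's working directory by writeDf.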