Very kind of you.

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, <russell.spit...@gmail.com> wrote:
> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of Spark refers to the structure of the output rows. For
> example, the schema for a particular dataframe could be
> (User: Int, Password: String): two columns, the first is User of type Int
> and the second is Password of type String.
>
> When you pass the schema from one reader to another, you are only
> copying this structure, not all of the other options associated with the
> dataframe. This is usually useful when you are reading from sources with
> different options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> the schema. This is helpful if I was reading from both MySQL and
> a comma-separated file, for example. While the schema is the same, the
> options like "inferSchema" do not apply to both MySQL and CSV, and
> format actually picks whether to use "JDBC" or "CSV", so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman <zahidr1...@gmail.com> wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code:
>>
>> STEP 1: I create an object of type static frame which holds all the
>> information to the datasource (csv files).
>>
>> STEP 2: Then I create a variable called staticSchema, assigning the
>> information of the schema from the original static data frame.
>>
>> STEP 3: Then I create another variable called val streamingDataFrame of
>> type spark.readStream,
>> and into the .schema function parameters I pass the object staticSchema,
>> which is meant to hold the information to the csv files including the
>> .load(path) function etc.
>>
>> So then when I am creating val streamingDataFrame and passing it
>> .schema(staticSchema),
>> the variable streamingDataFrame should have all the information.
>> I should only have to call .option("maxFilesPerTrigger", 1) and not
>> .format("csv").option("header", "true").load("/data/retail-data/by-day/*.csv").
>> Otherwise what is the point of passing .schema(staticSchema) to
>> streamingDataFrame?
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{col, desc, window}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     // create spark session
>>     val spark = SparkSession.builder()
>>       .master("spark://192.168.0.38:7077")
>>       .appName("Retail Data")
>>       .getOrCreate()
>>
>>     // set spark runtime configuration
>>     spark.conf.set("spark.sql.shuffle.partitions", "5")
>>     spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
>>
>>     // create a static frame
>>     val staticDataFrame = spark.read.format("csv")
>>       .option("header", "true")
>>       .option("inferSchema", "true")
>>       .load("/data/retail-data/by-day/*.csv")
>>
>>     staticDataFrame.createOrReplaceTempView("retail_data")
>>     val staticSchema = staticDataFrame.schema
>>
>>     staticDataFrame
>>       .selectExpr(
>>         "CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
>>       .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>>       .sum("total_cost")
>>       .sort(desc("sum(total_cost)"))
>>       .show(2)
>>
>>     val streamingDataFrame = spark.readStream
>>       .schema(staticSchema)
>>       .format("csv")
>>       .option("maxFilesPerTrigger", 1)
>>       .option("header", "true")
>>       .load("/data/retail-data/by-day/*.csv")
>>
>>     println(streamingDataFrame.isStreaming)
>>
>>     // lazy operation, so we will need to call a streaming action to
>>     // start the action
>>     val purchaseByCustomerPerHour = streamingDataFrame
>>       .selectExpr(
>>         "CustomerId",
>>         "(UnitPrice * Quantity) as total_cost",
>>         "InvoiceDate")
>>       .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>>       .sum("total_cost")
>>
>>     // stream action to write to console
>>     purchaseByCustomerPerHour.writeStream
>>       .format("console")
>>       .queryName("customer_purchases")
>>       .outputMode("complete")
>>       .start()
>>       .awaitTermination() // block so the driver does not exit before any batch runs
>>
>>   } // main
>>
>> } // object
>>
>> val staticSchema = staticDataFrame.schema
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>
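
A minimal sketch of the point Russ makes above, for anyone finding this thread later. It assumes a local master and a small made-up (User, Password) data set written to temporary directories; the object name SchemaReuseSketch, the sample rows and the paths are all hypothetical and not part of the original program. It tries to show that staticDataFrame.schema carries only column names and types, which is why the same schema can be handed to a JSON reader, and why the streaming reader still needs its own format, options and load path.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object SchemaReuseSketch {

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, so the sketch runs without a cluster.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Schema Reuse Sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data, written once as CSV and once as JSON.
    val base = Files.createTempDirectory("schema-sketch").toString
    val csvDir = s"$base/csv"
    val jsonDir = s"$base/json"
    val sample = Seq((1, "secret"), (2, "hunter2")).toDF("User", "Password")
    sample.write.option("header", "true").csv(csvDir)
    sample.write.json(jsonDir)

    // Static read: format, options and path belong to this reader only.
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(csvDir)

    // The schema is just the row structure (field names, types, nullability).
    val staticSchema = staticDataFrame.schema
    println(staticSchema.treeString)

    // Same structure, different source: the JSON reader takes the schema
    // but needs its own format and path.
    val jsonFrame = spark.read
      .schema(staticSchema)
      .format("json")
      .load(jsonDir)
    jsonFrame.show()

    // The streaming reader likewise gets only the structure from the schema;
    // format, header option and path still have to be given explicitly.
    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", 1)
      .load(csvDir)
    println(streamingDataFrame.isStreaming)

    spark.stop()
  }
}
```

Passing the schema to readStream also skips schema inference on the stream, which is the main practical reason to copy it from a static read of the same files.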