So the schema holds only the DEFINITION of the structure. For example, as
you say, the columns: the first column is User: Int and the second column is
Password: String.

It does not hold the location or kind of the source, i.e. a CSV file with or
without a header, or SQL DB tables.

I am pleased that, for once, I was wrong about this being another bug, and
that it was a design decision that adds flexibility.
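
A minimal sketch of that separation, assuming a hypothetical two-column CSV
under /tmp/users and a local master (the object name, path and master are
illustrative assumptions, and the path must exist for the batch read to
succeed). The schema is built once and reused, while format, options and path
are restated on each reader:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaReuseSketch {

  // The schema only describes the rows: column names and types.
  // It carries no format, no options and no path.
  val userSchema: StructType = StructType(Seq(
    StructField("User", IntegerType, nullable = true),
    StructField("Password", StringType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Schema Reuse Sketch")
      .getOrCreate()

    // Batch reader: format, options and path are all stated here.
    val staticUsers = spark.read
      .schema(userSchema)
      .format("csv")
      .option("header", "true")
      .load("/tmp/users/*.csv") // hypothetical path

    // Streaming reader: same schema, but the source details are restated,
    // because the schema did not bring them along.
    val streamingUsers = spark.readStream
      .schema(userSchema)
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", 1)
      .load("/tmp/users/*.csv") // hypothetical path

    // Both readers expose the column structure defined by userSchema.
    staticUsers.printSchema()
    streamingUsers.printSchema()
    println(streamingUsers.isStreaming)

    spark.stop()
  }
}
```

The same userSchema could equally be passed to a reader with a different
format, which is where keeping it separate from the source details pays off.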

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, <russell.spit...@gmail.com>
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of Spark refers to the structure of the output rows. For
> example, the schema for a particular DataFrame could be
> (User: Int, Password: String): two columns, the first is User of type Int
> and the second is Password of type String.
>
> When you pass the schema from one reader to another, you are only
> copying this structure, not all of the other options associated with the
> DataFrame.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I were reading from both MySQL and
> a comma-separated file, for example. While the Schema is the same, the
> options like ("inferSchema") do not apply to both MySQL and CSV, and
> format actually picks whether to use "JDBC" or "CSV", so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman <zahidr1...@gmail.com> wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code:
>>
>> STEP 1: I create a static DataFrame object, which holds all the
>> information about the data source (the CSV files).
>>
>> STEP 2: Then I create a variable called staticSchema and assign it the
>> schema information from the original static DataFrame.
>>
>> STEP 3: Then I create another variable, val streamingDataFrame, from
>> spark.readStream, and pass the staticSchema object to its .schema
>> function. staticSchema is meant to hold the information about the CSV
>> files, including the .load(path) function etc.
>>
>> So when I create val streamingDataFrame and pass it
>> .schema(staticSchema), the variable streamingDataFrame should have all
>> the information. I should only have to call
>> .option("maxFilesPerTrigger", 1), and not .format("csv")
>> .option("header","true").load("/data/retail-data/by-day/*.csv").
>> Otherwise, what is the point of passing .schema(staticSchema) to
>> streamingDataFrame?
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     // create spark session
>>     val spark = SparkSession.builder()
>>       .master("spark://192.168.0.38:7077")
>>       .appName("Retail Data")
>>       .getOrCreate()
>>     // set spark runtime  configuration
>>     spark.conf.set("spark.sql.shuffle.partitions","5")
>>     spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>>     // create a static frame
>>     val staticDataFrame = spark.read.format("csv")
>>       .option("header","true")
>>       .option("inferSchema","true")
>>       .load("/data/retail-data/by-day/*.csv")
>>
>>
>>     staticDataFrame.createOrReplaceTempView("retail_data")
>>     val staticSchema = staticDataFrame.schema
>>
>>     staticDataFrame
>>       .selectExpr(
>>         "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>       .groupBy(col("CustomerId"),
>>         window(col("InvoiceDate"),
>>         "1 day"))
>>       .sum("total_cost")
>>       .sort(desc("sum(total_cost)"))
>>       .show(2)
>>
>>     val streamingDataFrame = spark.readStream
>>       .schema(staticSchema)
>>       .format("csv")
>>       .option("maxFilesPerTrigger", 1)
>>       .option("header","true")
>>       .load("/data/retail-data/by-day/*.csv")
>>
>>       println(streamingDataFrame.isStreaming)
>>
>>     // lazy operation, so we will need to call a streaming action to
>>     // start it
>>     val purchaseByCustomerPerHour = streamingDataFrame
>>     .selectExpr(
>>       "CustomerId",
>>       "(UnitPrice * Quantity) as total_cost",
>>       "InvoiceDate")
>>     .groupBy(
>>       col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>>     .sum("total_cost")
>>
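>>     // stream action to write to console; block on the query so the
>>     // application does not exit before any data has been processed
>>     purchaseByCustomerPerHour.writeStream
>>       .format("console")
>>       .queryName("customer_purchases")
>>       .outputMode("complete")
>>       .start()
>>       .awaitTermination()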
>>
>>   } // main
>>
>> } // object
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>>
>
