[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rodrigo Boavida updated SPARK-36986:
------------------------------------
    Priority: Major  (was: Minor)

> Improving external schema management flexibility
> ------------------------------------------------
>
>                 Key: SPARK-36986
>                 URL: https://issues.apache.org/jira/browse/SPARK-36986
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Rodrigo Boavida
>            Priority: Major
>
> Our Spark usage requires us to build an external schema and pass it on while
> creating a Dataset.
> While working through this, I found a couple of optimizations that would
> greatly improve Spark's flexibility in handling external schema management.
> 1 - The ability to retrieve a field's index and definition in a single call,
> looked up by name and returned as a tuple.
> This means extending the StructType class with an additional method.
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
>  * Avoids two client calls/loops to obtain consolidated field info.
>  *
>  */
> def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
>   val field = nameToField.get(name)
>   if (field.isDefined) {
>     Some((fieldIndex(name), field.get))
>   } else {
>     None
>   }
> }
> This is particularly useful for efficiency when we're parsing a JSON
> structure and want to check, for every field, the name and field type
> already defined in the schema.
>  
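
For comparison, here is a caller-side sketch of the same lookup using only public StructType members (`fields` and each field's `name`). It assumes spark-sql is on the classpath. `FieldLookup`, `indexByName` and the sample schema are hypothetical names for illustration, not part of Spark or of the proposal.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical caller-side helper; not part of Spark.
object FieldLookup {
  // Build the name -> (index, field) map once per schema, so that each
  // lookup while walking a JSON document is a single map access.
  // Names are matched exactly (case-sensitive) in this sketch.
  def indexByName(schema: StructType): Map[String, (Int, StructField)] =
    schema.fields.zipWithIndex.map { case (field, i) =>
      field.name -> ((i, field))
    }.toMap

  def main(args: Array[String]): Unit = {
    // Sample schema, assumed for illustration only.
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType)))

    val lookup = indexByName(schema)
    println(lookup.get("name"))     // Some((index, field)) when present
    println(lookup.get("missing"))  // None when absent
  }
}
```

This works today, but every caller has to build and cache its own map. The proposed method would expose the lookup that StructType already maintains internally.
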
> 2 - Allowing a Dataset to be created from a schema together with the
> corresponding internal rows, whose internal types already match the schema
> defined externally. This makes it possible to create Spark fields from any
> data structure without depending on Spark's internal conversions (in
> particular for JSON parsing), and improves performance by skipping the
> CatalystTypeConverters step of converting native Java types into Spark types.
> This is what the function would look like:
>  
> /**
>  * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This
>  * method allows the caller to create the InternalRow set externally, as well
>  * as define the schema externally.
>  *
>  * @since 3.3.0
>  */
> def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
>   val attributes = schema.toAttributes
>   val plan = LogicalRDD(attributes, data)(self)
>   val qe = sessionState.executePlan(plan)
>   qe.assertAnalyzed()
>   new Dataset[Row](this, plan, RowEncoder(schema))
> }
>  
> This is similar to this function:
> def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
> But it doesn't depend on Spark internally creating the RDD by inferring it,
> for example, from a JSON structure, which is not useful if we're managing the
> schema externally.
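
For reference, a minimal sketch of the existing public path mentioned above: an externally built schema passed to `createDataFrame` together with external `Row` objects, which Spark then converts to its internal representation. It assumes spark-sql is on the classpath and a local master. The object name, app name and sample fields are hypothetical.

```scala
import java.util.Arrays

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ExternalSchemaExample {
  // Schema assembled outside Spark (sample fields, assumed for illustration).
  val schema: StructType = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("external-schema-sketch")
      .getOrCreate()
    try {
      // External Row objects holding plain JVM values (Long, String).
      val rows = Arrays.asList(Row(1L, "a"), Row(2L, "b"))
      // No schema inference happens here: the caller's schema is used as is.
      val df = spark.createDataFrame(rows, schema)
      df.printSchema()
      println(df.count())
    } finally {
      spark.stop()
    }
  }
}
```
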
> It also skips the Catalyst conversions and the corresponding object
> overhead, making internal row generation much more efficient because the
> caller does it explicitly.
>  
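
To illustrate what "internal types" means for the caller, here is a small sketch that builds an InternalRow directly. It assumes the catalyst classes that ship with spark-sql are on the classpath. Note that `InternalRow` and `UTF8String` are internal Spark classes without stability guarantees, and `InternalRowExample` and `toInternal` are hypothetical names.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

object InternalRowExample {
  // Values must already be in Catalyst's internal form: a StringType column
  // is held as UTF8String, while a LongType column is a plain Long.
  def toInternal(id: Long, name: String): InternalRow =
    InternalRow(id, UTF8String.fromString(name))

  def main(args: Array[String]): Unit = {
    val row = toInternal(1L, "a")
    // Typed accessors read the values back by ordinal.
    println(row.getLong(0))
    println(row.getUTF8String(1))
  }
}
```

With the proposed method, the caller would be responsible for producing rows in this form that agree with the schema, since no converter would check or translate the values.
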
> I will create a corresponding branch for PR review, assuming that there are 
> no concerns with the above proposals.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
