[
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rodrigo Boavida updated SPARK-36986:
------------------------------------
Priority: Major (was: Minor)
> Improving external schema management flexibility
> ------------------------------------------------
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Rodrigo Boavida
> Priority: Major
>
> Our Spark usage requires us to build a schema externally and pass it in when
> creating a Dataset.
> While working through this, I found a couple of improvements that would
> greatly increase Spark's flexibility in handling externally managed schemas.
> 1 - Ability to retrieve a field's index and definition by name in a single
> call, returning them as a tuple.
> This means extending the StructType class with an additional method.
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
>  * Avoids two client calls/loops to obtain consolidated field info.
>  */
> def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
>   val field = nameToField.get(name)
>   if (field.isDefined) Some((fieldIndex(name), field.get))
>   else None
> }
> This is particularly useful from an efficiency perspective when parsing a
> JSON structure, where for every field we want to look up its position and
> type as already defined in the schema. A rough usage sketch follows.
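> For illustration, a minimal usage sketch assuming the method above is added
> to StructType (the schema and field names are just example placeholders):
>
> import org.apache.spark.sql.types._
>
> val schema = StructType(Seq(
>   StructField("id", LongType),
>   StructField("name", StringType)))
>
> // One lookup returns both the ordinal and the field definition,
> // instead of a fieldIndex call followed by a separate apply/get.
> schema.getIndexAndFieldByName("name") match {
>   case Some((index, field)) =>
>     // e.g. write the parsed JSON value at `index`, using `field.dataType`
>     // to pick the conversion
>     println(s"'name' is at index $index with type ${field.dataType}")
>   case None =>
>     println("'name' is not part of the schema")
> }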
>
> 2 - Allowing a Dataset to be created from an externally defined schema
> together with the corresponding internal rows, whose internal types already
> match that schema. This makes it possible to build Spark rows from any data
> structure, without depending on Spark's internal conversions (in particular
> for JSON parsing), and improves performance by skipping the
> CatalystTypeConverters step of converting native Java types into Spark's
> internal types.
> This is what the function would look like:
>
> /**
>  * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
>  * the caller to create the InternalRow set externally, as well as define the schema externally.
>  *
>  * @since 3.3.0
>  */
> def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
>   val attributes = schema.toAttributes
>   val plan = LogicalRDD(attributes, data)(self)
>   val qe = sessionState.executePlan(plan)
>   qe.assertAnalyzed()
>   new Dataset[Row](this, plan, RowEncoder(schema))
> }
>
> This is similar to the existing function:
> def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
> but it doesn't depend on Spark creating the RDD internally by inferring it,
> for example, from a JSON structure, which is not useful when we're managing
> the schema externally.
> It also skips the Catalyst conversions and the corresponding object overhead,
> making the generation of internal rows much more efficient since it is done
> explicitly by the caller. A caller-side sketch follows.
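> For illustration, a rough caller-side sketch assuming the proposed overload
> is added to SparkSession (the schema and row values are just example
> placeholders):
>
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.types._
> import org.apache.spark.unsafe.types.UTF8String
>
> val schema = StructType(Seq(
>   StructField("id", LongType),
>   StructField("name", StringType)))
>
> // Rows are built directly with Spark's internal representations
> // (Long for LongType, UTF8String for StringType), so no
> // CatalystTypeConverters pass is needed.
> val rows = spark.sparkContext.parallelize(Seq(
>   InternalRow(1L, UTF8String.fromString("alice")),
>   InternalRow(2L, UTF8String.fromString("bob"))))
>
> val df = spark.createDataset(rows, schema)
> df.show()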
>
> I will create a corresponding branch for PR review, assuming that there are
> no concerns with the above proposals.
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]