[ https://issues.apache.org/jira/browse/SPARK-57489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-57489.
----------------------------------
Fix Version/s: 4.3.0
Resolution: Fixed
Issue resolved by pull request 56546
[https://github.com/apache/spark/pull/56546]
> Deterministic dropDuplicates key resolution via shared analyzer rule
> --------------------------------------------------------------------
>
> Key: SPARK-57489
> URL: https://issues.apache.org/jira/browse/SPARK-57489
> Project: Spark
> Issue Type: Task
> Components: Structured Streaming
> Affects Versions: 4.3.0
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.3.0
>
>
> The `dropDuplicates` operator deduplicates input rows by comparing the values
> of a user-provided set of columns. The user specifies these columns by name,
> and Spark needs to map those names to attributes. Unlike other built-in
> operators (e.g. Project, Filter), which defer column-to-attribute resolution
> to the analyzer, dropDuplicates resolves the columns in the DataFrame
> implementation itself.
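The following is a minimal sketch of the name-to-attribute mapping described above, built on a simplified model. `Attr` and `resolveKeys` are hypothetical stand-ins for Catalyst's `Attribute` and the resolution loop, not Spark APIs. The resolver is a plain case-insensitive comparison, which matches Spark's default `spark.sql.caseSensitive=false`.

```scala
// Hypothetical stand-in for Catalyst's Attribute: a column name plus a unique id.
final case class Attr(name: String, exprId: Long)

// Spark's default resolver compares names case-insensitively
// (spark.sql.caseSensitive=false); modeled here as a plain function.
val caseInsensitive: (String, String) => Boolean = _.equalsIgnoreCase(_)

// Map user-supplied names to attributes of the child plan's output.
// `filter` (not `find`) keeps every attribute that matches a name, because the
// output may contain several columns with the same name.
def resolveKeys(
    colNames: Seq[String],
    output: Seq[Attr],
    resolver: (String, String) => Boolean): Seq[Attr] =
  colNames.flatMap { colName =>
    val matched = output.filter(a => resolver(a.name, colName))
    if (matched.isEmpty) {
      val fieldNames = output.map(_.name).mkString(", ")
      throw new IllegalArgumentException(
        s"Cannot resolve column `$colName` among ($fieldNames)")
    }
    matched
  }

val output = Seq(Attr("id", 1L), Attr("value", 2L), Attr("ts", 3L))
println(resolveKeys(Seq("VALUE", "id"), output, caseInsensitive))
// prints List(Attr(value,2), Attr(id,1))
```

The resulting key order follows the order of the names the caller passes in. This is why any reordering of those names before resolution, as in the Classic excerpt, matters for stateful operators.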
> This introduces two major issues:
> 1. An optimization ends up becoming a behavioral requirement
> In Spark Classic, the DataFrame implementation not only performs the
> resolution but also optimizes it by deduplicating column names when the user
> specifies the same column name multiple times (or when the DataFrame schema
> happens to contain the same column name multiple times).
> {code:scala}
> private def groupColsFromDropDuplicates(colNames: Seq[String]): Seq[Attribute] = {
>   val resolver = sparkSession.sessionState.analyzer.resolver
>   val allColumns = queryExecution.analyzed.output
>   // SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
>   // (the Streaming's state store depends on the `groupCols` order).
>   colNames.toSet.toSeq.flatMap { (colName: String) =>
>     // It is possibly there are more than one columns with the same name,
>     // so we call filter instead of find.
>     val cols = allColumns.filter(col => resolver(col.name, colName))
>     if (cols.isEmpty) {
>       throw QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
>         colName, schema.fieldNames.mkString(", "))
>     }
>     cols
>   }
> }
> {code}
> The `colNames.toSet.toSeq` call uses a Set to deduplicate the column names,
> and a Set, as one would expect, does not guarantee any ordering. This is fine
> for batch queries, because the output does not depend on the order of the
> grouping columns here. It breaks the stateful version of dropDuplicates,
> however, because the state store binds the key columns by position rather
> than by name.
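To make the ordering problem concrete, the following Scala sketch contrasts `toSet.toSeq` with `distinct`. It also includes a positional key encoding that shows what happens when a restarted query derives a different key order. `encode` and `decode` are hypothetical helpers for illustration, not Spark's state store API.

```scala
// Grouping-key names in the order a user might pass them to dropDuplicates.
val keys = Seq("k1", "k2", "k3", "k4", "k5", "k6")

// Pre-fix Classic approach: dedupe through a Set. With more than four elements,
// Scala's immutable Set is hash-based, so iteration order follows the hashes
// rather than the order the user wrote.
val viaSet = keys.toSet.toSeq

// `distinct` also removes repeats but keeps first-occurrence order.
val viaDistinct = keys.distinct

println(viaSet)      // same elements as `keys`; the order may differ
println(viaDistinct) // List(k1, k2, k3, k4, k5, k6)

// Hypothetical model of a positional state-store key: only values are stored,
// so the reader must use exactly the same column order as the writer.
def encode(order: Seq[String], row: Map[String, String]): Vector[String] =
  order.map(row).toVector
def decode(order: Seq[String], stored: Vector[String]): Map[String, String] =
  order.zip(stored).toMap

val row = Map("user" -> "alice", "event" -> "click")
val written = encode(Seq("user", "event"), row)
// A restarted query that derives a different key order reads the same values
// back with the columns swapped; both are strings, so nothing fails loudly.
val readBack = decode(Seq("event", "user"), written)
println(readBack) // Map(event -> alice, user -> click)
```

Because both key columns have the same type in this toy example, the swap goes unnoticed. This mirrors the "silent correctness issue" risk rather than a schema-mismatch error.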
> Note the reference to SPARK-31990. The Apache Spark community once tried
> replacing `toSet.toSeq` with `distinct` (not to guarantee the ordering, but
> simply to make the code more concise) and had to roll the change back because
> it behaved differently. After the rollback, the community did not have time
> to revisit the problem and fix it holistically.
> 2. Different implementations between Spark Classic and Spark Connect
> Fortunately, Spark Connect does not perform the deduplication optimization
> described in the previous section. (It is still not ideal that it performs
> the resolution itself instead of deferring it to the analyzer, though.)
> {code:scala}
> val queryExecution = new QueryExecution(session, transformRelation(rel.getInput))
> val resolver = session.sessionState.analyzer.resolver
> val allColumns = queryExecution.analyzed.output
> if (rel.getAllColumnsAsKeys) {
>   Deduplicate(allColumns, queryExecution.analyzed)
> } else {
>   val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
>   val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
>     // It is possibly there are more than one columns with the same name,
>     // so we call filter instead of find.
>     val cols = allColumns.filter(col => resolver(col.name, colName))
>     if (cols.isEmpty) {
>       val fieldNames = allColumns.map(_.name).mkString(", ")
>       throw InvalidInputErrors.invalidDeduplicateColumn(colName, fieldNames)
>     }
>     cols
>   }
>   Deduplicate(groupCols, queryExecution.analyzed)
> }
> {code}
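The difference between the two excerpts appears as soon as a name is repeated. The sketch below models both pre-fix paths using a hypothetical `Attr` type and a case-insensitive lookup. `classicKeys` and `connectKeys` are illustrative names, not Spark functions.

```scala
// Hypothetical stand-in for a resolved attribute.
final case class Attr(name: String, exprId: Long)

val output = Seq(Attr("a", 1L), Attr("b", 2L), Attr("c", 3L))

// Resolve one name with a case-insensitive match (filter, as in both excerpts).
def lookup(colName: String): Seq[Attr] =
  output.filter(_.name.equalsIgnoreCase(colName))

// Classic before the fix: names go through a Set first.
def classicKeys(colNames: Seq[String]): Seq[Attr] = {
  val deduped: Seq[String] = colNames.toSet.toSeq
  deduped.flatMap(lookup)
}

// Connect before the fix: names are resolved as given, repeats included.
def connectKeys(colNames: Seq[String]): Seq[Attr] =
  colNames.flatMap(lookup)

val names = Seq("b", "a", "b")
println(classicKeys(names)) // List(Attr(b,2), Attr(a,1))
println(connectKeys(names)) // List(Attr(b,2), Attr(a,1), Attr(b,2))
```

In this model, Classic derives two keys while Connect derives three. That would translate into differently shaped state keys for what the user sees as the same query.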
> However, this introduces another headache: migrating a streaming query that
> ran on Spark Classic to Spark Connect. Migration only works when `toSet` is
> effectively a no-op (the ordering is preserved). In other cases, Spark
> refuses to run the streaming query or, even worse, silently introduces
> correctness issues.
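Here is a hedged way to state the migration condition. Under the pre-fix code, Classic and Connect derive the same key order only when the Set round-trip leaves the name list untouched. `migrationSafe` is a hypothetical helper for illustration.

```scala
// Before the fix, Classic keyed the state by `colNames.toSet.toSeq` while
// Connect keyed it by `colNames` as given. Both then resolve each name the
// same way (filter + resolver), so comparing the name lists is enough here.
def migrationSafe(colNames: Seq[String]): Boolean =
  colNames.toSet.toSeq == colNames

println(migrationSafe(Seq("user", "event")))          // true
println(migrationSafe(Seq("event", "user", "event"))) // false: Classic drops the repeat
println(migrationSafe(Seq("d", "c", "b", "a")))       // true: small sets keep insertion order
```

In Scala 2.13, immutable sets of up to four elements are specialized classes (`Set1` to `Set4`) that iterate in insertion order. The check therefore passes for up to four distinct names. With more names, the hash-based set determines the order and the check may fail.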
>
> This ticket tracks the effort to address both issues by making dropDuplicates
> key resolution deterministic through a shared analyzer rule. Because this
> changes the state schema of streaming dropDuplicates, we also need to handle
> pinning (keeping existing streaming queries on the previous behavior).
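The following plain-Scala sketch shows one possible shape for deterministic key resolution with pinning. Everything in it is hypothetical (`Attr`, `deterministicKeys`, `legacyKeys`, the `pinnedLegacy` flag). It illustrates a single shared resolution that keeps the user's order and drops repeats, with the old Classic order retained only for pinned queries. It is not the implementation from pull request 56546.

```scala
// Hypothetical stand-in for a resolved attribute.
final case class Attr(name: String, exprId: Long)

type Resolver = (String, String) => Boolean
val caseInsensitive: Resolver = _.equalsIgnoreCase(_)

// Resolve one user-supplied name; several attributes may match.
def lookup(output: Seq[Attr], resolver: Resolver, colName: String): Seq[Attr] = {
  val matched = output.filter(a => resolver(a.name, colName))
  if (matched.isEmpty) throw new IllegalArgumentException(s"Cannot resolve `$colName`")
  matched
}

// Deterministic variant: keep the user's order and drop repeated attributes
// (first occurrence wins), so every client derives the same key list.
def deterministicKeys(colNames: Seq[String], output: Seq[Attr], resolver: Resolver): Seq[Attr] =
  colNames.flatMap(n => lookup(output, resolver, n)).distinct

// Legacy variant: the pre-fix Classic behavior, kept only for pinned queries.
def legacyKeys(colNames: Seq[String], output: Seq[Attr], resolver: Resolver): Seq[Attr] = {
  val deduped: Seq[String] = colNames.toSet.toSeq
  deduped.flatMap(n => lookup(output, resolver, n))
}

// Hypothetical pinning switch, e.g. derived from the checkpoint metadata of a
// query that first ran before the change.
def dropDuplicatesKeys(
    colNames: Seq[String],
    output: Seq[Attr],
    resolver: Resolver,
    pinnedLegacy: Boolean): Seq[Attr] =
  if (pinnedLegacy) legacyKeys(colNames, output, resolver)
  else deterministicKeys(colNames, output, resolver)

val out = Seq(Attr("a", 1L), Attr("b", 2L), Attr("c", 3L))
println(dropDuplicatesKeys(Seq("c", "A", "b", "a"), out, caseInsensitive, pinnedLegacy = false))
// List(Attr(c,3), Attr(a,1), Attr(b,2))
```

Deduplicating after resolution, rather than on the raw names, is a design choice in this sketch. It also collapses names that differ only by case, which the legacy `toSet` path does not.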
--
This message was sent by Atlassian Jira
(v8.20.10#820010)