[ 
https://issues.apache.org/jira/browse/SPARK-57489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18089337#comment-18089337
 ] 

Jungtaek Lim commented on SPARK-57489:
--------------------------------------

Will submit a PR for this soon.

> Deterministic dropDuplicates key resolution via shared analyzer rule
> --------------------------------------------------------------------
>
>                 Key: SPARK-57489
>                 URL: https://issues.apache.org/jira/browse/SPARK-57489
>             Project: Spark
>          Issue Type: Task
>          Components: Structured Streaming
>    Affects Versions: 4.3.0
>            Reporter: Jungtaek Lim
>            Priority: Major
>
> The `dropDuplicates` operator deduplicates input rows by comparing the 
> values of a user-provided set of columns. The user specifies the columns by 
> name, and Spark needs to map those names to attributes. Other built-in 
> operators (Project, Filter) defer the resolution of columns to attributes 
> to the analyzer, but for dropDuplicates Spark handles the resolution in the 
> DataFrame implementation.
> This introduces two major issues:
> 1. An optimization has turned into a behavioral requirement
> In Spark Classic, the DataFrame implementation not only performs the 
> resolution but also deduplicates column names in case the user specifies 
> the same column name multiple times. (Or the schema of the DataFrame 
> somehow has the same column name multiple times.)
>     private def groupColsFromDropDuplicates(colNames: Seq[String]): Seq[Attribute] = {
>       val resolver = sparkSession.sessionState.analyzer.resolver
>       val allColumns = queryExecution.analyzed.output
>       // SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
>       // (the Streaming's state store depends on the `groupCols` order).
>       colNames.toSet.toSeq.flatMap { (colName: String) =>
>         // It is possibly there are more than one columns with the same name,
>         // so we call filter instead of find.
>         val cols = allColumns.filter(col => resolver(col.name, colName))
>         if (cols.isEmpty) {
>           throw QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
>             colName, schema.fieldNames.mkString(", "))
>         }
>         cols
>       }
>     }
>
> In the `.toSet.toSeq` call (highlighted in red in the original issue), Spark 
> uses a Set to deduplicate the column names, and a Set does not guarantee 
> ordering. This is fine for a batch query because the output is not affected 
> by the order of the columns here, but it does not work for the stateful 
> version of dropDuplicates, because the state store binds columns by 
> position rather than by name.
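
A minimal sketch of the ordering concern described above, in plain Scala with no Spark dependency. The object and method names (`DedupKeyOrder`, `orderPreserving`, `viaSet`) and the column names are hypothetical and exist only for illustration; this is not the Spark implementation. As far as I know, Scala's immutable Set keeps insertion order for up to four elements and switches to a hash-based layout beyond that, which may be why the problem is easy to miss with a small number of key columns.

```scala
object DedupKeyOrder {
  // Hypothetical helper: removes duplicate names and keeps first-occurrence order.
  def orderPreserving(colNames: Seq[String]): Seq[String] = colNames.distinct

  // Mirrors the `toSet.toSeq` pattern from the quoted snippet: duplicates are
  // removed, but the resulting order is whatever the Set iteration yields.
  def viaSet(colNames: Seq[String]): Seq[String] = colNames.toSet.toSeq

  def main(args: Array[String]): Unit = {
    // Five distinct names plus one duplicate (illustrative column names).
    val keys = Seq("user", "event", "ts", "region", "device", "user")
    println(s"distinct:     ${orderPreserving(keys)}")
    println(s"toSet.toSeq:  ${viaSet(keys)}")
    println(s"same order?   ${orderPreserving(keys) == viaSet(keys)}")
  }
}
```

Both helpers return the same set of names; only the order can differ, and order is exactly what a positional state schema depends on.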
> Regarding the mention of SPARK-31990: the Apache Spark community tried 
> replacing `toSet.toSeq` with `distinct` (that change was not meant to 
> guarantee the ordering either; it was only meant to make the code more 
> concise), and the community had to roll it back since it behaved 
> differently. After the rollback, the community did not have time to revisit 
> this and fix it holistically.
> 2. Different implementations between Spark Classic and Spark Connect
> Fortunately, Spark Connect does not perform the optimization described in 
> the previous section. (It is still not ideal to do the resolution in the 
> DataFrame implementation, though.)
>     val queryExecution = new QueryExecution(session, transformRelation(rel.getInput))
>     val resolver = session.sessionState.analyzer.resolver
>     val allColumns = queryExecution.analyzed.output
>     if (rel.getAllColumnsAsKeys) {
>       Deduplicate(allColumns, queryExecution.analyzed)
>     } else {
>       val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
>       val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
>         // It is possibly there are more than one columns with the same name,
>         // so we call filter instead of find.
>         val cols = allColumns.filter(col => resolver(col.name, colName))
>         if (cols.isEmpty) {
>           val fieldNames = allColumns.map(_.name).mkString(", ")
>           throw InvalidInputErrors.invalidDeduplicateColumn(colName, fieldNames)
>         }
>         cols
>       }
>       Deduplicate(groupCols, queryExecution.analyzed)
>     }
>
> But this introduces another headache: migrating a streaming query that ran 
> on Spark Classic to Spark Connect. The migration path only works when toSet 
> is literally a no-op (the ordering is preserved); in other cases, Spark 
> will refuse to run the streaming query (or, even worse, silently introduce 
> correctness issues).
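
To make the "toSet is literally a no-op" condition concrete, here is a rough sketch in plain Scala. It models only the column-name step of the two quoted snippets; the real code goes on to resolve each name to attributes. The object and method names (`DedupKeyMigrationCheck`, `classicOrder`, `connectOrder`, `sameKeyOrder`) are hypothetical, and nothing like this check exists in Spark as far as the ticket says.

```scala
object DedupKeyMigrationCheck {
  // Name order as the quoted Spark Classic snippet derives it (through a Set).
  def classicOrder(colNames: Seq[String]): Seq[String] = colNames.toSet.toSeq

  // Name order as the quoted Spark Connect snippet derives it (names as given).
  def connectOrder(colNames: Seq[String]): Seq[String] = colNames

  // Hypothetical check: true only when both paths yield the same names in the
  // same positions, i.e. when `toSet.toSeq` changed nothing.
  def sameKeyOrder(colNames: Seq[String]): Boolean =
    classicOrder(colNames) == connectOrder(colNames)

  def main(args: Array[String]): Unit = {
    // A duplicated name is dropped by Classic but kept by Connect.
    println(sameKeyOrder(Seq("user", "event", "user"))) // false
    // With many distinct names the result depends on the Set iteration order.
    println(sameKeyOrder(Seq("user", "event", "ts", "region", "device")))
  }
}
```

Under this simplified model, any input for which `sameKeyOrder` returns false corresponds to a query whose key columns would land in different positions after the migration.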
>  
> This ticket tracks the effort to address both issues together by ensuring 
> deterministic dropDuplicates key resolution via a shared analyzer rule. 
> This changes the state schema for streaming dropDuplicates, hence we also 
> need to deal with pinning.



