Jungtaek Lim created SPARK-57489:
------------------------------------

             Summary: Deterministic dropDuplicates key resolution via shared analyzer rule
                 Key: SPARK-57489
                 URL: https://issues.apache.org/jira/browse/SPARK-57489
             Project: Spark
          Issue Type: Task
          Components: Structured Streaming
    Affects Versions: 4.3.0
            Reporter: Jungtaek Lim


The `dropDuplicates` operator deduplicates input rows by comparing the values of a provided set of columns. The user specifies the columns to use for deduplication by name, and Spark needs to map (resolve) those names to attributes. Unlike other built-in operators such as Project and Filter, which defer column-to-attribute resolution to the analyzer, `dropDuplicates` performs the resolution eagerly in the DataFrame implementation.
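To make the eager resolution concrete, here is a minimal Scala sketch of resolving user-supplied column names against a fixed list of output attributes. `Attr`, `ResolveByName`, and the case-insensitive resolver are hypothetical simplifications (Spark's real types are `Attribute` and `Resolver`), and the error is a plain `IllegalArgumentException` rather than Spark's error class.

```scala
// Hypothetical, simplified stand-in for an analyzed plan's output attribute:
// a column name plus a unique id (playing the role of Spark's ExprId).
final case class Attr(name: String, id: Long)

object ResolveByName {
  // Case-insensitive comparison, mirroring Spark's default resolver when
  // spark.sql.caseSensitive is false.
  val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

  // Eagerly map each user-supplied name to attributes of the output.
  // filter (rather than find) keeps every match, since the output may
  // contain several columns with the same name.
  def resolve(colNames: Seq[String], output: Seq[Attr]): Seq[Attr] =
    colNames.flatMap { colName =>
      val cols = output.filter(attr => resolver(attr.name, colName))
      if (cols.isEmpty) {
        throw new IllegalArgumentException(
          s"Cannot resolve column '$colName' among (${output.map(_.name).mkString(", ")})")
      }
      cols
    }

  def main(args: Array[String]): Unit = {
    // "id" and "ID" match the same name case-insensitively, e.g. after a join.
    val output = Seq(Attr("id", 1L), Attr("value", 2L), Attr("ID", 3L))
    println(resolve(Seq("value", "id"), output))
  }
}
```

Because the result is computed before the analyzer runs, the order and multiplicity of the returned attributes are decided entirely by this code path, which is what the two issues below are about.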

This introduces two major issues:

1. An optimization has turned into a behavioral requirement


In Spark Classic, the DataFrame implementation not only performs the resolution but also applies an optimization: it deduplicates the column names in case the user specifies the same column name multiple times. (Separately, the DataFrame schema itself may contain the same column name multiple times.)
```scala
private def groupColsFromDropDuplicates(colNames: Seq[String]): Seq[Attribute] = {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val allColumns = queryExecution.analyzed.output
  // SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
  // (the Streaming's state store depends on the `groupCols` order).
  colNames.toSet.toSeq.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      throw QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
        colName, schema.fieldNames.mkString(", "))
    }
    cols
  }
}
```

In the `colNames.toSet.toSeq` call, Spark uses a Set to deduplicate the column names, and, as one would intuitively expect of a Set, the resulting order is not guaranteed. This is fine for batch queries, because their output does not depend on the order of the key columns here, but it does not work for the stateful (streaming) version of dropDuplicates, because the state store binds the key columns by position rather than by name.
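A toy model of why positional binding matters: if the dedup key is stored as a sequence of values without column names, the same logical row produces a different key once the key columns come back in a different order. This is a simplified sketch under that assumption; `PositionalStateKey` and the `Map`-based row are hypothetical and are not Spark's `UnsafeRow` or state store API.

```scala
object PositionalStateKey {
  // Hypothetical row model: column name -> value.
  type Row = Map[String, Any]

  // Extract the dedup key in key-column order. Only the values are kept,
  // so the key is bound to the columns by position, not by name.
  def keyOf(row: Row, keyCols: Seq[String]): Seq[Any] = keyCols.map(row)

  def main(args: Array[String]): Unit = {
    val row: Row = Map("user" -> "u1", "event" -> "click")

    // Keys recorded by a previous run that used the order (user, event).
    val seenKeys: Set[Seq[Any]] = Set(keyOf(row, Seq("user", "event")))

    // After a restart, the key columns are resolved as (event, user): the same
    // logical row now yields Seq("click", "u1"), which is not in seenKeys.
    val probe = keyOf(row, Seq("event", "user"))
    println(seenKeys.contains(probe)) // false: the duplicate goes undetected
  }
}
```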

As the comment referencing SPARK-31990 indicates, the Apache Spark community once tried replacing `toSet.toSeq` with `distinct`. The intent was only to make the code more concise, not to guarantee the ordering, but the change had to be rolled back because `distinct` can produce a different column order, which breaks compatibility with the state of existing streaming queries. Since the rollback, the community has not had time to revisit the issue and fix it holistically.
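The following sketch contrasts `toSet.toSeq` with `distinct` on plain Scala collections. The agreement for up to four distinct names relies on an implementation detail of Scala's immutable `Set` (the specialized `Set1` to `Set4` classes iterate in insertion order), not on a documented guarantee; beyond four names, the order produced by `toSet.toSeq` follows the hash-based set and may differ from the input order.

```scala
object DistinctVsToSet {
  // The pattern Spark Classic keeps (see the SPARK-31990 comment).
  def viaToSet(colNames: Seq[String]): Seq[String] = colNames.toSet.toSeq

  // The attempted replacement: first occurrence of each name, in input order.
  def viaDistinct(colNames: Seq[String]): Seq[String] = colNames.distinct

  def main(args: Array[String]): Unit = {
    // Up to 4 distinct names, the Set is one of Set1..Set4, which iterate in
    // insertion order, so both approaches agree.
    val few = Seq("user", "event", "user")
    println(viaToSet(few) == viaDistinct(few))

    // From the 5th distinct name on, toSet builds a hash-based set whose
    // iteration order depends on hash codes, so the two orders may differ.
    val many = Seq("c1", "c2", "c3", "c4", "c5", "c6")
    println(viaDistinct(many))
    println(viaToSet(many))
  }
}
```

Both variants always return the same set of names; only the order can differ, and for streaming that order is part of the state layout.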

2. Different implementations between Spark Classic and Spark Connect


Fortunately, Spark Connect does not perform the deduplication described in the previous section. (It is still not ideal that it resolves the columns eagerly, outside the analyzer, though.)
```scala
val queryExecution = new QueryExecution(session, transformRelation(rel.getInput))
val resolver = session.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
if (rel.getAllColumnsAsKeys) {
  Deduplicate(allColumns, queryExecution.analyzed)
} else {
  val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
  val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      val fieldNames = allColumns.map(_.name).mkString(", ")
      throw InvalidInputErrors.invalidDeduplicateColumn(colName, fieldNames)
    }
    cols
  }
  Deduplicate(groupCols, queryExecution.analyzed)
}
```

But this introduces another headache: migrating a streaming query that ran on Spark Classic to Spark Connect. Migration only works when `toSet.toSeq` is effectively a no-op (the names are already distinct and their order is preserved). In other cases, Spark rejects the streaming query, or, even worse, the query runs and silently introduces correctness issues.
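A hypothetical precheck (`MigrationCheck` is not part of Spark) that captures the condition above: the key names Spark Classic derives with `toSet.toSeq` must match, position by position, the names Spark Connect uses as given. It compares names only, assuming both paths then resolve each name to attributes the same way, as the two snippets above do.

```scala
object MigrationCheck {
  // Key-name order Spark Classic feeds into resolution.
  def classicKeyNames(colNames: Seq[String]): Seq[String] = colNames.toSet.toSeq

  // Key-name order Spark Connect feeds into resolution: the list as given.
  def connectKeyNames(colNames: Seq[String]): Seq[String] = colNames

  // True only when toSet.toSeq was a no-op, i.e. state written under Classic
  // lines up position by position with what Connect would produce.
  def sameStateKeyLayout(colNames: Seq[String]): Boolean =
    classicKeyNames(colNames) == connectKeyNames(colNames)

  def main(args: Array[String]): Unit = {
    println(sameStateKeyLayout(Seq("user", "event")))         // true
    println(sameStateKeyLayout(Seq("user", "event", "user"))) // false: Classic drops the repeat
  }
}
```

With five or more distinct names, the outcome of this check depends on the hash-based set's iteration order rather than on anything the user controls.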

This ticket tracks the effort to address both issues together by making dropDuplicates key resolution deterministic through a shared analyzer rule. Because this changes the state schema of streaming dropDuplicates, we also need to handle pinning, so that existing queries keep their previous state schema.
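One possible shape for deterministic resolution with pinning, sketched under stated assumptions: the version numbers, `DedupKeyResolution`, and reading a pinned version from the checkpoint are hypothetical and only illustrate the concept; the actual rule and pinning mechanism are what this ticket is meant to design. The deterministic order is assumed here to be the first-occurrence order produced by `distinct`, and for simplicity only Spark Classic's legacy order is modeled.

```scala
object DedupKeyResolution {
  // Hypothetical behavior versions for dropDuplicates key ordering.
  val LegacyVersion = 1        // Spark Classic today: colNames.toSet.toSeq
  val DeterministicVersion = 2 // assumed new behavior: first-occurrence order

  // New queries (nothing pinned yet) get the latest behavior; a query restarted
  // from an existing checkpoint keeps the version pinned there, so its state
  // schema does not change underneath it.
  def effectiveVersion(pinnedInCheckpoint: Option[Int]): Int =
    pinnedInCheckpoint.getOrElse(DeterministicVersion)

  // A single resolution routine that both Classic and Connect would call.
  def keyNames(colNames: Seq[String], version: Int): Seq[String] = version match {
    case LegacyVersion        => colNames.toSet.toSeq
    case DeterministicVersion => colNames.distinct
    case other                => throw new IllegalArgumentException(s"Unknown version: $other")
  }

  def main(args: Array[String]): Unit = {
    val names = Seq("c1", "c2", "c3", "c4", "c5", "c6", "c1")
    println(keyNames(names, effectiveVersion(None)))    // new query
    println(keyNames(names, effectiveVersion(Some(1)))) // pinned legacy query
  }
}
```

Keeping the legacy branch means old checkpoints stay readable, while every new query, whether submitted through Classic or Connect, gets the same key order.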



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
