Jungtaek Lim created SPARK-57489:
------------------------------------
Summary: Deterministic dropDuplicates key resolution via shared analyzer rule
Key: SPARK-57489
URL: https://issues.apache.org/jira/browse/SPARK-57489
Project: Spark
Issue Type: Task
Components: Structured Streaming
Affects Versions: 4.3.0
Reporter: Jungtaek Lim
The `dropDuplicates` operator deduplicates input rows by comparing their values in a provided set of columns. The user specifies the columns to use for deduplication by name, and Spark needs to map those names to attributes. Unlike other built-in operators (e.g. Project, Filter), which defer the resolution of columns to attributes to the analyzer, dropDuplicates performs that resolution in the DataFrame implementation.
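To make the semantics concrete, here is a minimal plain-Scala model of what `dropDuplicates` computes, with no Spark involved. It assumes rows are represented as `Map[String, Any]` and keeps the first row seen per key, as streaming deduplication does; `DropDuplicatesModel` is a hypothetical name used only for illustration. Note that the key is the sequence of key-column values taken in the given column order.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of dropDuplicates(keyCols); no Spark involved.
object DropDuplicatesModel {
  // A row maps column names to values.
  type Row = Map[String, Any]

  // Keeps the first row seen for each key and drops later rows with the same key.
  def dropDuplicates(rows: Seq[Row], keyCols: Seq[String]): Seq[Row] = {
    val seen = mutable.HashSet.empty[Seq[Any]]
    rows.filter { row =>
      // The key is the key-column values in keyCols order;
      // row(col) throws NoSuchElementException for an unknown column.
      val key: Seq[Any] = keyCols.map(row)
      // add returns true only if the key was not already present.
      seen.add(key)
    }
  }
}
```

For example, with rows `(u1, click, ts=1)`, `(u1, click, ts=2)` and `(u2, click, ts=3)` deduplicated on `user` and `event`, the model keeps the rows with `ts` 1 and 3.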
This introduces two major issues:
1. An optimization has turned into a behavioral requirement
In Spark Classic, the DataFrame implementation not only performs the resolution but also applies an optimization: it deduplicates the column names if the user specifies the same name more than once. (The DataFrame's schema may also contain the same column name multiple times.)
```scala
private def groupColsFromDropDuplicates(colNames: Seq[String]): Seq[Attribute] = {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val allColumns = queryExecution.analyzed.output
  // SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
  // (the Streaming's state store depends on the `groupCols` order).
  colNames.toSet.toSeq.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      throw QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
        colName, schema.fieldNames.mkString(", "))
    }
    cols
  }
}
```
In the excerpt above, the `.toSet.toSeq` call deduplicates the column names through a Set, which, intuitively, does not guarantee ordering. This is fine for batch queries, because their output does not depend on the order of the key columns, but it does not work for the stateful (streaming) version of dropDuplicates, because the state store binds key columns by position rather than by name.
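The positional binding can be illustrated with a toy key store that, like the state store, sees a key only as an ordered sequence of values. This is a simplified sketch, not Spark's state store API; `PositionalStateModel` and `KeyStore` are hypothetical names. The same row projected with key order `(a, b)` and later with `(b, a)` yields two different keys, so a restarted query whose key columns were reordered misses the state it wrote earlier.

```scala
import scala.collection.mutable

// Hypothetical toy model; not Spark's state store API.
object PositionalStateModel {
  type Row = Map[String, Any]

  // Like the state store, this store sees a key only as an ordered sequence
  // of values: no column names are attached to the positions.
  final class KeyStore {
    private val keys = mutable.HashSet.empty[Seq[Any]]
    // Returns true if the key was absent (and records it), false otherwise.
    def putIfAbsent(key: Seq[Any]): Boolean = keys.add(key)
  }

  // Projects the row onto keyCols in the given order and checks the store.
  // Returns true when the row is treated as new, i.e. it would be emitted.
  def isNew(store: KeyStore, row: Row, keyCols: Seq[String]): Boolean =
    store.putIfAbsent(keyCols.map(row))
}
```

In this toy model, the miss means a row that was already deduplicated is emitted again after the restart.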
Note the reference to SPARK-31990: the Apache Spark community once tried replacing `toSet.toSeq` with `distinct` (not to guarantee ordering, but only to make the code more concise) and had to roll the change back because it behaved differently. After the rollback, the community did not have time to revisit the problem and fix it holistically.
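The difference between `toSet.toSeq` and `distinct` can be reproduced with the Scala standard library alone. The sketch relies on an implementation detail of `scala.collection.immutable.Set` (true of the Scala 2.12 and 2.13 collections, as far as we know): sets of up to four elements iterate in insertion order, while larger sets are hash-based and carry no ordering guarantee. This is one plausible explanation of why switching to `distinct` was observable only for some queries; `KeyNameOrder` is a hypothetical name.

```scala
// Compares two ways of deduplicating requested key names.
object KeyNameOrder {
  // The Spark Classic approach from the excerpt: dedup through an immutable Set.
  // Iteration order comes from the Set implementation, not from the input.
  def viaSet(names: Seq[String]): Seq[String] = names.toSet.toSeq

  // The SPARK-31990 attempt that was rolled back: keeps first-occurrence order.
  def viaDistinct(names: Seq[String]): Seq[String] = names.distinct
}
```

With at most four distinct names both functions return the same order; with five or more, `viaSet` returns the same names but in an order determined by hashing.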
2. Different implementations between Spark Classic and Spark Connect
Fortunately, Spark Connect does not perform the optimization described in the previous section. (It still performs the resolution in the DataFrame implementation, though, which is not ideal.)
```scala
val queryExecution = new QueryExecution(session, transformRelation(rel.getInput))
val resolver = session.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
if (rel.getAllColumnsAsKeys) {
  Deduplicate(allColumns, queryExecution.analyzed)
} else {
  val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
  val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      val fieldNames = allColumns.map(_.name).mkString(", ")
      throw InvalidInputErrors.invalidDeduplicateColumn(colName, fieldNames)
    }
    cols
  }
  Deduplicate(groupCols, queryExecution.analyzed)
}
```
However, this introduces another headache: migrating a streaming query that ran on Spark Classic to Spark Connect. The migration path only works when `toSet` is effectively a no-op (i.e., the ordering is preserved); in other cases, Spark refuses to run the streaming query or, even worse, silently introduces correctness issues.
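The divergence between the two excerpts can be modeled side by side. The sketch mirrors only their control flow: `Attr` is a hypothetical stand-in for Catalyst's `Attribute` (a name plus a unique id), the resolver is case-insensitive like Spark's default (`spark.sql.caseSensitive=false`), and errors are plain exceptions rather than Spark's error classes. `KeyResolutionPaths`, `classicKeys` and `connectKeys` are hypothetical names.

```scala
// Hypothetical model of the two resolution paths excerpted above.
object KeyResolutionPaths {
  // Stand-in for Catalyst's Attribute: a column name plus a unique id.
  final case class Attr(name: String, id: Long)

  // Case-insensitive matching, like Spark's default resolver.
  private def resolver(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

  // filter instead of find: several attributes may share a name.
  private def resolve(output: Seq[Attr], colName: String): Seq[Attr] = {
    val cols = output.filter(a => resolver(a.name, colName))
    if (cols.isEmpty) {
      throw new IllegalArgumentException(
        s"Cannot resolve column $colName among ${output.map(_.name).mkString(", ")}")
    }
    cols
  }

  // Classic: names are deduplicated through a Set before resolution.
  def classicKeys(names: Seq[String], output: Seq[Attr]): Seq[Attr] =
    names.toSet.toSeq.flatMap((n: String) => resolve(output, n))

  // Connect: names are resolved as given, duplicates included.
  def connectKeys(names: Seq[String], output: Seq[Attr]): Seq[Attr] =
    names.flatMap((n: String) => resolve(output, n))
}
```

When the same name is passed twice, the Classic path yields a one-column key while the Connect path yields a two-column key, so state written through one path does not line up with the key the other path expects.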
This ticket tracks the effort to address both issues together by making dropDuplicates key resolution deterministic through a shared analyzer rule. Because this changes the state schema for streaming dropDuplicates, we also need to deal with pinning.
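One way a shared rule could make resolution deterministic is sketched below. This is an assumption about the design, not the ticket's implementation: keys are resolved in first-occurrence order and each attribute is kept once, and `pinLegacyOrder` is a hypothetical flag standing in for whatever pinning mechanism is chosen (for example, a value recorded in an existing query's checkpoint so that it keeps its original key layout). `SharedKeyResolution`, `resolveKeys` and `Attr` are likewise hypothetical.

```scala
// Hypothetical sketch of a shared, deterministic key-resolution step.
object SharedKeyResolution {
  // Stand-in for Catalyst's Attribute: a column name plus a unique id.
  final case class Attr(name: String, id: Long)

  // Case-insensitive matching, like Spark's default resolver.
  private def resolver(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

  // Resolves key names against the child's output in one place, for both
  // Classic and Connect. pinLegacyOrder is a hypothetical flag that an
  // existing query would read from its checkpoint to keep its old key layout.
  def resolveKeys(names: Seq[String], output: Seq[Attr], pinLegacyOrder: Boolean): Seq[Attr] = {
    // Legacy: reproduce Spark Classic's Set-based name dedup and its order.
    val ordered: Seq[String] = if (pinLegacyOrder) names.toSet.toSeq else names
    val resolved = ordered.flatMap { (n: String) =>
      val cols = output.filter(a => resolver(a.name, n))
      if (cols.isEmpty) throw new IllegalArgumentException(s"Cannot resolve column $n")
      cols
    }
    // New behavior: first-occurrence order, each attribute kept once, so
    // repeated or differently-cased names map to a single key column.
    if (pinLegacyOrder) resolved else resolved.distinct
  }
}
```

Because the legacy branch reproduces `toSet.toSeq` exactly, a restarted query pinned to it would see the same key layout as before, while new queries get a layout that no longer depends on `Set` iteration order.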
--
This message was sent by Atlassian Jira
(v8.20.10#820010)