aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r550998490
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"

-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }

-  // rewrite all operations that require reading the table to delete records
-  case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-    // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-    val writeInfo = newWriteInfo(r.schema)
-    val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }

-    val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)

-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)

-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }

   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))

-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {
       case filterable: SupportsFileFilter =>
-        val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
-        dynamicFileFilter
+        val filterRelation = tableRelation.copy(output = addFileAndPos(tableRelation.output))
+        val filteredFiles = new FilterFiles()
+        val matchingFilePlan = buildFileFilterPlan(cond, filterRelation)
+        filterable.filterFiles(filteredFiles)
+        DynamicFileFilter(mergeRelation, matchingFilePlan, filteredFiles)
+
       case _ =>
-        scanRelation
+        mergeRelation
     }
   }

   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
-    val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
-    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val fileNameCol = UnresolvedAttribute(FILE_NAME_COL)
+    val rowPosCol = UnresolvedAttribute(ROW_POS_COL)
     val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
     val numShufflePartitions = SQLConf.get.numShufflePartitions
     val repartition = RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
     val sort = Sort(order, global = false, repartition)
     Project(output, sort)
   }

-  private def isMetadataDelete(relation: DataSourceV2Relation, cond: Expression): Boolean = {
-    relation.table match {
-      case t: ExtendedSupportsDelete if !SubqueryExpression.hasSubquery(cond) =>
-        val predicates = splitConjunctivePredicates(cond)
-        val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, relation.output)
-        val dataSourceFilters = toDataSourceFilters(normalizedPredicates)
-        val allPredicatesTranslated = normalizedPredicates.size == dataSourceFilters.length
-        allPredicatesTranslated && t.canDeleteWhere(dataSourceFilters)
-      case _ => false
-    }
-  }
-
-  private def toDataSourceFilters(predicates: Seq[Expression]): Array[sources.Filter] = {
-    predicates.flatMap { p =>
-      val translatedFilter = DataSourceStrategy.translateFilter(p, supportNestedPredicatePushdown = true)
-      if (translatedFilter.isEmpty) {
-        logWarning(s"Cannot translate expression to source filter: $p")
-      }
-      translatedFilter
-    }.toArray
-  }
-
-  private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
-    val uuid = UUID.randomUUID()
-    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
-  }
-
-  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2ScanRelation): LogicalPlan = {
+  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2Relation): LogicalPlan = {
     val matchingFilter = Filter(cond, scanRelation)
-    val fileAttr = findOutputAttr(matchingFilter, FILE_NAME_COL)
+    val fileAttr = UnresolvedAttribute(FILE_NAME_COL)
     val agg = Aggregate(Seq(fileAttr), Seq(fileAttr), matchingFilter)
-    Project(Seq(findOutputAttr(agg, FILE_NAME_COL)), agg)
+    Project(Seq(UnresolvedAttribute(FILE_NAME_COL)), agg)
   }

-  private def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
-    val resolver = SQLConf.get.resolver
-    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
-      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+  private def not(expr: Expression): Expression = {
Review comment:
Why do we have to change this?
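For reference, since the body of `not` falls outside this diff window: the change swaps the inline null-safe negation removed above (`Not(EqualNullSafe(cond, Literal(true, BooleanType)))`) for this helper. A minimal sketch of the two negation forms and how they treat rows whose condition evaluates to null (function names here are illustrative, not from the patch):

    import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, Literal, Not}
    import org.apache.spark.sql.types.BooleanType

    // DELETE keeps the rows that do NOT match the condition.
    // Plain negation: Not(cond) is null when cond is null, so Filter
    // drops those rows and they would be deleted by mistake.
    def plainNegation(cond: Expression): Expression = Not(cond)

    // Null-safe negation: EqualNullSafe(cond, true) is false when cond
    // is null, so its negation is true and such rows survive the DELETE.
    def nullSafeNegation(cond: Expression): Expression =
      Not(EqualNullSafe(cond, Literal(true, BooleanType)))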
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]