rdblue commented on a change in pull request #3369:
URL: https://github.com/apache/iceberg/pull/3369#discussion_r735761691
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -212,33 +220,44 @@ private String getRowLevelOperationMode(String operation) {
@Override
public boolean canDeleteWhere(Filter[] filters) {
- if (table().specs().size() > 1) {
- // cannot guarantee a metadata delete will be successful if we have multiple specs
- return false;
- }
-
- Set<Integer> identitySourceIds = table().spec().identitySourceIds();
- Schema schema = table().schema();
+ Expression deleteExpr = Expressions.alwaysTrue();
for (Filter filter : filters) {
- // return false if the filter requires rewrite or if we cannot translate the filter
- if (requiresRewrite(filter, schema, identitySourceIds) || SparkFilters.convert(filter) == null) {
+ Expression expr = SparkFilters.convert(filter);
+ if (expr != null) {
+ deleteExpr = Expressions.and(deleteExpr, expr);
+ } else {
return false;
}
}
- return true;
+ return deleteExpr == Expressions.alwaysTrue() || canDeleteUsingMetadata(deleteExpr);
}
- private boolean requiresRewrite(Filter filter, Schema schema, Set<Integer> identitySourceIds) {
- // TODO: handle dots correctly via v2references
- // TODO: detect more cases that don't require rewrites
- Set<String> filterRefs = Sets.newHashSet(filter.references());
- return filterRefs.stream().anyMatch(ref -> {
- Types.NestedField field = schema.findField(ref);
- ValidationException.check(field != null, "Cannot find field %s in schema", ref);
- return !identitySourceIds.contains(field.fieldId());
- });
+ // a metadata delete is possible iff matching files can be deleted entirely
+ private boolean canDeleteUsingMetadata(Expression deleteExpr) {
+ TableScan scan = table().newScan()
+ .filter(deleteExpr)
+ .includeColumnStats()
+ .ignoreResiduals();
+
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ Map<Integer, Evaluator> evaluators = Maps.newHashMap();
+ StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(table().schema(), deleteExpr);
+
+ return Iterables.all(tasks, task -> {
+ DataFile file = task.file();
+ PartitionSpec spec = task.spec();
+ Evaluator evaluator = evaluators.computeIfAbsent(
+ spec.specId(),
+ specId -> new Evaluator(spec.partitionType(), Projections.strict(spec).project(deleteExpr)));
+ return evaluator.eval(file.partition()) || metricsEvaluator.eval(file);
Review comment:
This looks correct to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]