chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589042721
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
return applyEqDeletes(applyPosDeletes(records));
}
- private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+ public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+ if (eqDeletes.isEmpty()) {
+ return records;
+ }
+
+ Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+ for (DeleteFile delete : eqDeletes) {
+ filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+ }
+
+ CloseableIterable<T> remainRecords = records;
+ CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+ for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+ Set<Integer> ids = entry.getKey();
+ Iterable<DeleteFile> deletes = entry.getValue();
+
+ Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+ // a projection to select and reorder fields of the file schema to match the delete rows
+ StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+ Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+ delete -> openDeletes(delete, deleteSchema));
+ StructLikeSet deleteSet = Deletes.toEqualitySet(
+ // copy the delete records because they will be held in a set
+ CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+ deleteSchema.asStruct());
+
+ matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords,
Deletes.match(remainRecords,
Review comment:
I don't think it iterates the data set several times, since these are
iterable chains that are evaluated lazily.
For a filter chain over a data set of `N` elements with filters (`F1, F2, F3,
F4`) that filter out (`N1, N2, N3, N4`) items respectively, I think the data
set is iterated once and the number of filter calls should be:
- `F1` N times
- `F2` (N-N1) times
- `F3` (N-N1-N2) times
- `F4` (N-N1-N2-N3) times
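The counting argument for the filter chain can be checked with a small, self-contained sketch. It uses plain `java.util` iterators instead of Iceberg's `CloseableIterable`, and the names `FilterChainDemo`, `lazyFilter` and `countCalls` are hypothetical. It assumes each stage is a lazy filter that pulls from the previous stage only on demand, which is how the chained filters are expected to behave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class FilterChainDemo {

  // Lazily filters `source`: elements are pulled from upstream only when the
  // consumer asks for the next one, so stacking stages does not add passes.
  static <T> Iterable<T> lazyFilter(Iterable<T> source, Predicate<T> keep) {
    return () -> new Iterator<T>() {
      private final Iterator<T> upstream = source.iterator();
      private T pending;
      private boolean hasPending = false;

      private void advance() {
        while (!hasPending && upstream.hasNext()) {
          T candidate = upstream.next();
          if (keep.test(candidate)) {
            pending = candidate;
            hasPending = true;
          }
        }
      }

      @Override
      public boolean hasNext() {
        advance();
        return hasPending;
      }

      @Override
      public T next() {
        advance();
        if (!hasPending) {
          throw new NoSuchElementException();
        }
        hasPending = false;
        return pending;
      }
    };
  }

  // Chains one filter per divisor (stage i drops multiples of divisors[i]) over
  // the values 1..n, consumes the chain once, and returns per-stage call counts.
  static int[] countCalls(int n, int... divisors) {
    List<Integer> data = new ArrayList<>();
    for (int i = 1; i <= n; i++) {
      data.add(i);
    }
    int[] calls = new int[divisors.length];
    Iterable<Integer> chain = data;
    for (int i = 0; i < divisors.length; i++) {
      final int stage = i;
      final int divisor = divisors[i];
      chain = lazyFilter(chain, x -> {
        calls[stage]++;
        return x % divisor != 0;
      });
    }
    for (Integer ignored : chain) {
      // consuming the chain once drives every stage
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(countCalls(12, 2, 3, 5))); // [12, 6, 4]
  }
}
```

With `N = 12` and filters that drop multiples of 2, 3 and 5 (so `N1 = 6` and `N2 = 2`), `F1` runs `N` times, `F2` runs `N-N1` times and `F3` runs `N-N1-N2` times, while the source list is traversed only once.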
For a matching chain over a data set of `N` elements with filters (`F1, F2,
F3, F4`) that match (`N1, N2, N3, N4`) items respectively, I think the data
set is iterated once and the number of filter calls should be:
- `F1` 2N times (filter and match)
- `F2` 2(N-N1) times (filter and match)
- `F3` 2(N-N1-N2) times (filter and match)
- `F4` 2(N-N1-N2-N3) times (filter and match)
@rdblue, could you please correct me if I am wrong?
Here is an alternative implementation that collects all delete sets in a
list and applies the projection inside the filter. It doesn't depend on
temporary iterables and is a bit more straightforward. I can switch to this
one if you prefer it.
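```java
import java.util.List;
import java.util.function.BiFunction;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;

// Proposed additions to org.apache.iceberg.deletes.Deletes.

// Returns the rows that match at least one equality delete set. Each pair holds the
// projection that extracts the equality key from a row and the set of deleted keys.
public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
                                             BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
                                             List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
  if (unprojectedDeleteSets.isEmpty()) {
    // no delete sets, so no row can match
    return CloseableIterable.empty();
  }

  EqualitySetDeleteMatcher<T> equalityFilter =
      new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
  return equalityFilter.filter(rows);
}

private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
  private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
  private final BiFunction<T, StructProjection, StructLike> extractEqStruct;

  protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
                                     List<Pair<StructProjection, StructLikeSet>> deleteSets) {
    this.extractEqStruct = extractEq;
    this.deleteSets = deleteSets;
  }

  // Keeps a row if its projected key is in any delete set; stops at the first match.
  @Override
  protected boolean shouldKeep(T row) {
    for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
      if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
        return true;
      }
    }
    return false;
  }
}
```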
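The single-pass behaviour of the proposed matcher can be illustrated outside Iceberg. This is a minimal sketch under stated assumptions: rows are plain `int[]` arrays, and each delete set is a hypothetical `DeleteSet` holding key positions plus a `HashSet` of keys, standing in for `Pair<StructProjection, StructLikeSet>`. None of these names are Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeleteSetMatcherSketch {

  // Hypothetical stand-in for Pair<StructProjection, StructLikeSet>: the field
  // positions that form the equality key, plus the set of deleted keys.
  static final class DeleteSet {
    final int[] keyPositions;
    final Set<List<Integer>> deletedKeys;

    DeleteSet(int[] keyPositions, Set<List<Integer>> deletedKeys) {
      this.keyPositions = keyPositions;
      this.deletedKeys = deletedKeys;
    }

    // Projects a row onto this set's equality fields (the role of rowToDeleteKey).
    List<Integer> project(int[] row) {
      List<Integer> key = new ArrayList<>(keyPositions.length);
      for (int pos : keyPositions) {
        key.add(row[pos]);
      }
      return key;
    }
  }

  // Mirrors shouldKeep: true as soon as any delete set contains the projected key.
  static boolean matchesAny(int[] row, List<DeleteSet> deleteSets) {
    for (DeleteSet deleteSet : deleteSets) {
      if (deleteSet.deletedKeys.contains(deleteSet.project(row))) {
        return true;
      }
    }
    return false;
  }

  // Single pass over the rows; returns the rows matched by at least one delete set.
  static List<int[]> match(List<int[]> rows, List<DeleteSet> deleteSets) {
    List<int[]> matched = new ArrayList<>();
    for (int[] row : rows) {
      if (matchesAny(row, deleteSets)) {
        matched.add(row);
      }
    }
    return matched;
  }

  // Rows are {id, a, b}; one set deletes by id, another by (a, b).
  static List<Integer> demoMatchedIds() {
    List<int[]> rows = Arrays.asList(
        new int[] {1, 10, 100}, new int[] {2, 20, 200},
        new int[] {3, 30, 300}, new int[] {4, 40, 400});
    DeleteSet byId = new DeleteSet(new int[] {0},
        new HashSet<>(Collections.singletonList(Arrays.asList(1))));
    DeleteSet byAB = new DeleteSet(new int[] {1, 2},
        new HashSet<>(Collections.singletonList(Arrays.asList(30, 300))));
    List<Integer> ids = new ArrayList<>();
    for (int[] row : match(rows, Arrays.asList(byId, byAB))) {
      ids.add(row[0]);
    }
    return ids;
  }

  public static void main(String[] args) {
    System.out.println(demoMatchedIds()); // [1, 3]
  }
}
```

Each row is traversed once and checked against the delete sets until the first hit, so rows 1 and 3 are matched in a single pass. The trade-off is that a row matching no set is projected once per delete set.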
PS: Delete files with the same equality field IDs are collected into a
single set.
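The grouping step mentioned in the PS (done in the diff with a `Multimap` keyed by `Sets.newHashSet(delete.equalityFieldIds())`) can be sketched with plain `java.util` collections. `GroupDeletesSketch` and `DeleteFileStub` are hypothetical stand-ins for the real `DeleteFile` handling; the point is that keying by a `Set` makes field order irrelevant.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupDeletesSketch {

  // Hypothetical stand-in for DeleteFile: a path and its equality field IDs.
  static final class DeleteFileStub {
    final String path;
    final List<Integer> equalityFieldIds;

    DeleteFileStub(String path, Integer... ids) {
      this.path = path;
      this.equalityFieldIds = Arrays.asList(ids);
    }
  }

  // Groups delete files by the *set* of equality field IDs, so [2, 3] and
  // [3, 2] share a group and their deletes can be loaded into one key set.
  static Map<Set<Integer>, List<String>> groupByEqualityIds(List<DeleteFileStub> deletes) {
    Map<Set<Integer>, List<String>> groups = new LinkedHashMap<>();
    for (DeleteFileStub delete : deletes) {
      groups.computeIfAbsent(new HashSet<>(delete.equalityFieldIds), k -> new ArrayList<>())
          .add(delete.path);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<DeleteFileStub> deletes = Arrays.asList(
        new DeleteFileStub("d1", 1),
        new DeleteFileStub("d2", 2, 3),
        new DeleteFileStub("d3", 1),
        new DeleteFileStub("d4", 3, 2));
    // Expect two groups: {1} with d1, d3 and {2, 3} with d2, d4
    groupByEqualityIds(deletes).forEach((ids, paths) -> System.out.println(ids + " -> " + paths));
  }
}
```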
----------------------------------------------------------------
This is an automated message from the Apache Git Service.