openinx commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r607650826
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +158,142 @@ protected long pos(T record) {
CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
deleteSchema.asStruct());
- Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
- isInDeleteSets.add(isInDeleteSet);
+ isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+ isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
}
- return isInDeleteSets;
+ return isDeleted;
}
- public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
- // Predicate to test whether a row has been deleted by equality deletions.
- Predicate<T> deletedRows = applyEqDeletes().stream()
- .reduce(Predicate::or)
- .orElse(t -> false);
+ private Predicate<T> buildPosDeletePredicate() {
+ if (posDeletes.isEmpty()) {
+ return null;
+ }
+
+ List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+ Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+ if (deleteSet.isEmpty()) {
+ return null;
+ }
+
+ return record -> deleteSet.contains(pos(record));
+ }
- Filter<T> deletedRowsFilter = new Filter<T>() {
+ public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+ Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+ if (isDeletedFromPosDeletes == null) {
+ return keepRowsFromEqualityDeletes(records);
+ }
+
+ Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+ if (isDeletedFromEqDeletes == null) {
+ return keepRowsFromPosDeletes(records);
+ }
+
+ CloseableIterable<T> markedRecords;
+
+ if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+ markedRecords = CloseableIterable.transform(records, record -> {
+ if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+ deleteMarker().accept(record);
+ }
+ return record;
+ });
+
+ } else {
+ List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+ markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+ Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+ if (isDeletedFromEqDeletes.test(record)) {
+ deleteMarker().accept(record);
+ }
+ return record;
+ });
+ }
+
+ Filter<T> deletedRowsSelector = new Filter<T>() {
Review comment:
If I understand correctly, we have three kinds of delete readers:
1. `keepRowsFromDeletes` returns the rows that have been deleted by either equality deletes or position deletes;
2. `keepRowsFromEqualityDeletes` returns the rows that have been deleted by equality deletes only;
3. `keepRowsFromPosDeletes` returns the rows that have been deleted by position deletes only.
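For context on item 1: when a data file has both kinds of deletes, the diff marks position-deleted rows in one of two ways. If the total record count of the position delete files is below `setFilterThreshold`, it loads the positions into a `Set<Long>` and tests each row; otherwise it uses `Deletes.streamingDeletedRowMarker`, presumably so that all positions never have to be held in memory at once. The sketch below illustrates the general streaming technique (a merge over two sorted sequences), assuming both the rows and the delete positions arrive in ascending position order. `StreamingMarker.markDeleted` is a hypothetical helper, not Iceberg's implementation.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical helper, NOT Iceberg's Deletes.streamingDeletedRowMarker.
// Assumption: rows and delete positions are both sorted by position, ascending.
final class StreamingMarker {

  static <T> Iterator<T> markDeleted(Iterator<T> rows, ToLongFunction<T> pos,
                                     Iterator<Long> sortedDeletePositions, Consumer<T> marker) {
    return new Iterator<T>() {
      // Smallest delete position not yet passed; null once all deletes are consumed.
      private Long nextDelete = advance();

      private Long advance() {
        return sortedDeletePositions.hasNext() ? sortedDeletePositions.next() : null;
      }

      @Override
      public boolean hasNext() {
        return rows.hasNext();
      }

      @Override
      public T next() {
        T row = rows.next();
        long rowPos = pos.applyAsLong(row);
        // Skip delete positions that lie before the current row.
        while (nextDelete != null && nextDelete < rowPos) {
          nextDelete = advance();
        }
        // Mark the row, but still return it, when its position is deleted.
        if (nextDelete != null && nextDelete == rowPos) {
          marker.accept(row);
        }
        return row;
      }
    };
  }
}
```

Each row and each delete position is visited once, so the extra memory stays constant no matter how many deletes there are; the trade-off is that this only works on sorted input, whereas the set lookup does not care about order.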
The only difference is how each one builds the iterable that mixes deleted and live rows (with the deleted ones marked); in the end, all three `Iterable`s are filtered by the same `deletedRowsSelector`. So I think the three methods could share a single `deletedRowsSelector`.
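A minimal sketch of the suggestion, under simplifying assumptions: a hypothetical `Row` class with a mutable `deleted` flag stands in for records marked via `deleteMarker()`, `java.util.stream.Stream` stands in for `CloseableIterable`, and plain `Set`s stand in for the delete files. In the diff the selector is a `Filter<T>`; here it is a `Predicate`. Each reader only supplies the predicate used to mark rows, and all three end in the same selector.

```java
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Simplified model with hypothetical names; not Iceberg's DeleteFilter API.
final class SharedDeleteSelector {

  // A row with its position in the data file, an equality key, and an
  // "is deleted" marker that readers set instead of dropping the row.
  static final class Row {
    final long pos;
    final String key;
    boolean deleted;

    Row(long pos, String key) {
      this.pos = pos;
      this.key = key;
    }
  }

  // The one selector shared by all three readers: keep only rows marked as deleted.
  private static final Predicate<Row> DELETED_ROWS_SELECTOR = row -> row.deleted;

  // Lazily marks each row matched by isDeleted and passes every row through.
  private static Stream<Row> mark(Stream<Row> rows, Predicate<Row> isDeleted) {
    return rows.map(row -> {
      if (isDeleted.test(row)) {
        row.deleted = true;
      }
      return row;
    });
  }

  // Shared last step: however the rows were marked, they are filtered the same way.
  private static Stream<Row> selectDeleted(Stream<Row> markedRows) {
    return markedRows.filter(DELETED_ROWS_SELECTOR);
  }

  static Stream<Row> keepRowsFromPosDeletes(Stream<Row> rows, Set<Long> deletedPositions) {
    return selectDeleted(mark(rows, row -> deletedPositions.contains(row.pos)));
  }

  static Stream<Row> keepRowsFromEqualityDeletes(Stream<Row> rows, Set<String> deletedKeys) {
    return selectDeleted(mark(rows, row -> deletedKeys.contains(row.key)));
  }

  static Stream<Row> keepRowsFromDeletes(Stream<Row> rows, Set<Long> deletedPositions,
                                         Set<String> deletedKeys) {
    Predicate<Row> isDeletedFromPos = row -> deletedPositions.contains(row.pos);
    Predicate<Row> isDeletedFromEq = row -> deletedKeys.contains(row.key);
    return selectDeleted(mark(rows, isDeletedFromPos.or(isDeletedFromEq)));
  }
}
```

Because the selector no longer depends on how rows were marked, another marking strategy (for example the streaming position marker used above `setFilterThreshold`) could reuse it unchanged.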
--
This is an automated message from the Apache Git Service.