openinx commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r607599844
##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -233,6 +246,37 @@ public void close() {
}
}
+ static class PositionStreamDeletedRowMarker<T> extends
PositionStreamDeleteFilter<T> {
+ private final Consumer<T> deleteMarker;
+
+ private PositionStreamDeletedRowMarker(CloseableIterable<T> rows,
Function<T, Long> extractPos,
+ CloseableIterable<Long>
deletePositions,
+ Consumer<T> deleteMarker) {
+ super(rows, extractPos, deletePositions);
+ this.deleteMarker = deleteMarker;
+ }
+
+ @Override
+ protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
+ CloseableIterator<Long>
deletePositions) {
+ return new PositionMarkerIterator(items, deletePositions);
+ }
+
+ private class PositionMarkerIterator extends PositionFilterIterator {
+ protected PositionMarkerIterator(CloseableIterator<T> items,
CloseableIterator<Long> deletePositions) {
Review comment:
Nit: this could be `package-private`.
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -96,6 +98,23 @@ public Schema requiredSchema() {
return requiredSchema;
}
+ protected int deleteMarkerIndex() {
+ int index = 0;
+ for (Types.NestedField field : requiredSchema().columns()) {
+ if (field.fieldId() != MetadataColumns.DELETE_MARK.fieldId()) {
+ index = index + 1;
+ } else {
+ break;
+ }
+ }
+
+ return index;
+ }
+
+ protected abstract Consumer<T> deleteMarker();
+
+ protected abstract Function<T, Boolean> deleteChecker();
Review comment:
Checked all this usage, seems we don't have to return a `Function<T,
Boolean>`, it's more simpler to define a method like:
```java
protected abstract boolean isDeletedRow(T row);
```
##########
File path: data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
##########
@@ -40,6 +42,16 @@ protected long pos(Record record) {
return (Long) posAccessor().get(record);
}
+ @Override
+ protected Consumer<Record> deleteMarker() {
+ return record -> record.set(deleteMarkerIndex(), true);
+ }
+
+ @Override
+ protected Function<Record, Boolean> deleteChecker() {
+ return record -> record.get(deleteMarkerIndex(), Boolean.class);
Review comment:
Could we use the lazy approach to access the index of `_deleted`, this
is a very hot code path because each record will need to search the deleted
column index.
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +158,142 @@ protected long pos(T record) {
CloseableIterable.transform(CloseableIterable.concat(deleteRecords),
Record::copy),
deleteSchema.asStruct());
- Predicate<T> isInDeleteSet = record ->
deleteSet.contains(projectRow.wrap(asStructLike(record)));
- isInDeleteSets.add(isInDeleteSet);
+ isDeleted = isDeleted == null ? record ->
deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+ isDeleted.or(record ->
deleteSet.contains(projectRow.wrap(asStructLike(record))));
}
- return isInDeleteSets;
+ return isDeleted;
}
- public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T>
records) {
- // Predicate to test whether a row has been deleted by equality deletions.
- Predicate<T> deletedRows = applyEqDeletes().stream()
- .reduce(Predicate::or)
- .orElse(t -> false);
+ private Predicate<T> buildPosDeletePredicate() {
+ if (posDeletes.isEmpty()) {
+ return null;
+ }
+
+ List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes,
this::openPosDeletes);
+ Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(),
CloseableIterable.concat(deletes));
+ if (deleteSet.isEmpty()) {
+ return null;
+ }
+
+ return record -> deleteSet.contains(pos(record));
+ }
- Filter<T> deletedRowsFilter = new Filter<T>() {
+ public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T>
records) {
+ Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+ if (isDeletedFromPosDeletes == null) {
+ return keepRowsFromEqualityDeletes(records);
+ }
+
+ Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+ if (isDeletedFromEqDeletes == null) {
+ return keepRowsFromPosDeletes(records);
+ }
+
+ CloseableIterable<T> markedRecords;
+
+ if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() <
setFilterThreshold) {
+ markedRecords = CloseableIterable.transform(records, record -> {
+ if (isDeletedFromPosDeletes.test(record) ||
isDeletedFromEqDeletes.test(record)) {
+ deleteMarker().accept(record);
+ }
+ return record;
+ });
+
+ } else {
+ List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes,
this::openPosDeletes);
+ markedRecords =
CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records,
this::pos,
+ Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()),
record -> {
+ if (isDeletedFromEqDeletes.test(record)) {
Review comment:
If the row has been marked as `deleted` in the
`streamingDeletedRowMarker`, then I think we don't have to check this row
again for equality delete files. The double-check will effect the read
performance a lot.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]