aokolnychyi commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r874393713


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -63,14 +65,14 @@ public static <T> CloseableIterable<T> 
filter(CloseableIterable<T> rows, Functio
     return equalityFilter.filter(rows);
   }
 
-  public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, 
Function<T, Long> rowToPosition,
-                                                PositionDeleteIndex deleteSet) 
{
-    if (deleteSet.isEmpty()) {
-      return rows;
-    }
-
-    PositionSetDeleteFilter<T> filter = new 
PositionSetDeleteFilter<>(rowToPosition, deleteSet);
-    return filter.filter(rows);
+  public static <T> CloseableIterable<T> markDeleted(CloseableIterable<T> 
rows, Predicate<T> isDeleted,
+                                                     Consumer<T> deleteMarker) 
{
+    return CloseableIterable.transform(rows, row -> {
+      if (isDeleted.test(row)) {

Review Comment:
   I wonder whether we should be worried about evaluating an extra if condition 
for every single row and whether there is a way to rewrite it. Let's ignore 
this for now as there are quite a few ifs already.



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -226,10 +226,15 @@ private CloseableIterable<T> 
applyPosDeletes(CloseableIterable<T> records) {
 
     // if there are fewer deletes than a reasonable number to keep in memory, 
use a set
     if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 
setFilterThreshold) {
-      return Deletes.filter(records, this::pos, 
Deletes.toPositionIndex(filePath, deletes));
+      Predicate<T> isInDeleteSet = record -> Deletes.toPositionIndex(filePath, 
deletes).isDeleted(pos(record));

Review Comment:
   Won't we call `Deletes.toPositionIndex` for every row now?



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -290,8 +295,6 @@ private static Schema fileProjection(Schema tableSchema, 
Schema requestedSchema,
       requiredIds.addAll(eqDelete.equalityFieldIds());
     }
 
-    requiredIds.add(MetadataColumns.IS_DELETED.fieldId());

Review Comment:
   Could you comment on why this was removed?



##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java:
##########
@@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns {
       }
       row.update(1, UTF8String.fromString("str" + i));
       row.update(2, i);
-      row.update(3, false);

Review Comment:
   Why do we need these changes?



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> 
deletes, Schema tableSc
     this.eqDeletes = eqDeleteBuilder.build();
     this.requiredSchema = fileProjection(tableSchema, requestedSchema, 
posDeletes, eqDeletes);
     this.posAccessor = 
requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    this.hasColumnIsDeleted = 
requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+    this.columnIsDeletedPosition = 
requestedSchema.columns().indexOf(MetadataColumns.IS_DELETED);

Review Comment:
   Should we use `requiredSchema` or `requestedSchema`?



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> 
deletes, Schema tableSc
     this.eqDeletes = eqDeleteBuilder.build();
     this.requiredSchema = fileProjection(tableSchema, requestedSchema, 
posDeletes, eqDeletes);
     this.posAccessor = 
requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    this.hasColumnIsDeleted = 
requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+    this.columnIsDeletedPosition = 
requestedSchema.columns().indexOf(MetadataColumns.IS_DELETED);

Review Comment:
   This assumes query engines are able to set a value in a row at a 
particular position. That seems reasonable, but let us also think about alternatives.



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -169,30 +176,23 @@ public CloseableIterable<T> 
findEqualityDeleteRows(CloseableIterable<T> records)
         .reduce(Predicate::or)
         .orElse(t -> false);
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
-      }
-    };
-    return deletedRowsFilter.filter(records);
+    return CloseableIterable.filter(records, deletedRows);
   }
 
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
-    // Predicate to test whether a row should be visible to user after 
applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
-
-    Filter<T> remainingRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return remainingRows.test(item);
-      }
-    };
+    Predicate<T> isEqDeleted = applyEqDeletes().stream()
+        .reduce(Predicate::or)
+        .orElse(t -> false);
 
-    return remainingRowsFilter.filter(records);
+    if (hasColumnIsDeleted) {
+      return Deletes.markDeleted(records, isEqDeleted, this::markRowDeleted);
+    } else {
+      return CloseableIterable.filter(records, isEqDeleted.negate());
+    }
+  }
+
+  protected void markRowDeleted(T item) {
+    throw new 
UnsupportedOperationException("GenericDeleteFilter.markRowDeleted() is not 
supported");

Review Comment:
   What about `this.getClass().getName() + " does not implement 
markRowDeleted"` to cover all potential implementations?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to