aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r918441075


##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,31 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {

Review Comment:
   Does this have to be specific to position deletes? Will constructing 
`DataFile` from an equality delete file be any different?



##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,31 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {
+    Preconditions.checkArgument(deleteFile.content().equals(FileContent.POSITION_DELETES),
+        "Expected positional delete file, found %s", deleteFile.content());
+    Metrics metrics = new Metrics(
+        deleteFile.recordCount(),
+        deleteFile.columnSizes(),
+        deleteFile.valueCounts(),
+        deleteFile.nullValueCounts(),
+        deleteFile.nanValueCounts(),
+        deleteFile.lowerBounds(),
+        deleteFile.upperBounds()
+    );
+
+    return DataFiles.builder(spec)
+        .withEncryptionKeyMetadata(deleteFile.keyMetadata())
+        .withFormat(deleteFile.format())
+        .withFileSizeInBytes(deleteFile.fileSizeInBytes())
+        .withPath(deleteFile.path().toString())
+        .withPartition(deleteFile.partition())
+        .withRecordCount(deleteFile.recordCount())
+        .withSortOrder(SortOrder.unsorted())

Review Comment:
   Shall we propagate the sort order? We can add `withSortOrderId` to the 
builder.
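A minimal, self-contained sketch of that builder extension (all names hypothetical, heavily simplified from the real `DataFiles.Builder`):

```java
// Hypothetical sketch only: a trimmed-down builder showing how a
// withSortOrderId method could thread the delete file's sort order id
// through, instead of always writing SortOrder.unsorted().
// The real DataFiles.Builder carries far more state than this.
final class FileInfoBuilder {
  private int sortOrderId = 0; // 0 is conventionally the unsorted order id

  FileInfoBuilder withSortOrderId(int id) {
    this.sortOrderId = id;
    return this;
  }

  int buildSortOrderId() {
    return sortOrderId;
  }
}
```

The call site would then pass `deleteFile.sortOrderId()` through rather than discarding it.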



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   I am not sure this is correct. All other metadata tables are unpartitioned 
and there is logic in the scan builder that assumes all metadata tables are 
unpartitioned. Do we need to make this one partitioned for a reason?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;

Review Comment:
   nit: use shouldIgnoreResiduals() directly?
   
   ```
   Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : rowFilter;
   ```



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles

Review Comment:
   Respect case sensitivity configured for this scan in `isCaseSensitive()`?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {

Review Comment:
   nit: can be package-private?



##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {

Review Comment:
   Is there a reason to add an overloaded method that accepts `StructType`?
   Seems like it is not being used right now.



##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {

Review Comment:
   Are we sure this logic belongs here? Up to you but `PositionDeletesTable` 
seemed more appropriate.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles

Review Comment:
   What about projection?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {

Review Comment:
   Why not make it a nested class, like in the other metadata tables? Is it because it is too big?
   Just out of curiosity.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {

Review Comment:
   Same here.



##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,
+        MetadataColumns.DELETE_FILE_POS,
+        Types.NestedField.optional(
+            MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", table.schema().asStruct(),
+            MetadataColumns.DELETE_FILE_ROW_DOC));
+
+    if (partitionType.fields().size() > 0) {

Review Comment:
   I feel like using `TypeUtil.selectNot` will make it easier to read:
   
   ```
   Types.StructType partitionType = Partitioning.partitionType(table);
   Schema schema = new Schema(...);
   
   if (partitionType.fields().size() < 1) {
     // avoid returning an empty struct, which is not always supported. instead, drop the partition field
     return TypeUtil.selectNot(schema, Sets.newHashSet(PARTITION_ID));
   } else {
     return schema;
   }
   ```



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles
+          .readDeleteManifest(m, tableOps().io(), transformedSpecs)
+          .filterRows(rowFilter)
+          .liveEntries();
+
+      // Filter delete file type
+      CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries = CloseableIterable.filter(deleteFileEntries,
+          entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+      return CloseableIterable.transform(positionDeleteEntries, entry -> {
+        PartitionSpec spec = transformedSpecs.get(entry.file().specId());
+        ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, context().caseSensitive());

Review Comment:
   nit: `context().caseSensitive()` -> `isCaseSensitive()`



##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Are these real metadata columns or just columns that happen to have the same name? I don't think we should reuse those column IDs, since metadata columns use reserved ones.
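For context, Iceberg's metadata columns take reserved field ids counting down from `Integer.MAX_VALUE` (for example, the `_file` column uses `Integer.MAX_VALUE - 1`), so a simple range check can flag accidental reuse. The width of the reserved band below is an assumption for illustration, not the exact value defined by the spec:

```java
// Sketch: flag field ids that fall inside the band of ids reserved for
// metadata columns at the top of the int range. ASSUMED_RESERVED_BAND is
// an illustrative width, not Iceberg's actual constant.
final class ReservedFieldIds {
  static final int ASSUMED_RESERVED_BAND = 200;

  static boolean isReserved(int fieldId) {
    return fieldId > Integer.MAX_VALUE - ASSUMED_RESERVED_BAND;
  }
}
```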



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles
+          .readDeleteManifest(m, tableOps().io(), transformedSpecs)
+          .filterRows(rowFilter)
+          .liveEntries();
+
+      // Filter delete file type
+      CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries = CloseableIterable.filter(deleteFileEntries,
+          entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+      return CloseableIterable.transform(positionDeleteEntries, entry -> {
+        PartitionSpec spec = transformedSpecs.get(entry.file().specId());
+        ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, context().caseSensitive());

Review Comment:
   Seems like we are doing repetitive work here for each task/spec.
   Some sort of `LoadingCache<Integer, ResidualEvaluator> residualCache` would 
make sense here like in a few other places. 
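A sketch of that memoization, with a plain map keyed by spec id standing in for a Guava/Caffeine `LoadingCache` (names hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Sketch: construct the per-spec value (e.g. a ResidualEvaluator) at most
// once per spec id and reuse it for every task, instead of rebuilding it
// for each manifest entry.
final class PerSpecCache<V> {
  private final Map<Integer, V> cache = new HashMap<>();
  private final IntFunction<V> loader;

  PerSpecCache(IntFunction<V> loader) {
    this.loader = loader;
  }

  V get(int specId) {
    return cache.computeIfAbsent(specId, loader::apply);
  }
}
```

The scan would then call `cache.get(entry.file().specId())` inside the transform instead of invoking `ResidualEvaluator.of(...)` per entry.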



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),
+          null, /* Deletes */
+          schemaString,
+          specString,
+          residuals);
+    });
+  }
+
+  @Override
+  public long targetSplitSize() {
+    long tableValue = tableOps().current().propertyAsLong(
Review Comment:
   Hm, I'd be surprised if a metadata table does not respect the metadata split 
size.
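A sketch of the fallback the reviewer expects, with the metadata-specific split size taking precedence over the general one. The property names follow Iceberg's `read.split.*` convention; the lookup helper and the default value are illustrative stand-ins, not the actual `PropertyUtil` implementation:

```java
import java.util.Map;

// Sketch: prefer the metadata table split size, then the general split
// size, then a hard default, when choosing a target split size.
final class SplitSizes {
  static final String METADATA_SPLIT_SIZE = "read.split.metadata-target-size";
  static final String SPLIT_SIZE = "read.split.target-size";
  static final long DEFAULT_SPLIT_SIZE = 128L * 1024 * 1024; // illustrative default

  static long targetSplitSize(Map<String, String> properties) {
    String value = properties.getOrDefault(METADATA_SPLIT_SIZE, properties.get(SPLIT_SIZE));
    return value != null ? Long.parseLong(value) : DEFAULT_SPLIT_SIZE;
  }
}
```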



##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {
     PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
-    spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
+    partitionType.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.fieldId(), pf.name(), "identity"));

Review Comment:
   Was the switch from `spec.fields()` to `spec.partitionType().fields()` 
deliberate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

