aokolnychyi commented on code in PR #15297:
URL: https://github.com/apache/iceberg/pull/15297#discussion_r2794465996


##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -320,74 +319,63 @@ private boolean metricsModeSupportsAggregatePushDown(List<BoundAggregate<?, ?>>
 
   @Override
   public void pruneColumns(StructType requestedSchema) {
-    StructType requestedProjection =
-        new StructType(
-            Stream.of(requestedSchema.fields())
-                .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
-                .toArray(StructField[]::new));
-
-    // the projection should include all columns that will be returned, including those only used in
-    // filters
-    this.schema =
-        SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive);
-
-    Stream.of(requestedSchema.fields())
-        .map(StructField::name)
-        .filter(MetadataColumns::isMetadataColumn)
-        .distinct()
-        .forEach(metaColumns::add);
-  }
-
-  private Schema schemaWithMetadataColumns() {
-    // metadata columns
-    List<Types.NestedField> metadataFields =
-        metaColumns.stream()
-            .distinct()
-            .map(name -> MetadataColumns.metadataColumn(table, name))
-            .collect(Collectors.toList());
-    Schema metadataSchema = calculateMetadataSchema(metadataFields);
-
-    // schema or rows returned by readers
-    return TypeUtil.join(schema, metadataSchema);
-  }
-
-  private Schema calculateMetadataSchema(List<Types.NestedField> metaColumnFields) {
-    Optional<Types.NestedField> partitionField =
-        metaColumnFields.stream()
-            .filter(f -> MetadataColumns.PARTITION_COLUMN_ID == f.fieldId())
-            .findFirst();
-
-    // only calculate potential column id collision if partition metadata column was requested
-    if (!partitionField.isPresent()) {
-      return new Schema(metaColumnFields);
-    }
-
-    Set<Integer> idsToReassign =
-        TypeUtil.indexById(partitionField.get().type().asStructType()).keySet();
-
-    // Calculate used ids by union metadata columns with all base table schemas
-    Set<Integer> currentlyUsedIds =
-        metaColumnFields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
-    Set<Integer> allUsedIds =
-        table.schemas().values().stream()
-            .map(currSchema -> TypeUtil.indexById(currSchema.asStruct()).keySet())
-            .reduce(currentlyUsedIds, Sets::union);
-
-    // Reassign selected ids to deduplicate with used ids.
-    AtomicInteger nextId = new AtomicInteger();
-    return new Schema(
-        metaColumnFields,
-        ImmutableSet.of(),
-        oldId -> {
-          if (!idsToReassign.contains(oldId)) {
-            return oldId;
-          }
-          int candidate = nextId.incrementAndGet();
-          while (allUsedIds.contains(candidate)) {
-            candidate = nextId.incrementAndGet();
-          }
-          return candidate;
-        });
+    List<StructField> dataFields = Lists.newArrayList();
+
+    for (StructField field : requestedSchema.fields()) {
+      if (MetadataColumns.isMetadataColumn(field.name())) {
+        metaFieldNames.add(field.name());
+      } else {
+        dataFields.add(field);
+      }
+    }
+
+    StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields);
+    this.projection = prune(projection, requestedProjection);
+  }
+
+  // the projection should include all columns that will be returned,
+  // including those only used in filters
+  private Schema prune(Schema schema, StructType requestedSchema) {
+    return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), caseSensitive);
+  }
+
+  // schema of rows that must be returned by readers
+  protected Schema projectionWithMetadataColumns() {
+    return TypeUtil.join(projection, calculateMetadataSchema());
+  }
+
+  // computes metadata schema avoiding conflicts between partition and data field IDs
+  private Schema calculateMetadataSchema() {
+    List<Types.NestedField> metaFields = metaFields();
+    Optional<Types.NestedField> partitionField = findPartitionField(metaFields);
+
+    if (partitionField.isEmpty()) {
+      return new Schema(metaFields);
+    }
+
+    Types.StructType partitionType = partitionField.get().type().asStructType();
+    Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
+    GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds());
+    return new Schema(metaFields, getId);
+  }
+
+  private List<Types.NestedField> metaFields() {
+    return metaFieldNames.stream()
+        .map(name -> MetadataColumns.metadataColumn(table, name))
+        .collect(Collectors.toList());
+  }
+
+  private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) {
+    return fields.stream()
+        .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId())
+        .findFirst();
+  }
+
+  // collects used data field IDs across all known table schemas
+  private Set<Integer> allUsedFieldIds() {

Review Comment:
   We don't need to track used metadata column IDs here. They start from the end
of the INT range and can't conflict by definition. If they do, something is
fundamentally wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to