szehon-ho commented on code in PR #7539:
URL: https://github.com/apache/iceberg/pull/7539#discussion_r1191922266


##########
core/src/main/java/org/apache/iceberg/BaseEntriesTable.java:
##########
@@ -125,31 +130,120 @@ ManifestFile manifest() {
 
     @Override
     public CloseableIterable<StructLike> rows() {
-      // Project data-file fields
-      CloseableIterable<StructLike> prunedRows;
-      if (manifest.content() == ManifestContent.DATA) {
-        prunedRows =
+      Types.NestedField readableMetricsField = 
projection.findField(MetricsUtil.READABLE_METRICS);
+
+      if (readableMetricsField == null) {
+        CloseableIterable<StructLike> entryAsStruct =
             CloseableIterable.transform(
-                ManifestFiles.read(manifest, io).project(fileSchema).entries(),
-                file -> (GenericManifestEntry<DataFile>) file);
+                entries(fileProjection),
+                entry -> (GenericManifestEntry<? extends ContentFile<?>>) 
entry);
+
+        StructProjection structProjection = projectNonReadable(projection);
+        return CloseableIterable.transform(entryAsStruct, 
structProjection::wrap);
       } else {
-        prunedRows =
-            CloseableIterable.transform(
-                ManifestFiles.readDeleteManifest(manifest, io, specsById)
-                    .project(fileSchema)
-                    .entries(),
-                file -> (GenericManifestEntry<DeleteFile>) file);
+        Schema requiredFileProjection = requiredFileProjection();
+        Schema actualProjection = removeReadableMetrics(readableMetricsField);
+        StructProjection structProjection = 
projectNonReadable(actualProjection);
+
+        return CloseableIterable.transform(
+            entries(requiredFileProjection),
+            entry -> withReadableMetrics(structProjection, entry, 
readableMetricsField));
       }
+    }
+
+    /**
+     * Remove virtual columns from the file projection and ensure that the 
underlying metrics used
+     * to create those columns are part of the file projection
+     *
+     * @return file projection with required columns to read readable metrics
+     */
+    private Schema requiredFileProjection() {
+      Schema projectionForReadableMetrics =
+          new Schema(
+              MetricsUtil.READABLE_METRIC_COLS.stream()
+                  .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
+                  .collect(Collectors.toList()));
+      return TypeUtil.join(fileProjection, projectionForReadableMetrics);
+    }
 
-      // Project non-readable fields
-      Schema readSchema = ManifestEntry.wrapFileSchema(fileSchema.asStruct());
-      StructProjection projection = StructProjection.create(readSchema, 
schema);
-      return CloseableIterable.transform(prunedRows, projection::wrap);
+    private Schema removeReadableMetrics(Types.NestedField 
readableMetricsField) {
+      Set<Integer> readableMetricsIds = 
TypeUtil.getProjectedIds(readableMetricsField.type());
+      return TypeUtil.selectNot(projection, readableMetricsIds);
+    }
+
+    private StructProjection projectNonReadable(Schema projectedSchema) {
+      Schema manifestEntrySchema = 
ManifestEntry.wrapFileSchema(fileProjection.asStruct());
+      return StructProjection.create(manifestEntrySchema, projectedSchema);
+    }
+
+    private CloseableIterable<? extends ManifestEntry<? extends 
ContentFile<?>>> entries(
+        Schema newFileProjection) {
+      return ManifestFiles.open(manifest, io, 
specsById).project(newFileProjection).entries();
+    }
+
+    private StructLike withReadableMetrics(
+        StructProjection structProjection,
+        ManifestEntry<? extends ContentFile<?>> entry,
+        Types.NestedField readableMetricsField) {
+      int projectionColumnCount = projection.columns().size();
+      int metricsPosition = projection.columns().indexOf(readableMetricsField);
+
+      StructProjection entryStruct = structProjection.wrap((StructLike) entry);
+
+      StructType projectedMetricType =
+          
projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
+      MetricsUtil.ReadableMetricsStruct readableMetrics =
+          MetricsUtil.readableMetricsStruct(dataTableSchema, entry.file(), 
projectedMetricType);
+
+      return new ManifestEntryStructWithMetrics(
+          projectionColumnCount, metricsPosition, entryStruct, 
readableMetrics);
     }
 
     @Override
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static class ManifestEntryStructWithMetrics implements StructLike {

Review Comment:
   For another PR, I think we can combine this and BaseFilesTable's similar 
class (StructProjection is an instance of StructLike).



##########
core/src/main/java/org/apache/iceberg/BaseEntriesTable.java:
##########
@@ -125,31 +130,120 @@ ManifestFile manifest() {
 
     @Override
     public CloseableIterable<StructLike> rows() {
-      // Project data-file fields
-      CloseableIterable<StructLike> prunedRows;
-      if (manifest.content() == ManifestContent.DATA) {
-        prunedRows =
+      Types.NestedField readableMetricsField = 
projection.findField(MetricsUtil.READABLE_METRICS);
+
+      if (readableMetricsField == null) {
+        CloseableIterable<StructLike> entryAsStruct =
             CloseableIterable.transform(
-                ManifestFiles.read(manifest, io).project(fileSchema).entries(),
-                file -> (GenericManifestEntry<DataFile>) file);
+                entries(fileProjection),
+                entry -> (GenericManifestEntry<? extends ContentFile<?>>) 
entry);
+
+        StructProjection structProjection = projectNonReadable(projection);
+        return CloseableIterable.transform(entryAsStruct, 
structProjection::wrap);
       } else {
-        prunedRows =
-            CloseableIterable.transform(
-                ManifestFiles.readDeleteManifest(manifest, io, specsById)
-                    .project(fileSchema)
-                    .entries(),
-                file -> (GenericManifestEntry<DeleteFile>) file);
+        Schema requiredFileProjection = requiredFileProjection();
+        Schema actualProjection = removeReadableMetrics(readableMetricsField);
+        StructProjection structProjection = 
projectNonReadable(actualProjection);
+
+        return CloseableIterable.transform(
+            entries(requiredFileProjection),
+            entry -> withReadableMetrics(structProjection, entry, 
readableMetricsField));
       }
+    }
+
+    /**
+     * Remove virtual columns from the file projection and ensure that the 
underlying metrics used

Review Comment:
   Hey, I'm sorry I missed this. Can we remove this line: "Remove virtual columns 
from the file projection and"?
   
   I guess this comment was meant for the other operation, but I don't think we 
need the comment anymore now that it's on a self-explanatory method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to