aokolnychyi commented on code in PR #5376:
URL: https://github.com/apache/iceberg/pull/5376#discussion_r944700255


##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask 
implements DataTask {
     private final FileIO io;
     private final Map<Integer, PartitionSpec> specsById;
     private final ManifestFile manifest;
-    private final Schema schema;
+    private final Schema dataTableSchema;
+    private final Schema filesTableSchema;
+    private final Schema projectedSchema;
+    private final Map<Integer, String> quotedNameById;
+    private final boolean isPartitioned;
 
     ManifestReadTask(
         Table table,
         ManifestFile manifest,
         Schema schema,
+        Schema projectedSchema,
         String schemaString,
         String specString,
         ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, 
residuals);
       this.io = table.io();
       this.specsById = Maps.newHashMap(table.specs());
       this.manifest = manifest;
-      this.schema = schema;
+      this.filesTableSchema = schema;
+      this.projectedSchema = projectedSchema;
+      this.dataTableSchema = table.schema();
+      this.quotedNameById = 
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);
+      this.isPartitioned = Partitioning.partitionType(table).fields().size() > 
0;
     }
 
     @Override
     public CloseableIterable<StructLike> rows() {
-      return CloseableIterable.transform(manifestEntries(), file -> 
(StructLike) file);
+      return CloseableIterable.transform(
+          manifestEntries(),
+          fileEntry ->
+              StaticDataTask.Row.of(
+                  projectedFields(fileEntry, 
accessors(isPartitioned)).toArray()));
     }
 
     private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+      Schema finalProjectedSchema =
+          TypeUtil.selectNot(filesTableSchema, 
TypeUtil.getProjectedIds(READABLE_METRICS.type()));

Review Comment:
   I think this broke file projection as we should not be using the full 
metadata schema. I believe `schema` previously referred to a projection.



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -18,25 +18,50 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 
 /** Base class logic for files metadata tables */
 abstract class BaseFilesTable extends BaseMetadataTable {
 
+  static final Types.StructType READABLE_METRICS_VALUE =
+      Types.StructType.of(
+          optional(303, "column_size", Types.LongType.get(), "Total size on 
disk"),
+          optional(304, "value_count", Types.LongType.get(), "Total count, 
including null and NaN"),
+          optional(305, "null_value_count", Types.LongType.get(), "Null value 
count"),
+          optional(306, "nan_value_count", Types.LongType.get(), "Nan value 
count"),
+          optional(307, "lower_bound", Types.StringType.get(), "Lower bound in 
string form"),
+          optional(308, "upper_bound", Types.StringType.get(), "Upper bound in 
string form"));
+
+  static final Types.NestedField READABLE_METRICS =
+      required(
+          300,

Review Comment:
   We start from 300 to avoid future conflicts with the file schema, right?



##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,127 @@ public static MetricsModes.MetricsMode metricsMode(
     String columnName = inputSchema.findColumnName(fieldId);
     return metricsConfig.columnMode(columnName);
   }
+
+  /**
+   * Return a readable metrics map
+   *
+   * @param schema schema of original data table
+   * @param quotedNameById pre-computed map of all column ids in schema to 
readable name, see {@link
+   *     org.apache.iceberg.types.TypeUtil#indexQuotedNameById}
+   * @param columnSizes column size metrics
+   * @param valueCounts value count metrics
+   * @param nullValueCounts null value metrics
+   * @param nanValueCounts nan value metrics
+   * @param lowerBounds lower bound metrics
+   * @param upperBounds upper bound metrics
+   * @return map of readable column name to column metric, of which the bounds 
are made readable
+   */
+  public static Map<String, StructLike> readableMetricsMap(
+      Schema schema,
+      Map<Integer, String> quotedNameById,
+      Map<Integer, Long> columnSizes,

Review Comment:
   What about passing `ContentFile<?>` directly to reduce the number of 
arguments?



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask 
implements DataTask {
     private final FileIO io;
     private final Map<Integer, PartitionSpec> specsById;
     private final ManifestFile manifest;
-    private final Schema schema;
+    private final Schema dataTableSchema;
+    private final Schema filesTableSchema;
+    private final Schema projectedSchema;
+    private final Map<Integer, String> quotedNameById;
+    private final boolean isPartitioned;
 
     ManifestReadTask(
         Table table,
         ManifestFile manifest,
         Schema schema,
+        Schema projectedSchema,
         String schemaString,
         String specString,
         ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, 
residuals);
       this.io = table.io();
       this.specsById = Maps.newHashMap(table.specs());
       this.manifest = manifest;
-      this.schema = schema;
+      this.filesTableSchema = schema;
+      this.projectedSchema = projectedSchema;
+      this.dataTableSchema = table.schema();
+      this.quotedNameById = 
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);

Review Comment:
   Do we actually need this map since we are not quoting the names? Can't we 
use `schema.findColumnName(id)`?



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -185,5 +232,60 @@ public Iterable<FileScanTask> split(long splitSize) {
     ManifestFile manifest() {
       return manifest;
     }
+
+    private List<Function<ContentFile<?>, Object>> accessors(boolean 
partitioned) {

Review Comment:
   These two methods are hard to grasp. I think our goal is to wrap the struct 
we get back from reading manifests and attach metrics to it if they were 
projected. Let me think more on this one.



##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -18,14 +18,22 @@
  */
 package org.apache.iceberg;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MetricsUtil {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(MetricsUtil.class);

Review Comment:
   nit: `logger` -> `LOG` as it is a constant.



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask 
implements DataTask {
     private final FileIO io;
     private final Map<Integer, PartitionSpec> specsById;
     private final ManifestFile manifest;
-    private final Schema schema;
+    private final Schema dataTableSchema;
+    private final Schema filesTableSchema;
+    private final Schema projectedSchema;
+    private final Map<Integer, String> quotedNameById;
+    private final boolean isPartitioned;
 
     ManifestReadTask(
         Table table,
         ManifestFile manifest,
         Schema schema,
+        Schema projectedSchema,
         String schemaString,
         String specString,
         ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, 
residuals);
       this.io = table.io();
       this.specsById = Maps.newHashMap(table.specs());
       this.manifest = manifest;
-      this.schema = schema;
+      this.filesTableSchema = schema;
+      this.projectedSchema = projectedSchema;
+      this.dataTableSchema = table.schema();
+      this.quotedNameById = 
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);

Review Comment:
   I am not sure it is a good idea to compute this for every manifest task as 
there may be a lot of columns. I don't think it will degrade the performance 
too much, but it will definitely generate some extra objects. I'd probably 
compute it once and reuse it to avoid surprises.



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -18,25 +18,50 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 
 /** Base class logic for files metadata tables */
 abstract class BaseFilesTable extends BaseMetadataTable {
 
+  static final Types.StructType READABLE_METRICS_VALUE =
+      Types.StructType.of(
+          optional(303, "column_size", Types.LongType.get(), "Total size on 
disk"),
+          optional(304, "value_count", Types.LongType.get(), "Total count, 
including null and NaN"),
+          optional(305, "null_value_count", Types.LongType.get(), "Null value 
count"),
+          optional(306, "nan_value_count", Types.LongType.get(), "Nan value 
count"),

Review Comment:
   nit: `Nan` -> `NaN`



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask 
implements DataTask {
     private final FileIO io;
     private final Map<Integer, PartitionSpec> specsById;
     private final ManifestFile manifest;
-    private final Schema schema;
+    private final Schema dataTableSchema;
+    private final Schema filesTableSchema;
+    private final Schema projectedSchema;
+    private final Map<Integer, String> quotedNameById;
+    private final boolean isPartitioned;
 
     ManifestReadTask(
         Table table,
         ManifestFile manifest,
         Schema schema,
+        Schema projectedSchema,
         String schemaString,
         String specString,
         ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, 
residuals);
       this.io = table.io();
       this.specsById = Maps.newHashMap(table.specs());
       this.manifest = manifest;
-      this.schema = schema;
+      this.filesTableSchema = schema;
+      this.projectedSchema = projectedSchema;
+      this.dataTableSchema = table.schema();
+      this.quotedNameById = 
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);
+      this.isPartitioned = Partitioning.partitionType(table).fields().size() > 
0;

Review Comment:
   Can we check if the schema contains `DataFile.PARTITION_ID` instead of 
creating a common partition type? I think we constructed this common partition 
type earlier and included/excluded `DataFile.PARTITION_ID` based on that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to