aokolnychyi commented on code in PR #5376:
URL: https://github.com/apache/iceberg/pull/5376#discussion_r944700255
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema filesTableSchema;
+ private final Schema projectedSchema;
+ private final Map<Integer, String> quotedNameById;
+ private final boolean isPartitioned;
ManifestReadTask(
Table table,
ManifestFile manifest,
Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.filesTableSchema = schema;
+ this.projectedSchema = projectedSchema;
+ this.dataTableSchema = table.schema();
+ this.quotedNameById =
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);
+ this.isPartitioned = Partitioning.partitionType(table).fields().size() >
0;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ return CloseableIterable.transform(
+ manifestEntries(),
+ fileEntry ->
+ StaticDataTask.Row.of(
+ projectedFields(fileEntry,
accessors(isPartitioned)).toArray()));
}
private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ Schema finalProjectedSchema =
+ TypeUtil.selectNot(filesTableSchema,
TypeUtil.getProjectedIds(READABLE_METRICS.type()));
Review Comment:
   I think this change broke file projection, since we should not be using the
full metadata schema here. I believe `schema` previously referred to a projection.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -18,25 +18,50 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
/** Base class logic for files metadata tables */
abstract class BaseFilesTable extends BaseMetadataTable {
+ static final Types.StructType READABLE_METRICS_VALUE =
+ Types.StructType.of(
+ optional(303, "column_size", Types.LongType.get(), "Total size on
disk"),
+ optional(304, "value_count", Types.LongType.get(), "Total count,
including null and NaN"),
+ optional(305, "null_value_count", Types.LongType.get(), "Null value
count"),
+ optional(306, "nan_value_count", Types.LongType.get(), "Nan value
count"),
+ optional(307, "lower_bound", Types.StringType.get(), "Lower bound in
string form"),
+ optional(308, "upper_bound", Types.StringType.get(), "Upper bound in
string form"));
+
+ static final Types.NestedField READABLE_METRICS =
+ required(
+ 300,
Review Comment:
We start from 300 to avoid future conflicts with the file schema, right?
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,127 @@ public static MetricsModes.MetricsMode metricsMode(
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ /**
+ * Return a readable metrics map
+ *
+ * @param schema schema of original data table
+ * @param quotedNameById pre-computed map of all column ids in schema to
readable name, see {@link
+ * org.apache.iceberg.types.TypeUtil#indexQuotedNameById}
+ * @param columnSizes column size metrics
+ * @param valueCounts value count metrics
+ * @param nullValueCounts null value metrics
+ * @param nanValueCounts nan value metrics
+ * @param lowerBounds lower bound metrics
+ * @param upperBounds upper bound metrics
+ * @return map of readable column name to column metric, of which the bounds
are made readable
+ */
+ public static Map<String, StructLike> readableMetricsMap(
+ Schema schema,
+ Map<Integer, String> quotedNameById,
+ Map<Integer, Long> columnSizes,
Review Comment:
What about passing `ContentFile<?>` directly to reduce the number of
arguments?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema filesTableSchema;
+ private final Schema projectedSchema;
+ private final Map<Integer, String> quotedNameById;
+ private final boolean isPartitioned;
ManifestReadTask(
Table table,
ManifestFile manifest,
Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.filesTableSchema = schema;
+ this.projectedSchema = projectedSchema;
+ this.dataTableSchema = table.schema();
+ this.quotedNameById =
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);
Review Comment:
Do we actually need it since we are not quoting? Can't we use
`schema.findColumnName(id)`?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -185,5 +232,60 @@ public Iterable<FileScanTask> split(long splitSize) {
ManifestFile manifest() {
return manifest;
}
+
+ private List<Function<ContentFile<?>, Object>> accessors(boolean
partitioned) {
Review Comment:
   These two methods are hard to grasp. I think our goal is to wrap the struct
we get back from reading manifests and attach metrics to it if the metrics were
projected. Let me think more about this one.
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -18,14 +18,22 @@
*/
package org.apache.iceberg;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MetricsUtil {
+ private static final Logger logger =
LoggerFactory.getLogger(MetricsUtil.class);
Review Comment:
nit: `logger` -> `LOG` as it is a constant.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema filesTableSchema;
+ private final Schema projectedSchema;
+ private final Map<Integer, String> quotedNameById;
+ private final boolean isPartitioned;
ManifestReadTask(
Table table,
ManifestFile manifest,
Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.filesTableSchema = schema;
+ this.projectedSchema = projectedSchema;
+ this.dataTableSchema = table.schema();
+ this.quotedNameById =
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);
Review Comment:
   I am not sure it is a good idea to compute this for every manifest task, as
there may be a lot of columns. I don't think it will degrade performance
too much, but it will definitely generate some extra objects. I'd probably compute
it once and reuse it to avoid surprises.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -18,25 +18,50 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
/** Base class logic for files metadata tables */
abstract class BaseFilesTable extends BaseMetadataTable {
+ static final Types.StructType READABLE_METRICS_VALUE =
+ Types.StructType.of(
+ optional(303, "column_size", Types.LongType.get(), "Total size on
disk"),
+ optional(304, "value_count", Types.LongType.get(), "Total count,
including null and NaN"),
+ optional(305, "null_value_count", Types.LongType.get(), "Null value
count"),
+ optional(306, "nan_value_count", Types.LongType.get(), "Nan value
count"),
Review Comment:
nit: `Nan` -> `NaN`
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +174,49 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema filesTableSchema;
+ private final Schema projectedSchema;
+ private final Map<Integer, String> quotedNameById;
+ private final boolean isPartitioned;
ManifestReadTask(
Table table,
ManifestFile manifest,
Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.filesTableSchema = schema;
+ this.projectedSchema = projectedSchema;
+ this.dataTableSchema = table.schema();
+ this.quotedNameById =
TypeUtil.indexQuotedNameById(table.schema().asStruct(), name -> name);
+ this.isPartitioned = Partitioning.partitionType(table).fields().size() >
0;
Review Comment:
Can we check if the schema contains `DataFile.PARTITION_ID` instead of
creating a common partition type? I think we constructed this common partition
type earlier and included/excluded `DataFile.PARTITION_ID` based on that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]