aokolnychyi commented on code in PR #5376:
URL: https://github.com/apache/iceberg/pull/5376#discussion_r949458152
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,123 @@ public static MetricsModes.MetricsMode metricsMode(
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ /**
+ * Return a readable metrics map
+ *
+ * @param schema schema of original data table
+ * @param namesById pre-computed map of all column ids in schema to readable
name, see {@link
+ * org.apache.iceberg.types.TypeUtil#indexNameById(Types.StructType)}
+ * @param contentFile content file with metrics
+ * @return map of readable column name to column metric, of which the bounds
are made readable
+ */
+ public static Map<String, StructLike> readableMetricsMap(
+ Schema schema, Map<Integer, String> namesById, ContentFile<?>
contentFile) {
Review Comment:
If we go with `schema.findColumnName`, we may drop `namesById` here. If you
shorten contentFile to file, it would even fit on one line.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -185,5 +235,51 @@ public Iterable<FileScanTask> split(long splitSize) {
ManifestFile manifest() {
return manifest;
}
+
+ private List<Function<ContentFile<?>, Object>> accessors(boolean
partitioned) {
+ List<Function<ContentFile<?>, Object>> accessors =
+ Lists.newArrayList(
+ file -> file.content().id(),
+ ContentFile::path,
+ file -> file.format().toString(),
+ ContentFile::specId,
+ ContentFile::partition,
+ ContentFile::recordCount,
+ ContentFile::fileSizeInBytes,
+ ContentFile::columnSizes,
+ ContentFile::valueCounts,
+ ContentFile::nullValueCounts,
+ ContentFile::nanValueCounts,
+ ContentFile::lowerBounds,
+ ContentFile::upperBounds,
+ ContentFile::keyMetadata,
+ ContentFile::splitOffsets,
+ ContentFile::equalityFieldIds,
+ ContentFile::sortOrderId,
+ file -> MetricsUtil.readableMetricsMap(dataTableSchema,
dataTableFields, file));
+ return partitioned
+ ? accessors
+ : Stream.concat(
+ accessors.subList(0, 4).stream(), accessors.subList(5,
accessors.size()).stream())
+ .collect(Collectors.toList());
+ }
+
+ private List<Object> projectedFields(
Review Comment:
Are we calling this per row now?
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,123 @@ public static MetricsModes.MetricsMode metricsMode(
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ /**
+ * Return a readable metrics map
+ *
+ * @param schema schema of original data table
+ * @param namesById pre-computed map of all column ids in schema to readable
name, see {@link
+ * org.apache.iceberg.types.TypeUtil#indexNameById(Types.StructType)}
+ * @param contentFile content file with metrics
+ * @return map of readable column name to column metric, of which the bounds
are made readable
+ */
+ public static Map<String, StructLike> readableMetricsMap(
+ Schema schema, Map<Integer, String> namesById, ContentFile<?>
contentFile) {
+ Map<String, StructLike> metricsStruct =
Maps.newHashMapWithExpectedSize(namesById.size());
+
+ Map<Integer, Long> columnSizes = contentFile.columnSizes();
+ Map<Integer, Long> valueCounts = contentFile.valueCounts();
+ Map<Integer, Long> nullValueCounts = contentFile.nullValueCounts();
+ Map<Integer, Long> nanValueCounts = contentFile.nanValueCounts();
+ Map<Integer, ByteBuffer> lowerBounds = contentFile.lowerBounds();
+ Map<Integer, ByteBuffer> upperBounds = contentFile.upperBounds();
+
+ for (int id : namesById.keySet()) {
+ Types.NestedField field = schema.findField(id);
+ if (field.type().isPrimitiveType()) {
+ // Iceberg stores metrics only for primitive types
+ String colName = namesById.get(id);
+ ReadableMetricsStruct struct =
+ new ReadableMetricsStruct(
+ columnSizes == null ? null : columnSizes.get(id),
+ valueCounts == null ? null : valueCounts.get(id),
+ nullValueCounts == null ? null : nullValueCounts.get(id),
+ nanValueCounts == null ? null : nanValueCounts.get(id),
+ lowerBounds == null ? null : convertToReadable(field,
lowerBounds.get(id)),
+ upperBounds == null ? null : convertToReadable(field,
upperBounds.get(id)));
+ metricsStruct.put(colName, struct);
+ }
+ }
+ return metricsStruct;
Review Comment:
nit: What about an empty line before the return statement to separate the
for loop above?
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,123 @@ public static MetricsModes.MetricsMode metricsMode(
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ /**
+ * Return a readable metrics map
+ *
+ * @param schema schema of original data table
+ * @param namesById pre-computed map of all column ids in schema to readable
name, see {@link
+ * org.apache.iceberg.types.TypeUtil#indexNameById(Types.StructType)}
+ * @param contentFile content file with metrics
+ * @return map of readable column name to column metric, of which the bounds
are made readable
+ */
+ public static Map<String, StructLike> readableMetricsMap(
+ Schema schema, Map<Integer, String> namesById, ContentFile<?>
contentFile) {
+ Map<String, StructLike> metricsStruct =
Maps.newHashMapWithExpectedSize(namesById.size());
Review Comment:
nit: `metricsStruct` -> `metricsMap`?
##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1743,11 +1749,99 @@ private GenericData.Record manifestRecord(
.build();
}
- private void asMetadataRecord(GenericData.Record file) {
+ static final Types.StructType EXPECTED_METRICS_VALUE_TYPE =
Review Comment:
If we move the field to `DataFile`, we can reuse it in tests as well.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -18,25 +18,50 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
/** Base class logic for files metadata tables */
abstract class BaseFilesTable extends BaseMetadataTable {
+ static final Types.StructType READABLE_METRICS_VALUE =
Review Comment:
The current solution seems safe enough but your idea to put it in `DataFile`
can be a bit better, I guess. Then we can just reuse the next ID that we were
supposed to assign and this field will be accessible in all other places too.
We don't have to return the new field in `DataFile$getType`.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -185,5 +232,60 @@ public Iterable<FileScanTask> split(long splitSize) {
ManifestFile manifest() {
return manifest;
}
+
+ private List<Function<ContentFile<?>, Object>> accessors(boolean
partitioned) {
Review Comment:
Oh, I see, let me think.
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,123 @@ public static MetricsModes.MetricsMode metricsMode(
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ /**
+ * Return a readable metrics map
+ *
+ * @param schema schema of original data table
+ * @param namesById pre-computed map of all column ids in schema to readable
name, see {@link
+ * org.apache.iceberg.types.TypeUtil#indexNameById(Types.StructType)}
+ * @param contentFile content file with metrics
+ * @return map of readable column name to column metric, of which the bounds
are made readable
+ */
+ public static Map<String, StructLike> readableMetricsMap(
+ Schema schema, Map<Integer, String> namesById, ContentFile<?>
contentFile) {
+ Map<String, StructLike> metricsStruct =
Maps.newHashMapWithExpectedSize(namesById.size());
+
+ Map<Integer, Long> columnSizes = contentFile.columnSizes();
+ Map<Integer, Long> valueCounts = contentFile.valueCounts();
+ Map<Integer, Long> nullValueCounts = contentFile.nullValueCounts();
+ Map<Integer, Long> nanValueCounts = contentFile.nanValueCounts();
+ Map<Integer, ByteBuffer> lowerBounds = contentFile.lowerBounds();
+ Map<Integer, ByteBuffer> upperBounds = contentFile.upperBounds();
+
+ for (int id : namesById.keySet()) {
+ Types.NestedField field = schema.findField(id);
+ if (field.type().isPrimitiveType()) {
+ // Iceberg stores metrics only for primitive types
+ String colName = namesById.get(id);
+ ReadableMetricsStruct struct =
+ new ReadableMetricsStruct(
+ columnSizes == null ? null : columnSizes.get(id),
+ valueCounts == null ? null : valueCounts.get(id),
+ nullValueCounts == null ? null : nullValueCounts.get(id),
+ nanValueCounts == null ? null : nanValueCounts.get(id),
+ lowerBounds == null ? null : convertToReadable(field,
lowerBounds.get(id)),
+ upperBounds == null ? null : convertToReadable(field,
upperBounds.get(id)));
+ metricsStruct.put(colName, struct);
+ }
+ }
+ return metricsStruct;
+ }
+
+ public static String convertToReadable(Types.NestedField field, ByteBuffer
value) {
+ if (field == null || value == null) {
+ return null;
+ }
+ try {
+ return Transforms.identity(field.type())
+ .toHumanString(Conversions.fromByteBuffer(field.type(), value));
+ } catch (Exception e) {
+ LOG.warn("Error converting metric to readable form", e);
+ return null;
+ }
+ }
+
+ public static class ReadableMetricsStruct implements StructLike {
+
+ private final Long columnSize;
+ private final Long valueCount;
+ private final Long nullValueCount;
+ private final Long nanValueCount;
+ private final String lowerBound;
+ private final String upperBound;
+
+ public ReadableMetricsStruct(
+ Long columnSize,
+ Long valueCount,
+ Long nullValueCount,
+ Long nanValueCount,
+ String lowerBound,
+ String upperBound) {
+ this.columnSize = columnSize;
+ this.valueCount = valueCount;
+ this.nullValueCount = nullValueCount;
+ this.nanValueCount = nanValueCount;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ @Override
+ public int size() {
+ return 6;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ Object value;
+ switch (pos) {
Review Comment:
nit: Would having `private Object get(int pos)` help separate getting the
value from casting?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,33 +177,49 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema filesTableSchema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
+ private final Map<Integer, String> dataTableFields;
+ private final boolean isPartitioned;
ManifestReadTask(
Table table,
ManifestFile manifest,
Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
- ResidualEvaluator residuals) {
+ ResidualEvaluator residuals,
+ Map<Integer, String> dataTableFields) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.filesTableSchema = schema;
+ this.dataTableSchema = table.schema();
+ this.dataTableFields = dataTableFields;
+ this.isPartitioned = filesTableSchema.findField(DataFile.PARTITION_ID)
!= null;
+ this.projectedSchema = projectedSchema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ return CloseableIterable.transform(
+ manifestEntries(),
+ fileEntry ->
+ StaticDataTask.Row.of(
+ projectedFields(fileEntry,
accessors(isPartitioned)).toArray()));
}
private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
Review Comment:
Not directly related to this PR but it should probably be called `files` as
we are returning `ContentFile` and not the manifest entry objects.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -82,11 +107,20 @@ private static CloseableIterable<FileScanTask> planFiles(
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+ Map<Integer, String> fieldById =
TypeUtil.indexNameById(table.schema().asStruct());
Review Comment:
Actually, my original comment was only partially true. I thought we were
quoting, which would generate new strings but we just need to index columns. If
so, we can probably use `schema$findColumnName` directly.
I believe `Schema` uses `TypeUtil` under the hood and will cache the map for
us already.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]