aokolnychyi commented on code in PR #5376:
URL: https://github.com/apache/iceberg/pull/5376#discussion_r956441487
##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -148,6 +148,16 @@ public static Schema join(Schema left, Schema right) {
return new Schema(joinedColumns);
}
+ public static Schema joinCommon(Schema left, Schema right) {
Review Comment:
What about simply adapting the existing `join` method? Are there any
scenarios where we want to skip the validation and simply add all columns (old
logic)?
##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -148,6 +148,16 @@ public static Schema join(Schema left, Schema right) {
return new Schema(joinedColumns);
}
+ public static Schema joinCommon(Schema left, Schema right) {
+ List<Types.NestedField> joinedColumns = Lists.newArrayList(left.columns());
+ for (Types.NestedField column : right.columns()) {
+ if (!joinedColumns.contains(column)) {
Review Comment:
We are calling `contains` on a list, which is suboptimal. We also don't do
any validation when the same field ID appears on both sides but the rest of
the field metadata (like type) is different.
What about something like this?
```
List<Types.NestedField> joinedColumns = Lists.newArrayList(left.columns());
for (Types.NestedField rightColumn : right.columns()) {
Types.NestedField leftColumn = left.findField(rightColumn.fieldId());
Preconditions.checkArgument(
leftColumn == null || leftColumn.equals(rightColumn),
"...");
if (leftColumn == null) {
joinedColumns.add(rightColumn);
}
}
return new Schema(joinedColumns);
```
##########
api/src/main/java/org/apache/iceberg/DataFile.java:
##########
@@ -99,10 +99,24 @@ public interface DataFile extends ContentFile<DataFile> {
optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(),
"Partition spec ID");
+ Types.StructType READABLE_METRICS_VALUE_TYPE =
Review Comment:
Are there better names we can use? Right now we have:
- `READABLE_METRICS_VALUE_TYPE` - the struct type, i.e. the type of the value
in the map
- `READABLE_METRICS_VALUE` - the type of the map with metrics
Would `READABLE_METRICS_VALUE_TYPE` and `READABLE_METRICS_TYPE` make more
sense?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -140,42 +143,92 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
+ static final Set<Integer> READABLE_METRICS_FIELD_IDS =
+ TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ static final Schema MIN_PROJECTION_FOR_READABLE_METRICS =
+ new Schema(
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.NAN_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS);
+
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projection;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projection = projection;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ if (projection.findColumnName(DataFile.READABLE_METRICS.fieldId()) ==
null) {
+ return CloseableIterable.transform(files(projection), file ->
(StructLike) file);
+ } else {
+ Schema fileProjection = TypeUtil.selectNot(projection,
READABLE_METRICS_FIELD_IDS);
+ Schema minProjection =
+ TypeUtil.joinCommon(fileProjection,
MIN_PROJECTION_FOR_READABLE_METRICS);
+ return CloseableIterable.transform(files(minProjection),
this::withReadableMetrics);
+ }
}
- private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ private CloseableIterable<? extends ContentFile<?>> files(Schema
fileProjection) {
switch (manifest.content()) {
case DATA:
- return ManifestFiles.read(manifest, io, specsById).project(schema);
+ return ManifestFiles.read(manifest, io,
specsById).project(fileProjection);
case DELETES:
- return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(schema);
+ return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
}
}
+ private StructLike withReadableMetrics(ContentFile<?> file) {
+ int structSize = projection.columns().size();
+ Map<String, StructLike> metrics =
+ MetricsUtil.readableMetricsMap(
+ dataTableSchema, file, readableMetricsProjection(projection));
+ return new ContentFileStructWithMetrics(structSize, (StructLike) file,
metrics);
+ }
+
+ // Handles projections for readable metrics struct
+ private Map<Integer, Integer> readableMetricsProjection(Schema
projectedSchema) {
Review Comment:
Optional: It probably belongs more to `MetricsUtil` than here. We could pass
`StructType metricsType` and compute the map in the utility so that if there is
another place that needs it, we won't have to move this method around. The
utility can also make sure the map of indices is serializable.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -140,42 +143,92 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
+ static final Set<Integer> READABLE_METRICS_FIELD_IDS =
+ TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ static final Schema MIN_PROJECTION_FOR_READABLE_METRICS =
+ new Schema(
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.NAN_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS);
+
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projection;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projection = projection;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ if (projection.findColumnName(DataFile.READABLE_METRICS.fieldId()) ==
null) {
+ return CloseableIterable.transform(files(projection), file ->
(StructLike) file);
+ } else {
+ Schema fileProjection = TypeUtil.selectNot(projection,
READABLE_METRICS_FIELD_IDS);
+ Schema minProjection =
+ TypeUtil.joinCommon(fileProjection,
MIN_PROJECTION_FOR_READABLE_METRICS);
+ return CloseableIterable.transform(files(minProjection),
this::withReadableMetrics);
+ }
}
- private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ private CloseableIterable<? extends ContentFile<?>> files(Schema
fileProjection) {
switch (manifest.content()) {
case DATA:
- return ManifestFiles.read(manifest, io, specsById).project(schema);
+ return ManifestFiles.read(manifest, io,
specsById).project(fileProjection);
case DELETES:
- return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(schema);
+ return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
}
}
+ private StructLike withReadableMetrics(ContentFile<?> file) {
+ int structSize = projection.columns().size();
+ Map<String, StructLike> metrics =
+ MetricsUtil.readableMetricsMap(
+ dataTableSchema, file, readableMetricsProjection(projection));
+ return new ContentFileStructWithMetrics(structSize, (StructLike) file,
metrics);
+ }
+
+ // Handles projections for readable metrics struct
+ private Map<Integer, Integer> readableMetricsProjection(Schema
projectedSchema) {
+ Map<Integer, Integer> projectionMap = Maps.newHashMap();
+ Type type =
projectedSchema.findType(DataFile.READABLE_METRICS_VALUE.valueId());
+ if (type != null) {
Review Comment:
I don't think `type` can be null. We don't have to add more validation as it
is checked above. If the type is null, the projection map will be empty, which
will break `ReadableMetricsStruct` later. Seems like we can simply remove the
if statement.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -140,42 +143,92 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
+ static final Set<Integer> READABLE_METRICS_FIELD_IDS =
Review Comment:
Any particular reason not to make it private?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -186,4 +239,41 @@ ManifestFile manifest() {
return manifest;
}
}
+
+ static class ContentFileStructWithMetrics implements StructLike {
+ private final int structSize;
+ private final StructLike fileAsStruct;
+ private final Map<String, StructLike> readableMetrics;
+
+ ContentFileStructWithMetrics(
+ int structSize, StructLike fileAsStruct, Map<String, StructLike>
readableMetrics) {
+ this.structSize = structSize;
+ this.fileAsStruct = fileAsStruct;
+ this.readableMetrics = readableMetrics;
+ }
+
+ @Override
+ public int size() {
+ return structSize;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ if (pos < (structSize - 1)) {
+ return fileAsStruct.get(pos, javaClass);
+ } else if (pos == (structSize - 1)) {
+ return javaClass.cast(readableMetrics);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Illegal position access for ContentFileStructWithMetrics: %d,
max allowed is %d",
+ pos, (structSize - 1)));
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("FileEntryRow is read only");
Review Comment:
Still references `FileEntryRow`?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -140,42 +143,92 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
+ static final Set<Integer> READABLE_METRICS_FIELD_IDS =
+ TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ static final Schema MIN_PROJECTION_FOR_READABLE_METRICS =
+ new Schema(
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.NAN_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS);
+
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projection;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projection = projection;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ if (projection.findColumnName(DataFile.READABLE_METRICS.fieldId()) ==
null) {
+ return CloseableIterable.transform(files(projection), file ->
(StructLike) file);
+ } else {
+ Schema fileProjection = TypeUtil.selectNot(projection,
READABLE_METRICS_FIELD_IDS);
+ Schema minProjection =
Review Comment:
I think this logic should be part of `BaseFilesTableScan` and
`BaseAllFilesTableScan`.
Otherwise, our scans won't report the correct schema in `Scan#schema()`.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -140,42 +143,92 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+
+ static final Set<Integer> READABLE_METRICS_FIELD_IDS =
+ TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ static final Schema MIN_PROJECTION_FOR_READABLE_METRICS =
+ new Schema(
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.NAN_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS);
+
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projection;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projection = projection;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ if (projection.findColumnName(DataFile.READABLE_METRICS.fieldId()) ==
null) {
+ return CloseableIterable.transform(files(projection), file ->
(StructLike) file);
+ } else {
+ Schema fileProjection = TypeUtil.selectNot(projection,
READABLE_METRICS_FIELD_IDS);
+ Schema minProjection =
+ TypeUtil.joinCommon(fileProjection,
MIN_PROJECTION_FOR_READABLE_METRICS);
+ return CloseableIterable.transform(files(minProjection),
this::withReadableMetrics);
+ }
}
- private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ private CloseableIterable<? extends ContentFile<?>> files(Schema
fileProjection) {
switch (manifest.content()) {
case DATA:
- return ManifestFiles.read(manifest, io, specsById).project(schema);
+ return ManifestFiles.read(manifest, io,
specsById).project(fileProjection);
case DELETES:
- return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(schema);
+ return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
}
}
+ private StructLike withReadableMetrics(ContentFile<?> file) {
+ int structSize = projection.columns().size();
+ Map<String, StructLike> metrics =
+ MetricsUtil.readableMetricsMap(
+ dataTableSchema, file, readableMetricsProjection(projection));
+ return new ContentFileStructWithMetrics(structSize, (StructLike) file,
metrics);
+ }
+
+ // Handles projections for readable metrics struct
+ private Map<Integer, Integer> readableMetricsProjection(Schema
projectedSchema) {
+ Map<Integer, Integer> projectionMap = Maps.newHashMap();
+ Type type =
projectedSchema.findType(DataFile.READABLE_METRICS_VALUE.valueId());
+ if (type != null) {
+ Set<Types.NestedField> projectedFields =
Sets.newHashSet(type.asStructType().fields());
+ int projectedIndex = 0;
+ for (int fieldIndex = 0;
+ fieldIndex < DataFile.READABLE_METRICS_VALUE_TYPE.fields().size();
Review Comment:
nit: Will a helper variable avoid the need to split this into multiple lines?
```
List<Types.NestedField> allMetricFields =
READABLE_METRICS_STRUCT_TYPE.fields();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]