szehon-ho commented on code in PR #7539:
URL: https://github.com/apache/iceberg/pull/7539#discussion_r1191798950
##########
core/src/main/java/org/apache/iceberg/BaseEntriesTable.java:
##########
@@ -125,31 +130,107 @@ ManifestFile manifest() {
     @Override
     public CloseableIterable<StructLike> rows() {
-      // Project data-file fields
-      CloseableIterable<StructLike> prunedRows;
-      if (manifest.content() == ManifestContent.DATA) {
-        prunedRows =
+      Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);
+
+      if (readableMetricsField == null) {
+        CloseableIterable<StructLike> entryAsStruct =
             CloseableIterable.transform(
-                ManifestFiles.read(manifest, io).project(fileSchema).entries(),
-                file -> (GenericManifestEntry<DataFile>) file);
+                entries(fileSchema),
+                entry -> (GenericManifestEntry<? extends ContentFile<?>>) entry);
+
+        StructProjection structProjection = projectNonReadable(projection);
+        return CloseableIterable.transform(entryAsStruct, structProjection::wrap);
       } else {
-        prunedRows =
-            CloseableIterable.transform(
-                ManifestFiles.readDeleteManifest(manifest, io, specsById)
-                    .project(fileSchema)
-                    .entries(),
-                file -> (GenericManifestEntry<DeleteFile>) file);
+        Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
Review Comment:
I had an idea to make this method even a bit clearer, again by adding some
private methods that better describe these steps:
```
  @Override
  public CloseableIterable<StructLike> rows() {
    Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);

    if (readableMetricsField == null) {
      CloseableIterable<StructLike> entryAsStruct =
          CloseableIterable.transform(
              entries(fileProjection),
              entry -> (GenericManifestEntry<? extends ContentFile<?>>) entry);

      StructProjection structProjection = projectNonReadable(projection);
      return CloseableIterable.transform(entryAsStruct, structProjection::wrap);
    } else {
      Schema requiredFileProjection = requiredFileProjection(fileProjection);
      Schema actualProjection = removeReadableMetrics(projection, readableMetricsField);
      StructProjection structProjection = projectNonReadable(actualProjection);

      return CloseableIterable.transform(
          entries(requiredFileProjection),
          entry -> withReadableMetrics(structProjection, entry, readableMetricsField));
    }
  }
```
Now it's a bit more symmetrical.
We would need the following methods:
```
  /**
   * Remove virtual columns from the file projection and ensure that the underlying metrics
   * used to create those columns are part of the file projection.
   *
   * @return file projection with required columns to read readable metrics
   */
  private Schema requiredFileProjection(Schema fileProjection) {
    Schema projectionForReadableMetrics =
        new Schema(
            MetricsUtil.READABLE_METRIC_COLS.stream()
                .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
                .collect(Collectors.toList()));
    return TypeUtil.join(fileProjection, projectionForReadableMetrics);
  }

  private Schema removeReadableMetrics(Schema projection, Types.NestedField readableMetricsField) {
    Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
    return TypeUtil.selectNot(projection, readableMetricsIds);
  }
```
I also changed withReadableMetrics a bit for this to work:
```
  private StructLike withReadableMetrics(
      StructProjection structProjection,
      ManifestEntry<? extends ContentFile<?>> entry,
      Types.NestedField readableMetricsField) {
    int projectionColumnCount = projection.columns().size();
    int metricsPosition = projection.columns().indexOf(readableMetricsField);

    StructProjection entryStruct = structProjection.wrap((StructLike) entry);

    StructType projectedMetricType =
        projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
    MetricsUtil.ReadableMetricsStruct readableMetrics =
        MetricsUtil.readableMetricsStruct(dataTableSchema, entry.file(), projectedMetricType);

    return new ManifestEntryStructWithMetrics(
        projectionColumnCount, metricsPosition, entryStruct, readableMetrics);
  }
```
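For context (not part of the review thread): projectNonReadable and ManifestEntryStructWithMetrics are referenced above but not shown here. Below is a minimal, self-contained sketch of the projection pattern projectNonReadable presumably builds on, using Iceberg's existing StructProjection and CloseableIterable.transform utilities; the class and method names are made up for illustration.
```
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.StructProjection;

// Hypothetical helper, not from the PR: projects rows of a full schema down to the
// requested columns by reusing a single StructProjection wrapper per iterator,
// which is the same pattern the proposed rows() applies via structProjection::wrap.
class ProjectionSketch {
  static CloseableIterable<StructLike> project(
      CloseableIterable<StructLike> rows, Schema fullSchema, Schema projectedSchema) {
    StructProjection structProjection = StructProjection.create(fullSchema, projectedSchema);
    return CloseableIterable.transform(rows, structProjection::wrap);
  }
}
```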
##########
core/src/main/java/org/apache/iceberg/BaseEntriesTable.java:
##########
@@ -92,26 +95,28 @@ static CloseableIterable<FileScanTask> planFiles(
   }
 
   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
-    private final Schema schema;
+    private final Schema projection;
     private final Schema fileSchema;
+    private final Schema dataTableSchema;
     private final FileIO io;
     private final ManifestFile manifest;
     private final Map<Integer, PartitionSpec> specsById;
 
     ManifestReadTask(
         Table table,
         ManifestFile manifest,
-        Schema schema,
+        Schema projection,
         String schemaString,
         String specString,
         ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
-      this.schema = schema;
+      this.projection = projection;
       this.io = table.io();
       this.manifest = manifest;
       this.specsById = Maps.newHashMap(table.specs());
+      this.dataTableSchema = table.schema();
 
-      Type fileProjection = schema.findType("data_file");
+      Type fileProjection = projection.findType("data_file");
Review Comment:
Can we rename these to be more consistent with the schema => projection change?
fileProjection => fileProjectionType, fileSchema => fileProjection
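For illustration (not part of the review thread), the suggested rename applied to the fields and local variable above would look roughly like the sketch below. The surrounding class is trimmed down, and deriving fileProjection from the projected data_file type is an assumption about the existing constructor.
```
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;

// Illustrative sketch only, not a patch: names changed per the comment above.
class RenameSketch {
  private final Schema projection;      // the entries-table projection (was: schema)
  private final Schema fileProjection;  // projection of the data_file struct (was: fileSchema)

  RenameSketch(Schema projection) {
    this.projection = projection;
    // the local variable becomes fileProjectionType so it no longer collides with the field name
    Type fileProjectionType = projection.findType("data_file");
    this.fileProjection = new Schema(fileProjectionType.asStructType().fields());
  }
}
```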
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]