aokolnychyi commented on code in PR #5376:
URL: https://github.com/apache/iceberg/pull/5376#discussion_r952132633
##########
api/src/main/java/org/apache/iceberg/Schema.java:
##########
@@ -233,6 +233,16 @@ public Map<String, Integer> getAliases() {
return aliasToId;
}
+ /**
+ * Returns a map for this schema between field id and qualified field names.
Initializes the map,
+ * if it has not been initialized by calls to #{@link #findColumnName(int)}
Review Comment:
nit: Redundant `#` before `{@link ...}`?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,39 +144,53 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projectedSchema,
Review Comment:
nit: You can probably call it just `projection`, given that you call the
derived var `fileProjection`.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -186,4 +201,43 @@ ManifestFile manifest() {
return manifest;
}
}
+
+ static class ContentFileStructWithMetrics implements StructLike {
+ private final Schema projectedFilesSchema;
+ private final StructLike fileAsStruct;
+ private final Map<String, StructLike> readableMetrics;
+
+ ContentFileStructWithMetrics(
+ Schema projectedFileSchema,
+ StructLike fileAsStruct,
+ Map<String, StructLike> readableMetrics) {
+ this.projectedFilesSchema = projectedFileSchema;
+ this.fileAsStruct = fileAsStruct;
+ this.readableMetrics = readableMetrics;
+ }
+
+ @Override
+ public int size() {
+ return projectedFilesSchema.columns().size() + 1;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ if (pos < projectedFilesSchema.columns().size()) {
+ return fileAsStruct.get(pos, javaClass);
+ } else if (pos == projectedFilesSchema.columns().size()) {
+ return javaClass.cast(readableMetrics);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Illegal position access for FileRow: %d, max allowed is %d",
+ pos, fileAsStruct.size()));
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("FileEntryRow is read only");
Review Comment:
nit: What's `FileEntryRow`?
##########
core/src/main/java/org/apache/iceberg/MetricsUtil.java:
##########
@@ -56,4 +64,116 @@ public static MetricsModes.MetricsMode metricsMode(
String columnName = inputSchema.findColumnName(fieldId);
return metricsConfig.columnMode(columnName);
}
+
+ /**
+ * Return a readable metrics map
+ *
+ * @param schema schema of original data table
+ * @param contentFile content file with metrics
+ * @return map of readable column name to column metric, of which the bounds
are made readable
+ */
+ public static Map<String, StructLike> readableMetricsMap(
+ Schema schema, ContentFile<?> contentFile) {
Review Comment:
nit: If you call it `file` instead of `contentFile`, the arg definition
would fit on one line.
##########
api/src/main/java/org/apache/iceberg/Schema.java:
##########
@@ -233,6 +233,16 @@ public Map<String, Integer> getAliases() {
return aliasToId;
}
+ /**
+ * Returns a map for this schema between field id and qualified field names.
Initializes the map,
+ * if it has not been initialized by calls to #{@link #findColumnName(int)}
Review Comment:
nit: Missing `.` at the end of the sentence?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,39 +144,53 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projectedSchema = projectedSchema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ Set<Integer> projectedIds =
TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ Schema fileProjection = TypeUtil.selectNot(projectedSchema,
projectedIds);
Review Comment:
nit: Strictly speaking, you can directly use `projection` when metrics are
not projected.
```
if (projection.findColumnName(DataFile.READABLE_METRICS.fieldId()) == null) {
return CloseableIterable.transform(files(projection), file -> (StructLike)
file);
} else {
Schema fileProjection = TypeUtil.selectNot(projection,
READABLE_METRICS_FIELD_IDS);
return CloseableIterable.transform(files(fileProjection), ...);
}
```
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,39 +144,53 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projectedSchema = projectedSchema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ Set<Integer> projectedIds =
TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ Schema fileProjection = TypeUtil.selectNot(projectedSchema,
projectedIds);
+ if (projectedSchema.findColumnName(DataFile.READABLE_METRICS.fieldId())
== null) {
+ return CloseableIterable.transform(files(fileProjection), file ->
(StructLike) file);
+ } else {
+ return CloseableIterable.transform(
+ files(fileProjection), file -> wrapWithMetrics(file,
fileProjection));
+ }
}
- private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ private CloseableIterable<? extends ContentFile<?>> files(Schema
fileProjection) {
switch (manifest.content()) {
case DATA:
- return ManifestFiles.read(manifest, io, specsById).project(schema);
+ return ManifestFiles.read(manifest, io,
specsById).project(fileProjection);
case DELETES:
- return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(schema);
+ return ManifestFiles.readDeleteManifest(manifest, io,
specsById).project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
}
}
+ private StructLike wrapWithMetrics(ContentFile<?> file, Schema
fileProjection) {
Review Comment:
If you decide to compute `fileProjection` only when metrics are projected, you
could remove the second arg, and the call site of this method would fit on
one line.
```
private StructLike wrapWithMetrics(ContentFile<?> file) {
int structSize = projection.columns().size();
Map<String, StructLike> metrics = ...
return new ContentFileStructWithMetrics(structSize, (StructLike) file,
metrics);
}
```
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,39 +144,53 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projectedSchema = projectedSchema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ Set<Integer> projectedIds =
TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
Review Comment:
What about defining a constant for this in `ManifestReadTask` and giving it a
more specific name?
Something like `READABLE_METRICS_FIELD_IDS`?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -186,4 +201,43 @@ ManifestFile manifest() {
return manifest;
}
}
+
+ static class ContentFileStructWithMetrics implements StructLike {
+ private final Schema projectedFilesSchema;
Review Comment:
I'd probably just define a variable called `size` instead of referencing the
complete schema.
Overall, `StructLike` is serializable so we have to be careful with what we
put into fields. Serializing an int will be much more efficient and would work
across different serialization mechanisms.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -186,4 +201,43 @@ ManifestFile manifest() {
return manifest;
}
}
+
+ static class ContentFileStructWithMetrics implements StructLike {
+ private final Schema projectedFilesSchema;
+ private final StructLike fileAsStruct;
+ private final Map<String, StructLike> readableMetrics;
+
+ ContentFileStructWithMetrics(
+ Schema projectedFileSchema,
+ StructLike fileAsStruct,
+ Map<String, StructLike> readableMetrics) {
+ this.projectedFilesSchema = projectedFileSchema;
+ this.fileAsStruct = fileAsStruct;
+ this.readableMetrics = readableMetrics;
+ }
+
+ @Override
+ public int size() {
+ return projectedFilesSchema.columns().size() + 1;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ if (pos < projectedFilesSchema.columns().size()) {
+ return fileAsStruct.get(pos, javaClass);
+ } else if (pos == projectedFilesSchema.columns().size()) {
+ return javaClass.cast(readableMetrics);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Illegal position access for FileRow: %d, max allowed is %d",
Review Comment:
nit: What's `FileRow`?
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,39 +144,53 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projectedSchema = projectedSchema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ Set<Integer> projectedIds =
TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
Review Comment:
Could you please add tests where we project only some subfields in the
struct value?
I don't expect schema pruning to happen on the Iceberg side but it should
work correctly.
```
SELECT readable_metrics['col1'].lower_bound,
readable_metrics['col1'].upper_bound ...
```
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -143,39 +144,53 @@ static class ManifestReadTask extends BaseFileScanTask
implements DataTask {
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
- private final Schema schema;
+ private final Schema dataTableSchema;
+ private final Schema projectedSchema;
ManifestReadTask(
Table table,
ManifestFile manifest,
- Schema schema,
+ Schema projectedSchema,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString,
residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
- this.schema = schema;
+ this.dataTableSchema = table.schema();
+ this.projectedSchema = projectedSchema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(manifestEntries(), file ->
(StructLike) file);
+ Set<Integer> projectedIds =
TypeUtil.getProjectedIds(DataFile.READABLE_METRICS.type());
+ Schema fileProjection = TypeUtil.selectNot(projectedSchema,
projectedIds);
+ if (projectedSchema.findColumnName(DataFile.READABLE_METRICS.fieldId())
== null) {
+ return CloseableIterable.transform(files(fileProjection), file ->
(StructLike) file);
+ } else {
+ return CloseableIterable.transform(
+ files(fileProjection), file -> wrapWithMetrics(file,
fileProjection));
+ }
}
- private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ private CloseableIterable<? extends ContentFile<?>> files(Schema
fileProjection) {
Review Comment:
Looks good to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]