aokolnychyi commented on code in PR #7447:
URL: https://github.com/apache/iceberg/pull/7447#discussion_r1248258767
##########
core/src/main/java/org/apache/iceberg/metrics/InMemoryReadMetricReporter.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+public class InMemoryReadMetricReporter implements MetricsReporter {

Review Comment:
It is a bit hard to envision the proper design for this class, but what if we make it a bit more generic and move the casting logic into the method that accesses it?
```
public class InMemoryMetricsReporter implements MetricsReporter {

  private MetricsReport metricsReport;

  @Override
  public void report(MetricsReport report) {
    this.metricsReport = report;
  }

  public ScanReport scanReport() {
    // TODO: maybe add a precondition with a good message?
    return (ScanReport) metricsReport;
  }
}
```

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -73,9 +75,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, scan, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> metricsReportSupplier) {

Review Comment:
What about calling it `scanReportSupplier` to be a bit more specific?

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java:
##########
@@ -39,6 +40,7 @@ class SparkStagedScanBuilder implements ScanBuilder {
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, readConf);
+    return new SparkStagedScan(
+        spark, table, readConf, (new InMemoryReadMetricReporter())::scanReport);

Review Comment:
This change would not be needed if we pass `null` to the parent constructor in `SparkStagedScan`.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import java.util.Arrays;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+
+public class TotalFileSize implements CustomMetric {

Review Comment:
I am not sure whether I already asked: can we extend `CustomSumMetric` and rely on its built-in method for aggregating the result? This applies to all our `CustomMetric` implementations.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -96,6 +97,7 @@ public class SparkScanBuilder
   private boolean caseSensitive;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
+  private final InMemoryReadMetricReporter metricsReporter;

Review Comment:
Minor: this should go in the block of final fields above. You may initialize it in the declaration (up to you).
```
  private final SparkReadConf readConf;
  private final List<String> metaColumns = Lists.newArrayList();
  private final InMemoryMetricsReporter metricsReporter = new InMemoryMetricsReporter();
```

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -73,9 +75,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, scan, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> metricsReportSupplier) {

Review Comment:
It is actually called `scanReportSupplier` in other places; let's make it consistent.
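This picks up the `// TODO: maybe add a precondition with a good message?` in the `InMemoryMetricsReporter` suggestion earlier in this review. Below is a minimal sketch of an accessor that fails with a clear message instead of a bare `ClassCastException`. The interfaces are simplified, hypothetical stand-ins for Iceberg's `MetricsReport`, `ScanReport`, `CommitReport` and `MetricsReporter`, declared only so the snippet compiles on its own. In Iceberg itself, the check would more naturally be a `Preconditions.checkState` call from the relocated Guava package.

```java
// Simplified, hypothetical stand-ins for Iceberg's org.apache.iceberg.metrics types,
// declared only so this sketch compiles on its own.
interface MetricsReport {}

interface ScanReport extends MetricsReport {
  String tableName();
}

interface CommitReport extends MetricsReport {}

interface MetricsReporter {
  void report(MetricsReport report);
}

// Generic reporter: keeps the most recent report and casts it only on access.
class InMemoryMetricsReporter implements MetricsReporter {
  private MetricsReport metricsReport;

  @Override
  public void report(MetricsReport report) {
    this.metricsReport = report;
  }

  // Returns the last report as a ScanReport, or null if nothing has been reported yet.
  public ScanReport scanReport() {
    // Precondition with a descriptive message instead of a bare ClassCastException.
    if (metricsReport != null && !(metricsReport instanceof ScanReport)) {
      throw new IllegalStateException(
          "Expected a ScanReport but the last report was: " + metricsReport.getClass().getName());
    }
    return (ScanReport) metricsReport;
  }
}
```

In this sketch, returning `null` when nothing has been reported yet (rather than failing) is a deliberate choice. It lets a caller such as `reportDriverMetrics()` treat a missing report as "no driver metrics".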
##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -170,8 +189,35 @@ public String description() {
         table(), branch(), Spark3Util.describe(filterExpressions), groupingKeyFieldNamesAsString);
   }
+  @Override
+  public CustomTaskMetric[] reportDriverMetrics() {
+    ScanReport scanReport = metricsReportSupplier != null ? metricsReportSupplier.get() : null;
+    if (scanReport == null) {
+      return new CustomTaskMetric[0];
+    }
+
+    List<CustomTaskMetric> driverMetrics = Lists.newArrayList();
+    driverMetrics.add(TaskTotalFileSize.from(scanReport));

Review Comment:
Is there a reason why we don't include all metrics from `ScanMetricsResult`?

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java:
##########
@@ -39,8 +41,12 @@ class SparkStagedScan extends SparkScan {
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
-  SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of());
+  SparkStagedScan(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Supplier<ScanReport> scanReportSupplier) {
+    super(spark, table, readConf, table.schema(), ImmutableList.of(), scanReportSupplier);

Review Comment:
Shall we simply pass `null` here and remove the supplier from the `SparkStagedScan` constructor? We know there would be no metrics report available as it is a staged scan.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -67,7 +84,9 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
       Table table,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      Supplier<ScanReport> metricsReportSupplier) {
+    this.metricsReportSupplier = metricsReportSupplier;

Review Comment:
Can we add this assignment as the last line in the constructors to follow the existing style?
Also, let's call it `scanReportSupplier`.

##########
core/src/main/java/org/apache/iceberg/metrics/InMemoryReadMetricReporter.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+public class InMemoryReadMetricReporter implements MetricsReporter {

Review Comment:
In the future, we may use it in other places and add more accessor methods.
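Below is a rough sketch of the null handling discussed for `SparkScan` and `SparkStagedScan`. It relies on hypothetical stand-ins:

- `TaskMetric` mirrors the `name()`/`value()` shape of Spark's `CustomTaskMetric`.
- `SimpleScanReport` flattens the scan metrics into a name-to-value map.
- `ScanSketch` plays the role of `SparkScan`.

The sketch shows that a `null` supplier (as proposed for the staged scan) and a supplier returning `null` both yield no driver metrics. It also shows that every reported counter can be exported, instead of only the total file size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Spark's CustomTaskMetric (a metric name plus a long value).
interface TaskMetric {
  String name();

  long value();
}

// Hypothetical, simplified stand-in for a scan report: all counters as name -> value.
interface SimpleScanReport {
  Map<String, Long> counters();
}

// Plays the role of SparkScan in this sketch.
class ScanSketch {
  // May be null, e.g. for a staged scan where no scan report can exist.
  private final Supplier<SimpleScanReport> scanReportSupplier;

  ScanSketch(Supplier<SimpleScanReport> scanReportSupplier) {
    this.scanReportSupplier = scanReportSupplier;
  }

  TaskMetric[] reportDriverMetrics() {
    SimpleScanReport scanReport = scanReportSupplier != null ? scanReportSupplier.get() : null;
    if (scanReport == null) {
      return new TaskMetric[0];
    }

    // Export every available counter rather than a single hand-picked one.
    List<TaskMetric> driverMetrics = new ArrayList<>();
    for (Map.Entry<String, Long> entry : scanReport.counters().entrySet()) {
      String name = entry.getKey();
      long value = entry.getValue();
      driverMetrics.add(
          new TaskMetric() {
            @Override
            public String name() {
              return name;
            }

            @Override
            public long value() {
              return value;
            }
          });
    }
    return driverMetrics.toArray(new TaskMetric[0]);
  }
}
```

With a real `ScanMetricsResult`, each counter would presumably map to its own metric class, so the loop would likely become a series of explicit `add` calls.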
