aokolnychyi commented on code in PR #7447:
URL: https://github.com/apache/iceberg/pull/7447#discussion_r1248258767


##########
core/src/main/java/org/apache/iceberg/metrics/InMemoryReadMetricReporter.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+public class InMemoryReadMetricReporter implements MetricsReporter {

Review Comment:
   It is a bit hard to envision the proper design for this class but what if we 
make it a bit more generic and move the casting logic into the method that 
would access it?
   
   ```
   public class InMemoryMetricsReporter implements MetricsReporter {
   
     private MetricsReport metricsReport;
   
     @Override
     public void report(MetricsReport report) {
       this.metricsReport = report;
     }
   
     public ScanReport scanReport() {
       // TODO: maybe add a precondition with a good message?
       return (ScanReport) metricsReport;
     }
   }
   ```



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -73,9 +75,9 @@ class SparkBatchQueryScan extends 
SparkPartitioningAwareScan<PartitionScanTask>
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, scan, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> metricsReportSupplier) {

Review Comment:
   What about calling git `scanReportSupplier` to be a bit specific?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java:
##########
@@ -39,6 +40,7 @@ class SparkStagedScanBuilder implements ScanBuilder {
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, readConf);
+    return new SparkStagedScan(
+        spark, table, readConf, (new 
InMemoryReadMetricReporter())::scanReport);

Review Comment:
   This change would not be needed if we pass `null` to parent constructor in 
`SparkStagedScan`.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import java.util.Arrays;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+
+public class TotalFileSize implements CustomMetric {

Review Comment:
   I am not sure whether I already asked, can we extend `CustomSumMetric` and 
rely on built-in method for aggregating the result? It applies to all our 
`CustomMetric` implementations.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -96,6 +97,7 @@ public class SparkScanBuilder
   private boolean caseSensitive;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
+  private final InMemoryReadMetricReporter metricsReporter;

Review Comment:
   Minor: This should go to the block with final variables above, you may init 
it in in the definition (up to you).
   
   ```
   private final SparkReadConf readConf;
   private final List<String> metaColumns = Lists.newArrayList();
   private final InMemoryMetricsReporter metricsReporter = new 
InMemoryMetricsReporter();
   ```



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -73,9 +75,9 @@ class SparkBatchQueryScan extends 
SparkPartitioningAwareScan<PartitionScanTask>
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, scan, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> metricsReportSupplier) {

Review Comment:
   It is actually called `scanReportSupplier` in other places, let's make it 
consistent.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -170,8 +189,35 @@ public String description() {
         table(), branch(), Spark3Util.describe(filterExpressions), 
groupingKeyFieldNamesAsString);
   }
 
+  @Override
+  public CustomTaskMetric[] reportDriverMetrics() {
+    ScanReport scanReport = metricsReportSupplier != null ? 
metricsReportSupplier.get() : null;
+    if (scanReport == null) {
+      return new CustomTaskMetric[0];
+    }
+
+    List<CustomTaskMetric> driverMetrics = Lists.newArrayList();
+    driverMetrics.add(TaskTotalFileSize.from(scanReport));

Review Comment:
   Is there a reason why we don't include all metrics from `ScanMetricsResult`?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java:
##########
@@ -39,8 +41,12 @@ class SparkStagedScan extends SparkScan {
 
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of 
tasks
 
-  SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of());
+  SparkStagedScan(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Supplier<ScanReport> scanReportSupplier) {
+    super(spark, table, readConf, table.schema(), ImmutableList.of(), 
scanReportSupplier);

Review Comment:
   Shall we simply pass `null` here and remove the supplier from the 
`SparkStagedScan` constructor? We know there would be no metrics report 
available as it is a staged scan.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -67,7 +84,9 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
       Table table,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      Supplier<ScanReport> metricsReportSupplier) {
+    this.metricsReportSupplier = metricsReportSupplier;

Review Comment:
   Can we add this assignment as the last line in the constructors to follow 
the existing style?
   Also, let's call it `scanReportSupplier`.



##########
core/src/main/java/org/apache/iceberg/metrics/InMemoryReadMetricReporter.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+public class InMemoryReadMetricReporter implements MetricsReporter {

Review Comment:
   In the future, we may use it in other places and add more accessor methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to