rdblue commented on code in PR #5872:
URL: https://github.com/apache/iceberg/pull/5872#discussion_r990820290


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -145,6 +158,83 @@ public Filter[] pushedFilters() {
     return pushedFilters;
   }
 
+  @Override
+  public boolean pushAggregation(Aggregation aggregation) {
+    if (!(table instanceof BaseTable)) {
+      return false;
+    }
+    boolean aggregatePushdown =
+        Boolean.parseBoolean(
+            table
+                .properties()
+                .getOrDefault(AGGREGATE_PUSHDOWN_ENABLED, AGGREGATE_PUSHDOWN_ENABLED_DEFAULT));
+    if (!aggregatePushdown) {
+      return false;
+    }
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot != null) {
+      Map<String, String> map = currentSnapshot.summary();
+      // if there are row-level deletes in the current snapshot, the statistics
+      // may have changed, so disable aggregate push down
+      if (Integer.parseInt(map.get("total-position-deletes")) > 0
+          || Integer.parseInt(map.get("total-equality-deletes")) > 0) {
+        return false;
+      }
+    }
+
+    // If the group by expression is not the same as the partition, the statistics information
+    // in manifest files cannot be used to calculate min/max/count. However, if the
+    // group by expression is the same as the partition, the statistics information can still
+    // be used to calculate min/max/count.
+    // TODO: enable aggregate push down for partition col group by expression
+    if (aggregation.groupByExpressions().length > 0) {
+      return false;
+    }
+
+    List<Expression> expressions =
+        Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+    List<AggregateFunc> pushed =
+        Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+    for (AggregateFunc aggregate : aggregation.aggregateExpressions()) {
+      Expression expr = SparkAggregates.convert(aggregate);
+      if (expr != null) {
+        try {
+          Binder.bind(schema.asStruct(), expr, caseSensitive);
+          expressions.add(expr);
+          pushed.add(aggregate);
+        } catch (ValidationException e) {
+          // binding to the table schema failed, so this expression cannot be pushed down
+          // disable aggregate push down
+          LOG.info(
+              "Failed to bind expression to table schema, can't push down aggregate to iceberg",
+              aggregate,
+              e.getMessage());
+          return false;
+        }
+      } else {
+        // only push down aggregates iff all of them can be pushed down.
+        LOG.info(
+            "Failed to convert this aggregate function to iceberg Aggregate, can't push down aggregate to iceberg",
+            aggregate);
+        return false;
+      }
+    }
+
+    this.aggregateExpressions = expressions;
+
+    pushedAggregateSchema =
+        SparkPushedDownAggregateUtil.buildSchemaForPushedDownAggregate(
+            aggregateExpressions, caseSensitive, schema);
+    if (pushedAggregateSchema != null

Review Comment:
   What if the aggregation can't be pushed down because of missing metadata? 
For example, if lower/upper bounds are missing for a column because the metrics 
mode doesn't track the metadata?
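
   To make the concern concrete, here is a minimal sketch of the kind of
   guard this would need. It is not Iceberg code: `MetricsPushdownCheck`,
   `FileStats`, and `maxFromMetadata` are hypothetical names, and bounds are
   simplified to `Long` values keyed by field id. The only assumption taken
   from Iceberg is that per-file bound maps can be null or lack an entry for
   a column when the metrics mode does not collect bounds for it.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for per-file column metrics; not an Iceberg class.
final class MetricsPushdownCheck {

  /** Per-file bounds keyed by field id; a missing key means the bound was not collected. */
  static final class FileStats {
    final Map<Integer, Long> lowerBounds;
    final Map<Integer, Long> upperBounds;

    FileStats(Map<Integer, Long> lowerBounds, Map<Integer, Long> upperBounds) {
      this.lowerBounds = lowerBounds;
      this.upperBounds = upperBounds;
    }
  }

  /**
   * Computes MAX(column) from file metadata only.
   *
   * Returns Optional.empty() when any file lacks an upper bound for the column
   * (or when there are no files), which the caller should treat as
   * "do not push down, fall back to a regular scan".
   */
  static Optional<Long> maxFromMetadata(List<FileStats> files, int fieldId) {
    Long max = null;
    for (FileStats file : files) {
      Long upper = file.upperBounds == null ? null : file.upperBounds.get(fieldId);
      if (upper == null) {
        // One file without a bound is enough to make the metadata answer unreliable.
        return Optional.empty();
      }
      max = (max == null) ? upper : Math.max(max, upper);
    }
    return Optional.ofNullable(max);
  }
}
```

   With a check like this, `pushAggregation` would only return true when
   every file in the scan has the metrics the aggregate needs, and would
   otherwise fall back to a normal scan.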



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

