rdblue commented on code in PR #5872:
URL: https://github.com/apache/iceberg/pull/5872#discussion_r990820128


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -145,6 +158,83 @@ public Filter[] pushedFilters() {
     return pushedFilters;
   }
 
+  @Override
+  public boolean pushAggregation(Aggregation aggregation) {
+    if (!(table instanceof BaseTable)) {
+      return false;
+    }
+    boolean aggregatePushdown =
+        Boolean.parseBoolean(
+            table
+                .properties()
+                .getOrDefault(AGGREGATE_PUSHDOWN_ENABLED, AGGREGATE_PUSHDOWN_ENABLED_DEFAULT));
+    if (!aggregatePushdown) {
+      return false;
+    }
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot != null) {
+      Map<String, String> map = currentSnapshot.summary();
+      // if there are row-level deletes in current snapshot, the statics
+      // maybe changed, so disable push down aggregate
+      if (Integer.parseInt(map.get("total-position-deletes")) > 0
+          || Integer.parseInt(map.get("total-equality-deletes")) > 0) {
+        return false;
+      }
+    }
+
+    // If the group by expression is not the same as the partition, the statistics information
+    // in manifest files cannot be used to calculate min/max/count. However, if the
+    // group by expression is not the same as the partition, the statistics information can still
+    // be used to calculate min/max/count.
+    // Todo: enable aggregate push down for partition col group by expression
+    if (aggregation.groupByExpressions().length > 0) {
+      return false;
+    }
+
+    List<Expression> expressions =
+        Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+    List<AggregateFunc> pushed =
+        Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+    for (AggregateFunc aggregate : aggregation.aggregateExpressions()) {
+      Expression expr = SparkAggregates.convert(aggregate);
+      if (expr != null) {
+        try {
+          Binder.bind(schema.asStruct(), expr, caseSensitive);
+          expressions.add(expr);
+          pushed.add(aggregate);
+        } catch (ValidationException e) {
+          // binding to the table schema failed, so this expression cannot be pushed down
+          // disable aggregate push down
+          LOG.info(
+              "Failed to bind expression to table schema, can't push down aggregate to iceberg",
+              aggregate,
+              e.getMessage());
+          return false;
+        }
+      } else {
+        // only push down aggregates iff all of them can be pushed down.
+        LOG.info(
+            "Failed to convert this aggregate function to iceberg Aggregate, can't push down aggregate to iceberg",

Review Comment:
   These error messages can be more concise:
   > Failed to convert aggregate expression: %s
   
   You could also say "Cannot push down aggregate expression"
   
   Also, should these be INFO statements? I think it makes sense to have a message stating whether the group could be pushed down, but a message for each expression seems too verbose.
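A minimal sketch of the logging pattern suggested above: at most one INFO message per aggregation, with per-expression detail demoted to a lower level and worded concisely. It assumes `java.util.logging` as a stand-in for the SLF4J logger used in Iceberg, and a converter that returns `null` on failure, as `SparkAggregates.convert` does in the diff. The names `AggregatePushdownLogging`, `Decision`, and `decide` are hypothetical. If `LOG` is an SLF4J logger, its format strings use `{}` placeholders rather than `%s`. In the calls in the diff, the extra arguments have no placeholder, so the aggregate and error message never appear in the log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

public class AggregatePushdownLogging {
  // java.util.logging stands in for the SLF4J logger used in the PR (assumption).
  private static final Logger LOG = Logger.getLogger(AggregatePushdownLogging.class.getName());

  /** Outcome for the whole aggregation: pushed down or not, plus what blocked it. */
  static final class Decision {
    final boolean pushed;
    final List<String> unsupported;

    Decision(boolean pushed, List<String> unsupported) {
      this.pushed = pushed;
      this.unsupported = unsupported;
    }

    String message() {
      return pushed
          ? "Pushed down aggregation"
          : "Cannot push down aggregation, unsupported expressions: " + unsupported;
    }
  }

  /**
   * Hypothetical helper: tries every aggregate with the given converter (a stand-in for
   * SparkAggregates.convert, which returns null when conversion fails) and logs one INFO
   * message for the whole aggregation instead of one per expression.
   */
  static <T> Decision decide(List<String> aggregates, Function<String, T> convert) {
    List<String> unsupported = new ArrayList<>();
    for (String aggregate : aggregates) {
      if (convert.apply(aggregate) == null) {
        // Per-expression detail goes to FINE, which is hidden by default.
        LOG.fine("Failed to convert aggregate expression: " + aggregate);
        unsupported.add(aggregate);
      }
    }
    Decision decision = new Decision(unsupported.isEmpty(), unsupported);
    LOG.info(decision.message());
    return decision;
  }

  public static void main(String[] args) {
    // For illustration only: treat every aggregate except AVG as convertible.
    Function<String, String> convert = agg -> agg.startsWith("AVG") ? null : agg;
    System.out.println(decide(List.of("MIN(id)", "COUNT(*)"), convert).message());
    System.out.println(decide(List.of("MAX(id)", "AVG(x)"), convert).message());
  }
}
```

Unlike the loop in the diff, this sketch keeps checking after the first failure, so a single message can list every unsupported expression. The cost is converting every expression even when pushdown is already ruled out.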


