rdblue commented on code in PR #5872:
URL: https://github.com/apache/iceberg/pull/5872#discussion_r990820167
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -145,6 +158,83 @@ public Filter[] pushedFilters() {
return pushedFilters;
}
+ @Override
+ public boolean pushAggregation(Aggregation aggregation) {
+ if (!(table instanceof BaseTable)) {
+ return false;
+ }
+ boolean aggregatePushdown =
+ Boolean.parseBoolean(
+ table
+ .properties()
+ .getOrDefault(AGGREGATE_PUSHDOWN_ENABLED,
AGGREGATE_PUSHDOWN_ENABLED_DEFAULT));
+ if (!aggregatePushdown) {
+ return false;
+ }
+
+ Snapshot currentSnapshot = table.currentSnapshot();
+ if (currentSnapshot != null) {
+ Map<String, String> map = currentSnapshot.summary();
+ // if there are row-level deletes in the current snapshot, the statistics
+ // may have changed, so disable aggregate push down
+ if (Integer.parseInt(map.get("total-position-deletes")) > 0
+ || Integer.parseInt(map.get("total-equality-deletes")) > 0) {
+ return false;
+ }
+ }
+
+ // If the group by expression is not the same as the partition, the
statistics information
+ // in manifest files cannot be used to calculate min/max/count. However,
if the
+ // group by expression is the same as the partition, the statistics
information can still
+ // be used to calculate min/max/count.
+ // Todo: enable aggregate push down for partition col group by expression
+ if (aggregation.groupByExpressions().length > 0) {
+ return false;
+ }
+
+ List<Expression> expressions =
+
Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+ List<AggregateFunc> pushed =
+
Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+ for (AggregateFunc aggregate : aggregation.aggregateExpressions()) {
+ Expression expr = SparkAggregates.convert(aggregate);
+ if (expr != null) {
+ try {
+ Binder.bind(schema.asStruct(), expr, caseSensitive);
+ expressions.add(expr);
+ pushed.add(aggregate);
+ } catch (ValidationException e) {
+ // binding to the table schema failed, so this expression cannot be
pushed down
+ // disable aggregate push down
+ LOG.info(
+ "Failed to bind expression to table schema, can't push down
aggregate to iceberg",
Review Comment:
How about "Failed to bind" or "Cannot push down aggregate (failed to bind):
%s"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]