psvri commented on code in PR #15070:
URL: https://github.com/apache/iceberg/pull/15070#discussion_r2902198176
##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java:
##########
@@ -766,6 +766,53 @@ public void testNaN() {
assertEquals("expected and actual should equal", expected, actual);
}
+ @TestTemplate
+ public void testNanWithLowerAndUpperBoundMetrics() {
+ sql("CREATE TABLE %s (id int, data float) USING iceberg PARTITIONED BY
(id)", tableName);
+ sql(
+ "INSERT INTO %s VALUES (1, float('nan')),"
+ + "(1, float('nan')), "
+ + "(1, 10.0), "
+ + "(2, 2), "
+ + "(2, float('nan')), "
+ + "(3, float('nan')), "
+ + "(3, 1)",
+ tableName);
+
+ // Validate that all files have upper bound, lower bound and nan count
+ String countsQuery =
+ "select readable_metrics.data.nan_value_count > 0, "
+ + "isnull(readable_metrics.data.lower_bound), "
+ + "isnull(readable_metrics.data.upper_bound) "
+ + "from %s.files";
+
+ Object[] expectedResult = new Object[] {true, false, false};
+ assertThat(sql(countsQuery, tableName))
+ .as("Data files should contain nan count, lower bound and upper
bound.")
+ .allMatch(row -> Arrays.equals(row, expectedResult));
+
+ // Check that aggregates are not pushed down
+ String select = "SELECT count(*), max(data), min(data), count(data) FROM
%s";
+
+ List<Object[]> explain = sql("EXPLAIN " + select, tableName);
+ String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
+ boolean explainContainsPushDownAggregates = false;
Review Comment:
We do the same check in the other test cases, so I followed the same approach here. When the aggregates are not pushed down, we get a different explain string, as shown below.

Aggregates are not pushed down:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1), max(data#299), min(data#299), count(data#299)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=223]
      +- HashAggregate(keys=[], functions=[partial_count(1), partial_max(data#299), partial_min(data#299), partial_count(data#299)])
         +- BatchScan spark_catalog.default.temp[data#299] spark_catalog.default.temp (branch=null) [filters=, groupedBy=] RuntimeFilters: []
```
Aggregates are pushed down:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(agg_func_0#229L), max(agg_func_1#230), min(agg_func_2#231), sum(agg_func_3#232L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=175]
      +- HashAggregate(keys=[], functions=[partial_sum(agg_func_0#229L), partial_max(agg_func_1#230), partial_min(agg_func_2#231), partial_sum(agg_func_3#232L)])
         +- Project [count(*)#233L AS agg_func_0#229L, max(data)#234 AS agg_func_1#230, min(data)#235 AS agg_func_2#231, count(data)#236L AS agg_func_3#232L]
            +- LocalTableScan [count(*)#233L, max(data)#234, min(data)#235, count(data)#236L]
```
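For completeness, here is a minimal sketch of how the truncated check above could finish, mirroring the string-matching pattern the neighboring tests use (the exact probe strings are an assumption; the PR may check different ones). It relies on the fact that the pushed-down plan contains the literal aliases like `max(data)` (from the `Project` over the `LocalTableScan`), while the non-pushed plan only contains column-bound forms such as `max(data#299)`:

```java
// Hypothetical continuation of the truncated snippet, following the
// string-matching pattern of the neighboring tests. The explain string is
// already lowercased, so the probes below are lowercase as well.
//
// Pushed-down plans alias the metrics-derived values, e.g.
//   "max(data)#234 AS agg_func_1#230"  -> contains "max(data)"
// while non-pushed plans only reference the column, e.g.
//   "max(data#299)"                    -> does NOT contain "max(data)"
if (explainString.contains("count(*)")
    || explainString.contains("max(data)")
    || explainString.contains("min(data)")
    || explainString.contains("count(data)")) {
  explainContainsPushDownAggregates = true;
}

// With NaN values present, min/max cannot be answered safely from the
// column bounds, so the aggregates should not be pushed down here.
assertThat(explainContainsPushDownAggregates)
    .as("explain should not contain the pushed down aggregates")
    .isFalse();
```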