This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new dd2197f835 API: Fix aggregate pushdown when optional DataFile stats
are null (#10273)
dd2197f835 is described below
commit dd2197f835aa3f180c0f49246055ce665f9fef59
Author: Joshua Kolash <[email protected]>
AuthorDate: Thu May 23 15:48:33 2024 -0400
API: Fix aggregate pushdown when optional DataFile stats are null (#10273)
---
.../apache/iceberg/expressions/BoundAggregate.java | 7 +++
.../apache/iceberg/expressions/CountNonNull.java | 3 +-
.../apache/iceberg/expressions/MaxAggregate.java | 2 +-
.../apache/iceberg/expressions/MinAggregate.java | 2 +-
.../expressions/TestAggregateEvaluator.java | 52 ++++++++++++++++++++++
5 files changed, 63 insertions(+), 3 deletions(-)
diff --git
a/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
b/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
index f8db6eac20..4be4154c38 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java
@@ -87,6 +87,13 @@ public class BoundAggregate<T, C> extends
Aggregate<BoundTerm<T>> implements Bou
}
}
+ <V> boolean safeContainsKey(Map<Integer, V> map, int key) {
+ if (map == null) { // a null map is treated as containing no keys
+ return false;
+ }
+ return map.containsKey(key);
+ }
+
<V> V safeGet(Map<Integer, V> map, int key) {
return safeGet(map, key, null);
}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java
b/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java
index 10afd72e2e..ecc83c9ef6 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java
@@ -39,7 +39,8 @@ public class CountNonNull<T> extends CountAggregate<T> {
@Override
protected boolean hasValue(DataFile file) {
- return file.valueCounts().containsKey(fieldId) &&
file.nullValueCounts().containsKey(fieldId);
+ return safeContainsKey(file.valueCounts(), fieldId) // either stats map may be null
+ && safeContainsKey(file.nullValueCounts(), fieldId);
}
@Override
diff --git a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java
b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java
index 754da9046f..d37af7470d 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java
@@ -40,7 +40,7 @@ public class MaxAggregate<T> extends ValueAggregate<T> {
@Override
protected boolean hasValue(DataFile file) {
- boolean hasBound = file.upperBounds().containsKey(fieldId);
+ boolean hasBound = safeContainsKey(file.upperBounds(), fieldId);
Long valueCount = safeGet(file.valueCounts(), fieldId);
Long nullCount = safeGet(file.nullValueCounts(), fieldId);
boolean boundAllNull =
diff --git a/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java
b/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java
index a6bcea4145..667b66d650 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java
@@ -40,7 +40,7 @@ public class MinAggregate<T> extends ValueAggregate<T> {
@Override
protected boolean hasValue(DataFile file) {
- boolean hasBound = file.lowerBounds().containsKey(fieldId);
+ boolean hasBound = safeContainsKey(file.lowerBounds(), fieldId);
Long valueCount = safeGet(file.valueCounts(), fieldId);
Long nullCount = safeGet(file.nullValueCounts(), fieldId);
boolean boundAllNull =
diff --git
a/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java
b/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java
index 7230e721ae..b418dede86 100644
---
a/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java
+++
b/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java
@@ -95,6 +95,22 @@ public class TestAggregateEvaluator {
FILE, MISSING_SOME_NULLS_STATS_1, MISSING_SOME_NULLS_STATS_2
};
+ private static final DataFile MISSING_ALL_OPTIONAL_STATS =
+ new TestDataFile(
+ "file_null_stats.avro",
+ Row.of(),
+ 20, // presumably the record count (the tests expect countStar == 20L); confirm in TestDataFile
+ // any value counts, including nulls
+ null,
+ // null value counts
+ null,
+ // nan value counts
+ null,
+ // lower bounds
+ null,
+ // upper bounds
+ null);
+
@Test
public void testIntAggregate() {
List<Expression> list =
@@ -173,6 +189,42 @@ public class TestAggregateEvaluator {
assertEvaluatorResult(result, expected);
}
+ @Test
+ public void testIntAggregateAllMissingStats() {
+ List<Expression> list =
+ ImmutableList.of(
+ Expressions.countStar(),
+ Expressions.count("id"),
+ Expressions.max("id"),
+ Expressions.min("id"));
+ AggregateEvaluator aggregateEvaluator = AggregateEvaluator.create(SCHEMA,
list);
+
+ aggregateEvaluator.update(MISSING_ALL_OPTIONAL_STATS); // fixture passes null for every stats map
+
+ assertThat(aggregateEvaluator.allAggregatorsValid()).isFalse();
+ StructLike result = aggregateEvaluator.result();
+ Object[] expected = {20L, null, null, null}; // countStar, count, max, min (same order as list)
+ assertEvaluatorResult(result, expected);
+ }
+
+ @Test
+ public void testOptionalColAllMissingStats() {
+ List<Expression> list =
+ ImmutableList.of(
+ Expressions.countStar(),
+ Expressions.count("no_stats"),
+ Expressions.max("no_stats"),
+ Expressions.min("no_stats"));
+ AggregateEvaluator aggregateEvaluator = AggregateEvaluator.create(SCHEMA,
list);
+
+ aggregateEvaluator.update(MISSING_ALL_OPTIONAL_STATS); // fixture passes null for every stats map
+
+ assertThat(aggregateEvaluator.allAggregatorsValid()).isFalse();
+ StructLike result = aggregateEvaluator.result();
+ Object[] expected = {20L, null, null, null}; // countStar, count, max, min (same order as list)
+ assertEvaluatorResult(result, expected);
+ }
+
private void assertEvaluatorResult(StructLike result, Object[] expected) {
Object[] actual = new Object[result.size()];
for (int i = 0; i < result.size(); i++) {