This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 721980dd9c Spark: Disable min/max aggregation push down for string under any mode (#16320)
721980dd9c is described below
commit 721980dd9c43e0d2daed9213430242443472a9c9
Author: Dong Wang <[email protected]>
AuthorDate: Thu May 14 06:12:28 2026 +0800
Spark: Disable min/max aggregation push down for string under any mode (#16320)
---
.../iceberg/spark/source/SparkScanBuilder.java | 19 +++++++++----------
.../iceberg/spark/sql/TestAggregatePushDown.java | 20 ++++++++++----------
.../iceberg/spark/source/SparkScanBuilder.java | 19 +++++++++----------
.../iceberg/spark/sql/TestAggregatePushDown.java | 20 ++++++++++----------
.../iceberg/spark/source/SparkScanBuilder.java | 19 +++++++++----------
.../iceberg/spark/sql/TestAggregatePushDown.java | 20 ++++++++++----------
.../iceberg/spark/source/SparkScanBuilder.java | 19 +++++++++----------
.../iceberg/spark/sql/TestAggregatePushDown.java | 20 ++++++++++----------
8 files changed, 76 insertions(+), 80 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 6c24d30e8b..3e26da4662 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -307,16 +307,15 @@ public class SparkScanBuilder
colName);
return false;
}
- } else if (mode instanceof MetricsModes.Truncate) {
- // lower_bounds and upper_bounds may be truncated, so disable push down
- if (aggregate.type().typeId() == Type.TypeID.STRING) {
- if (aggregate.op() == Expression.Operation.MAX
- || aggregate.op() == Expression.Operation.MIN) {
- LOG.info(
- "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
- colName);
- return false;
- }
+ } else if (aggregate.type().typeId() == Type.TypeID.STRING) {
+ // lower_bounds and upper_bounds may have been truncated before, so disable push down
+ // regardless of the current mode
+ if (aggregate.op() == Expression.Operation.MAX
+ || aggregate.op() == Expression.Operation.MIN) {
+ LOG.info(
+ "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
+ colName);
+ return false;
}
}
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 75b308e58a..462cc5064e 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -334,14 +334,14 @@ public class TestAggregatePushDown extends CatalogTestBase {
@TestTemplate
public void testAggregateNotPushDownForStringType() {
sql("CREATE TABLE %s (id LONG, data STRING) USING iceberg", tableName);
- sql(
- "INSERT INTO TABLE %s VALUES (1, '1111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '6666') ",
- tableName);
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
- tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)");
+ tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(5)");
+ sql(
+ "INSERT INTO TABLE %s VALUES (1, '111111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '666666') ",
+ tableName);
- String select1 = "SELECT MAX(id), MAX(data) FROM %s";
+ String select1 = "SELECT MAX(id), MAX(data), MIN(data) FROM %s";
List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
@@ -356,7 +356,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
List<Object[]> actual1 = sql(select1, tableName);
List<Object[]> expected1 = Lists.newArrayList();
- expected1.add(new Object[] {3L, "6666"});
+ expected1.add(new Object[] {3L, "666666", "111111"});
assertEquals("expected and actual should equal", expected1, actual1);
String select2 = "SELECT COUNT(data) FROM %s";
@@ -379,7 +379,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
- String select3 = "SELECT count(data), max(data) FROM %s";
+ String select3 = "SELECT count(data), max(data), min(data) FROM %s";
List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
String explainString3 =
explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
if (explainString3.contains("count(data)") &&
explainString3.contains("max(data)")) {
@@ -387,12 +387,12 @@ public class TestAggregatePushDown extends CatalogTestBase {
}
assertThat(explainContainsPushDownAggregates)
- .as("explain should contain the pushed down aggregates")
- .isTrue();
+ .as("explain should not contain the pushed down aggregates")
+ .isFalse();
List<Object[]> actual3 = sql(select3, tableName);
List<Object[]> expected3 = Lists.newArrayList();
- expected3.add(new Object[] {6L, "6666"});
+ expected3.add(new Object[] {6L, "666666", "111111"});
assertEquals("expected and actual should equal", expected3, actual3);
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 6c24d30e8b..3e26da4662 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -307,16 +307,15 @@ public class SparkScanBuilder
colName);
return false;
}
- } else if (mode instanceof MetricsModes.Truncate) {
- // lower_bounds and upper_bounds may be truncated, so disable push down
- if (aggregate.type().typeId() == Type.TypeID.STRING) {
- if (aggregate.op() == Expression.Operation.MAX
- || aggregate.op() == Expression.Operation.MIN) {
- LOG.info(
- "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
- colName);
- return false;
- }
+ } else if (aggregate.type().typeId() == Type.TypeID.STRING) {
+ // lower_bounds and upper_bounds may have been truncated before, so disable push down
+ // regardless of the current mode
+ if (aggregate.op() == Expression.Operation.MAX
+ || aggregate.op() == Expression.Operation.MIN) {
+ LOG.info(
+ "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
+ colName);
+ return false;
}
}
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 75b308e58a..462cc5064e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -334,14 +334,14 @@ public class TestAggregatePushDown extends CatalogTestBase {
@TestTemplate
public void testAggregateNotPushDownForStringType() {
sql("CREATE TABLE %s (id LONG, data STRING) USING iceberg", tableName);
- sql(
- "INSERT INTO TABLE %s VALUES (1, '1111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '6666') ",
- tableName);
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
- tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)");
+ tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(5)");
+ sql(
+ "INSERT INTO TABLE %s VALUES (1, '111111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '666666') ",
+ tableName);
- String select1 = "SELECT MAX(id), MAX(data) FROM %s";
+ String select1 = "SELECT MAX(id), MAX(data), MIN(data) FROM %s";
List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
@@ -356,7 +356,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
List<Object[]> actual1 = sql(select1, tableName);
List<Object[]> expected1 = Lists.newArrayList();
- expected1.add(new Object[] {3L, "6666"});
+ expected1.add(new Object[] {3L, "666666", "111111"});
assertEquals("expected and actual should equal", expected1, actual1);
String select2 = "SELECT COUNT(data) FROM %s";
@@ -379,7 +379,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
- String select3 = "SELECT count(data), max(data) FROM %s";
+ String select3 = "SELECT count(data), max(data), min(data) FROM %s";
List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
String explainString3 =
explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
if (explainString3.contains("count(data)") &&
explainString3.contains("max(data)")) {
@@ -387,12 +387,12 @@ public class TestAggregatePushDown extends CatalogTestBase {
}
assertThat(explainContainsPushDownAggregates)
- .as("explain should contain the pushed down aggregates")
- .isTrue();
+ .as("explain should not contain the pushed down aggregates")
+ .isFalse();
List<Object[]> actual3 = sql(select3, tableName);
List<Object[]> expected3 = Lists.newArrayList();
- expected3.add(new Object[] {6L, "6666"});
+ expected3.add(new Object[] {6L, "666666", "111111"});
assertEquals("expected and actual should equal", expected3, actual3);
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 6c24d30e8b..3e26da4662 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -307,16 +307,15 @@ public class SparkScanBuilder
colName);
return false;
}
- } else if (mode instanceof MetricsModes.Truncate) {
- // lower_bounds and upper_bounds may be truncated, so disable push down
- if (aggregate.type().typeId() == Type.TypeID.STRING) {
- if (aggregate.op() == Expression.Operation.MAX
- || aggregate.op() == Expression.Operation.MIN) {
- LOG.info(
- "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
- colName);
- return false;
- }
+ } else if (aggregate.type().typeId() == Type.TypeID.STRING) {
+ // lower_bounds and upper_bounds may have been truncated before, so disable push down
+ // regardless of the current mode
+ if (aggregate.op() == Expression.Operation.MAX
+ || aggregate.op() == Expression.Operation.MIN) {
+ LOG.info(
+ "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
+ colName);
+ return false;
}
}
}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 6eac5474af..e707067980 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -334,14 +334,14 @@ public class TestAggregatePushDown extends CatalogTestBase {
@TestTemplate
public void testAggregateNotPushDownForStringType() {
sql("CREATE TABLE %s (id LONG, data STRING) USING iceberg", tableName);
- sql(
- "INSERT INTO TABLE %s VALUES (1, '1111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '6666') ",
- tableName);
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
- tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)");
+ tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(5)");
+ sql(
+ "INSERT INTO TABLE %s VALUES (1, '111111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '666666') ",
+ tableName);
- String select1 = "SELECT MAX(id), MAX(data) FROM %s";
+ String select1 = "SELECT MAX(id), MAX(data), MIN(data) FROM %s";
List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
@@ -356,7 +356,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
List<Object[]> actual1 = sql(select1, tableName);
List<Object[]> expected1 = Lists.newArrayList();
- expected1.add(new Object[] {3L, "6666"});
+ expected1.add(new Object[] {3L, "666666", "111111"});
assertEquals("expected and actual should equal", expected1, actual1);
String select2 = "SELECT COUNT(data) FROM %s";
@@ -379,7 +379,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
- String select3 = "SELECT count(data), max(data) FROM %s";
+ String select3 = "SELECT count(data), max(data), min(data) FROM %s";
List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
String explainString3 =
explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
if (explainString3.contains("count(data)") &&
explainString3.contains("max(data)")) {
@@ -387,12 +387,12 @@ public class TestAggregatePushDown extends CatalogTestBase {
}
assertThat(explainContainsPushDownAggregates)
- .as("explain should contain the pushed down aggregates")
- .isTrue();
+ .as("explain should not contain the pushed down aggregates")
+ .isFalse();
List<Object[]> actual3 = sql(select3, tableName);
List<Object[]> expected3 = Lists.newArrayList();
- expected3.add(new Object[] {6L, "6666"});
+ expected3.add(new Object[] {6L, "666666", "111111"});
assertEquals("expected and actual should equal", expected3, actual3);
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 6423ee4076..ebbae01a70 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -223,16 +223,15 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
colName);
return false;
}
- } else if (mode instanceof MetricsModes.Truncate) {
- // lower_bounds and upper_bounds may be truncated, so disable push down
- if (aggregate.type().typeId() == Type.TypeID.STRING) {
- if (aggregate.op() == Expression.Operation.MAX
- || aggregate.op() == Expression.Operation.MIN) {
- LOG.info(
- "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
- colName);
- return false;
- }
+ } else if (aggregate.type().typeId() == Type.TypeID.STRING) {
+ // lower_bounds and upper_bounds may have been truncated before, so disable push down
+ // regardless of the current mode
+ if (aggregate.op() == Expression.Operation.MAX
+ || aggregate.op() == Expression.Operation.MIN) {
+ LOG.info(
+ "Skipping aggregate pushdown: Cannot produce min or max from truncated values for column {}",
+ colName);
+ return false;
}
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 6eac5474af..e707067980 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -334,14 +334,14 @@ public class TestAggregatePushDown extends CatalogTestBase {
@TestTemplate
public void testAggregateNotPushDownForStringType() {
sql("CREATE TABLE %s (id LONG, data STRING) USING iceberg", tableName);
- sql(
- "INSERT INTO TABLE %s VALUES (1, '1111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '6666') ",
- tableName);
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
- tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)");
+ tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(5)");
+ sql(
+ "INSERT INTO TABLE %s VALUES (1, '111111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '666666') ",
+ tableName);
- String select1 = "SELECT MAX(id), MAX(data) FROM %s";
+ String select1 = "SELECT MAX(id), MAX(data), MIN(data) FROM %s";
List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
@@ -356,7 +356,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
List<Object[]> actual1 = sql(select1, tableName);
List<Object[]> expected1 = Lists.newArrayList();
- expected1.add(new Object[] {3L, "6666"});
+ expected1.add(new Object[] {3L, "666666", "111111"});
assertEquals("expected and actual should equal", expected1, actual1);
String select2 = "SELECT COUNT(data) FROM %s";
@@ -379,7 +379,7 @@ public class TestAggregatePushDown extends CatalogTestBase {
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
- String select3 = "SELECT count(data), max(data) FROM %s";
+ String select3 = "SELECT count(data), max(data), min(data) FROM %s";
List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
String explainString3 =
explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
if (explainString3.contains("count(data)") &&
explainString3.contains("max(data)")) {
@@ -387,12 +387,12 @@ public class TestAggregatePushDown extends CatalogTestBase {
}
assertThat(explainContainsPushDownAggregates)
- .as("explain should contain the pushed down aggregates")
- .isTrue();
+ .as("explain should not contain the pushed down aggregates")
+ .isFalse();
List<Object[]> actual3 = sql(select3, tableName);
List<Object[]> expected3 = Lists.newArrayList();
- expected3.add(new Object[] {6L, "6666"});
+ expected3.add(new Object[] {6L, "666666", "111111"});
assertEquals("expected and actual should equal", expected3, actual3);
}