This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 71b05af09e Spark 4.1: Simplify description and toString in scans (#15281)
71b05af09e is described below
commit 71b05af09e1012c38eadd2e9393a517dfaf6ab21
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Feb 10 07:06:09 2026 -0800
Spark 4.1: Simplify description and toString in scans (#15281)
---
.../apache/iceberg/spark/extensions/TestMerge.java | 2 +-
.../iceberg/spark/source/SparkBatchQueryScan.java | 23 ++++++-----
.../iceberg/spark/source/SparkChangelogScan.java | 25 ++++--------
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 14 +++----
.../iceberg/spark/source/SparkLocalScan.java | 8 ++--
.../spark/source/SparkPartitioningAwareScan.java | 7 ++++
.../org/apache/iceberg/spark/source/SparkScan.java | 15 +++-----
.../iceberg/spark/source/SparkStagedScan.java | 8 ++--
.../apache/iceberg/spark/source/TestSparkScan.java | 45 ++++++++++++++++++++++
.../iceberg/spark/sql/TestFilterPushDown.java | 2 +-
10 files changed, 90 insertions(+), 59 deletions(-)
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index dd1a5b74aa..3f584031d9 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -3014,7 +3014,7 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
assertThat(planAsString)
.as("Pushed filters must match")
- .contains("[filters=" + icebergFilters + ",");
+ .contains(", filters=" + icebergFilters + ",");
});
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index a361a7f1ba..0ec77d9d06 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -255,8 +255,8 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
return table().name().equals(that.table().name())
&& Objects.equals(branch(), that.branch())
&& readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field ids
- && filterExpressions().toString().equals(that.filterExpressions().toString())
- && runtimeFilterExpressions.toString().equals(that.runtimeFilterExpressions.toString())
+ && filtersDesc().equals(that.filtersDesc())
+ && runtimeFiltersDesc().equals(that.runtimeFiltersDesc())
&& Objects.equals(snapshotId, that.snapshotId)
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId)
@@ -270,8 +270,8 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
table().name(),
branch(),
readSchema(),
- filterExpressions().toString(),
- runtimeFilterExpressions.toString(),
+ filtersDesc(),
+ runtimeFiltersDesc(),
snapshotId,
startSnapshotId,
endSnapshotId,
@@ -280,14 +280,13 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
}
@Override
- public String toString() {
+ public String description() {
return String.format(
- "IcebergScan(table=%s, branch=%s, type=%s, filters=%s,
runtimeFilters=%s, caseSensitive=%s)",
- table(),
- branch(),
- expectedSchema().asStruct(),
- filterExpressions(),
- runtimeFilterExpressions,
- caseSensitive());
+ "IcebergScan(table=%s, branch=%s, filters=%s, runtimeFilters=%s,
groupedBy=%s)",
+ table(), branch(), filtersDesc(), runtimeFiltersDesc(),
groupingKeyDesc());
+ }
+
+ private String runtimeFiltersDesc() {
+ return Spark3Util.describe(runtimeFilterExpressions);
}
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 55ea137ca1..eb4659f3eb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -129,23 +129,11 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
public String description() {
return String.format(
Locale.ROOT,
- "%s [fromSnapshotId=%d, toSnapshotId=%d, filters=%s]",
+ "IcebergChangelogScan(table=%s, fromSnapshotId=%d, toSnapshotId=%d,
filters=%s)",
table,
startSnapshotId,
endSnapshotId,
- Spark3Util.describe(filters));
- }
-
- @Override
- public String toString() {
- return String.format(
- Locale.ROOT,
- "IcebergChangelogScan(table=%s, type=%s, fromSnapshotId=%d,
toSnapshotId=%d, filters=%s)",
- table,
- expectedSchema.asStruct(),
- startSnapshotId,
- endSnapshotId,
- Spark3Util.describe(filters));
+ filtersDesc());
}
@Override
@@ -161,14 +149,17 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
SparkChangelogScan that = (SparkChangelogScan) o;
return table.name().equals(that.table.name())
&& readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field IDs
- && filters.toString().equals(that.filters.toString())
+ && filtersDesc().equals(that.filtersDesc())
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId);
}
@Override
public int hashCode() {
- return Objects.hash(
- table.name(), readSchema(), filters.toString(), startSnapshotId, endSnapshotId);
+ return Objects.hash(table.name(), readSchema(), filtersDesc(), startSnapshotId, endSnapshotId);
+ }
+
+ private String filtersDesc() {
+ return Spark3Util.describe(filters);
}
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index ee4be24618..38664ce8bb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -162,7 +162,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
SparkCopyOnWriteScan that = (SparkCopyOnWriteScan) o;
return table().name().equals(that.table().name())
&& readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field ids
- && filterExpressions().toString().equals(that.filterExpressions().toString())
+ && filtersDesc().equals(that.filtersDesc())
&& Objects.equals(snapshotId(), that.snapshotId())
&& Objects.equals(filteredLocations, that.filteredLocations);
}
@@ -170,18 +170,14 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
@Override
public int hashCode() {
return Objects.hash(
- table().name(),
- readSchema(),
- filterExpressions().toString(),
- snapshotId(),
- filteredLocations);
+ table().name(), readSchema(), filtersDesc(), snapshotId(), filteredLocations);
}
@Override
- public String toString() {
+ public String description() {
return String.format(
- "IcebergCopyOnWriteScan(table=%s, type=%s, filters=%s,
caseSensitive=%s)",
- table(), expectedSchema().asStruct(), filterExpressions(),
caseSensitive());
+ "IcebergCopyOnWriteScan(table=%s, filters=%s, groupedBy=%s)",
+ table(), filtersDesc(), groupingKeyDesc());
}
private Long currentSnapshotId() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
index c2f9707775..13806297f7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.LocalScan;
import org.apache.spark.sql.types.StructType;
@@ -54,13 +53,12 @@ class SparkLocalScan implements LocalScan {
@Override
public String description() {
- return String.format("%s [filters=%s]", table,
Spark3Util.describe(filterExpressions));
+ String filtersDesc = Spark3Util.describe(filterExpressions);
+ return String.format("IcebergLocalScan(table=%s, filters=%s)", table,
filtersDesc);
}
@Override
public String toString() {
- return String.format(
- "IcebergLocalScan(table=%s, type=%s, filters=%s)",
- table, SparkSchemaUtil.convert(readSchema).asStruct(), filterExpressions);
+ return description();
}
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index c9726518ee..a70176253b 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructLikeSet;
@@ -250,4 +251,10 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
return keys;
}
+
+ protected String groupingKeyDesc() {
+ return groupingKeyType().fields().stream()
+ .map(NestedField::name)
+ .collect(Collectors.joining(", "));
+ }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de0..14fe80c43d 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -153,6 +153,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
return filterExpressions;
}
+ protected String filtersDesc() {
+ return Spark3Util.describe(filterExpressions);
+ }
+
protected Types.StructType groupingKeyType() {
return Types.StructType.of();
}
@@ -257,15 +261,8 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
}
@Override
- public String description() {
- String groupingKeyFieldNamesAsString =
- groupingKeyType().fields().stream()
- .map(Types.NestedField::name)
- .collect(Collectors.joining(", "));
-
- return String.format(
- "%s (branch=%s) [filters=%s, groupedBy=%s]",
- table(), branch(), Spark3Util.describe(filterExpressions), groupingKeyFieldNamesAsString);
+ public String toString() {
+ return description();
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index d2eb4e5a56..394c922736 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -86,13 +86,11 @@ class SparkStagedScan extends SparkScan {
@Override
public int hashCode() {
return Objects.hash(
- table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
+ table().name(), taskSetId, readSchema(), splitSize, splitLookback, openFileCost);
}
@Override
- public String toString() {
- return String.format(
- "IcebergStagedScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
- table(), expectedSchema().asStruct(), taskSetId, caseSensitive());
+ public String description() {
+ return String.format("IcebergStagedScan(table=%s, taskSetID=%s)", table(),
taskSetId);
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index 417a84d827..d9968c6697 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -67,6 +67,7 @@ import org.apache.spark.sql.connector.expressions.filter.Not;
import org.apache.spark.sql.connector.expressions.filter.Or;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
@@ -1022,6 +1023,50 @@ public class TestSparkScan extends TestBaseWithCatalog {
assertThat(scan.planInputPartitions()).hasSize(4);
}
+ @TestTemplate
+ public void testBatchQueryScanDescription() throws Exception {
+ createPartitionedTable(spark, tableName, "data");
+ SparkScanBuilder builder = scanBuilder();
+
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true"),
+ () -> {
+ Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(1)));
+ Predicate predicate2 = new Predicate(">", expressions(fieldRef("id"), intLit(0)));
+ pushFilters(builder, predicate1, predicate2);
+
+ Scan scan = builder.build();
+ String description = scan.description();
+
+ assertThat(description).contains("IcebergScan");
+ assertThat(description).contains(tableName);
+ assertThat(description).contains("filters=id = 1, id > 0");
+ assertThat(description).contains("groupedBy=data");
+ });
+ }
+
+ @TestTemplate
+ public void testCopyOnWriteScanDescription() throws Exception {
+ createPartitionedTable(spark, tableName, "data");
+ SparkScanBuilder builder = scanBuilder();
+
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true"),
+ () -> {
+ Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(2)));
+ Predicate predicate2 = new Predicate("<", expressions(fieldRef("id"), intLit(10)));
+ pushFilters(builder, predicate1, predicate2);
+
+ Scan scan = builder.buildCopyOnWriteScan();
+ String description = scan.description();
+
+ assertThat(description).contains("IcebergCopyOnWriteScan");
+ assertThat(description).contains(tableName);
+ assertThat(description).contains("filters=id = 2, id < 10");
+ assertThat(description).contains("groupedBy=data");
+ });
+ }
+
private SparkScanBuilder scanBuilder() throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
CaseInsensitiveStringMap options =
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index a984c4c826..e5a9d63b68 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -676,7 +676,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
assertThat(planAsString)
.as("Pushed filters must match")
- .contains("[filters=" + icebergFilters + ",");
+ .contains(", filters=" + icebergFilters + ",");
}
private Timestamp timestamp(String timestampAsString) {
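For reference, a minimal sketch (not part of the commit) of the string shape the reworked SparkBatchQueryScan#description() produces. The table, branch, filter, and grouping values below are hypothetical; the filter and grouping strings mirror the assertions in testBatchQueryScanDescription above, and stand in for the filtersDesc(), runtimeFiltersDesc(), and groupingKeyDesc() calls.

// Illustrative sketch only, assuming hypothetical values; reproduces the
// "IcebergScan(...)" format string added in this commit.
public class ScanDescriptionSketch {
  public static void main(String[] args) {
    String description =
        String.format(
            "IcebergScan(table=%s, branch=%s, filters=%s, runtimeFilters=%s, groupedBy=%s)",
            "db.events",       // table name (hypothetical)
            "main",            // branch
            "id = 1, id > 0",  // filtersDesc(): Spark3Util.describe(filterExpressions)
            "",                // runtimeFiltersDesc(): empty before runtime filtering
            "data");           // groupingKeyDesc(): grouping key field names
    System.out.println(description);
    // IcebergScan(table=db.events, branch=main, filters=id = 1, id > 0, runtimeFilters=, groupedBy=data)
  }
}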