This is an automated email from the ASF dual-hosted git repository.
RussellSpitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 62fe817f74 Spark 4.1: Add session configs for adaptive split sizing and parallelism (#16088)
62fe817f74 is described below
commit 62fe817f7459a904944f5b902867db61816635ee
Author: Karuppayya <[email protected]>
AuthorDate: Thu May 14 17:30:28 2026 -0700
Spark 4.1: Add session configs for adaptive split sizing and parallelism (#16088)
---
.../org/apache/iceberg/spark/SparkReadConf.java | 12 +++
.../apache/iceberg/spark/SparkSQLProperties.java | 9 ++
.../org/apache/iceberg/spark/source/SparkScan.java | 13 ++-
.../apache/iceberg/spark/TestSparkReadConf.java | 95 ++++++++++++++++++++++
4 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 36c34251c3..8128babfa3 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -279,6 +279,7 @@ public class SparkReadConf {
public boolean adaptiveSplitSizeEnabled() {
return confParser
.booleanConf()
+ .sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED)
.tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
.defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
.parse();
@@ -290,6 +291,17 @@ public class SparkReadConf {
return Math.max(defaultParallelism, numShufflePartitions);
}
+ public int splitParallelism() {
+ int parallelism =
+ confParser
+ .intConf()
+ .sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM)
+ .defaultValue(parallelism())
+ .parse();
+ Preconditions.checkArgument(parallelism > 0, "Split parallelism must be > 0: %s", parallelism);
+ return parallelism;
+ }
+
public boolean distributedPlanningEnabled() {
return table instanceof SupportsDistributedScanPlanning distributed
&& distributed.allowDistributedPlanning()
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index af549dfd8e..ddedc36c71 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -110,6 +110,15 @@ public class SparkSQLProperties {
// Prefix for custom snapshot properties
public static final String SNAPSHOT_PROPERTY_PREFIX =
"spark.sql.iceberg.snapshot-property.";
+ // Controls whether adaptive split sizing is enabled
+ public static final String READ_ADAPTIVE_SPLIT_SIZE_ENABLED =
+ "spark.sql.iceberg.read.adaptive-split-size.enabled";
+
+ // Overrides the parallelism used for adaptive split sizing. When unset, the parallelism
+ // defaults to max(spark.default.parallelism, spark.sql.shuffle.partitions).
+ public static final String READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM =
+ "spark.sql.iceberg.read.adaptive-split-size.parallelism";
+
// Controls whether to enable async micro batch planning for session
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
"spark.sql.iceberg.async-micro-batch-planning-enabled";
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6b80199a25..ee61523d80 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -369,8 +369,17 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
- int parallelism = readConf.parallelism();
- return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
+ int parallelism = readConf.splitParallelism();
+ long adjustedSplitSize = TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
+ if (adjustedSplitSize != splitSize) {
+ LOG.debug(
+ "Adjusted split size from {} to {} for table {} with parallelism {}",
+ splitSize,
+ adjustedSplitSize,
+ table().name(),
+ parallelism);
+ }
+ return adjustedSplitSize;
} else {
return splitSize;
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java
new file mode 100644
index 0000000000..c3fc69c8b2
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkReadConf extends TestBaseWithCatalog {
+
+ @BeforeEach
+ public void before() {
+ super.before();
+ sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
+ }
+
+ @AfterEach
+ public void after() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @TestTemplate
+ public void testSplitParallelismDefault() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+ assertThat(conf.splitParallelism()).isEqualTo(conf.parallelism());
+ }
+
+ @TestTemplate
+ public void testSplitParallelismSessionConf() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ withSQLConf(
+ ImmutableMap.of(
+ SQLConf.SHUFFLE_PARTITIONS().key(),
+ "999",
+ SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM,
+ "42"),
+ () -> {
+ SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+ assertThat(conf.splitParallelism()).isEqualTo(42);
+ });
+ }
+
+ @TestTemplate
+ public void testSplitParallelismRejectsZero() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "0"),
+ () -> {
+ SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+ assertThatIllegalArgumentException()
+ .isThrownBy(conf::splitParallelism)
+ .withMessageContaining("Split parallelism must be > 0");
+ });
+ }
+
+ @TestTemplate
+ public void testSplitParallelismRejectsNegative() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "-5"),
+ () -> {
+ SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+ assertThatIllegalArgumentException()
+ .isThrownBy(conf::splitParallelism)
+ .withMessageContaining("Split parallelism must be > 0");
+ });
+ }
+}