This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 869301b96b Core, Spark 3.4: Adjust split size to benefit from parallelism (#7714)
869301b96b is described below
commit 869301b96bd3897ae11b4e8c5701f4091cd7914b
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Jul 28 14:58:16 2023 -0700
Core, Spark 3.4: Adjust split size to benefit from parallelism (#7714)
---
core/src/main/java/org/apache/iceberg/TableProperties.java | 3 +++
.../main/java/org/apache/iceberg/util/TableScanUtil.java | 13 +++++++++++++
.../java/org/apache/iceberg/util/TestTableScanUtil.java | 14 ++++++++++++++
.../main/java/org/apache/iceberg/spark/SparkReadConf.java | 8 ++++++++
.../iceberg/spark/source/SparkPartitioningAwareScan.java | 4 ++--
.../java/org/apache/iceberg/spark/source/SparkScan.java | 12 ++++++++++++
6 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index b14354def6..a9116bc57f 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -216,6 +216,9 @@ public class TableProperties {
public static final String SPLIT_OPEN_FILE_COST =
"read.split.open-file-cost";
public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; //
4MB
+ // Table property that toggles adaptive split sizing (default: true). When enabled and no
+ // split-size read option is given, the Spark 3.4 reader may shrink the target split size so the
+ // scan produces at least one split per slot of parallelism (see SparkScan#adjustSplitSize and
+ // TableScanUtil#adjustSplitSize). The split size is never increased automatically.
+ public static final String ADAPTIVE_SPLIT_SIZE_ENABLED =
"read.split.adaptive-size.enabled";
+ public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = true;
+
public static final String PARQUET_VECTORIZATION_ENABLED =
"read.parquet.vectorization.enabled";
public static final boolean PARQUET_VECTORIZATION_ENABLED_DEFAULT = true;
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index af3c28c81d..6e25e380dd 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.util;
+import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -42,10 +43,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.types.Types;
public class TableScanUtil {
+ // Floor used by adjustSplitSize when it shrinks the split size: the adjusted size is never
+ // below min(MIN_SPLIT_SIZE, configured split size), so a smaller configured size is kept as-is.
+ private static final long MIN_SPLIT_SIZE = 16 * 1024 * 1024; // 16 MB
+
private TableScanUtil() {}
public static boolean hasDeletes(CombinedScanTask task) {
@@ -246,6 +250,15 @@ public class TableScanUtil {
return mergedTasks;
}
+  /**
+   * Adjusts the target split size so that a scan can benefit from the available parallelism.
+   *
+   * <p>The configured split size is kept if it yields at least one split per slot of
+   * parallelism. Otherwise the split size is reduced to {@code scanSize / parallelism}, but not
+   * below {@code min(MIN_SPLIT_SIZE, splitSize)}. For a non-negative {@code scanSize} the result
+   * never exceeds {@code splitSize}: larger splits may cause expensive spills, so the size is
+   * never increased automatically.
+   *
+   * @param scanSize total size of the scan in bytes
+   * @param parallelism number of slots available to process splits; must be positive
+   * @param splitSize configured target split size in bytes; must be positive
+   * @return the split size to use for planning
+   * @throws IllegalArgumentException if {@code parallelism} or {@code splitSize} is not positive
+   */
+  public static long adjustSplitSize(long scanSize, int parallelism, long splitSize) {
+    // fail fast with a descriptive message instead of an ArithmeticException or a bogus result
+    Preconditions.checkArgument(parallelism > 0, "Parallelism must be > 0: %s", parallelism);
+    Preconditions.checkArgument(splitSize > 0, "Split size must be > 0: %s", splitSize);
+
+    // use the configured split size if it produces at least one split per slot
+    long splitCount = LongMath.divide(scanSize, splitSize, RoundingMode.CEILING);
+    if (splitCount >= parallelism) {
+      return splitSize;
+    }
+
+    // otherwise, adjust the split size to target parallelism with a reasonable minimum;
+    // the minimum is capped by splitSize so a small configured size is never increased
+    return Math.max(scanSize / parallelism, Math.min(MIN_SPLIT_SIZE, splitSize));
+  }
+
private static void validatePlanningArguments(long splitSize, int lookback,
long openFileCost) {
Preconditions.checkArgument(splitSize > 0, "Split size must be > 0: %s",
splitSize);
Preconditions.checkArgument(lookback > 0, "Split planning lookback must be
> 0: %s", lookback);
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
index ee454de00c..0dff941616 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -240,6 +240,20 @@ public class TestTableScanUtil {
.hasMessageStartingWith("Cannot find field");
}
+  @Test
+  public void testAdaptiveSplitSize() {
+    long scanSize = 500L * 1024 * 1024 * 1024; // 500 GB
+    int parallelism = 500;
+    long smallDefaultSplitSize = 128 * 1024 * 1024; // 128 MB
+    long largeDefaultSplitSize = 2L * 1024 * 1024 * 1024; // 2 GB
+
+    // 128 MB splits already yield more splits than slots: keep the configured size
+    long adjusted1 = TableScanUtil.adjustSplitSize(scanSize, parallelism, smallDefaultSplitSize);
+    assertThat(adjusted1).isEqualTo(smallDefaultSplitSize);
+
+    // 2 GB splits yield fewer splits than slots: shrink to scanSize / parallelism
+    long adjusted2 = TableScanUtil.adjustSplitSize(scanSize, parallelism, largeDefaultSplitSize);
+    assertThat(adjusted2).isEqualTo(scanSize / parallelism);
+
+    // a small scan does not shrink splits below the 16 MB minimum
+    long smallScanSize = 1024L * 1024 * 1024; // 1 GB
+    long adjusted3 =
+        TableScanUtil.adjustSplitSize(smallScanSize, parallelism, smallDefaultSplitSize);
+    assertThat(adjusted3).isEqualTo(16L * 1024 * 1024);
+
+    // a configured split size below the minimum is never increased
+    long tinySplitSize = 8L * 1024 * 1024; // 8 MB
+    long adjusted4 = TableScanUtil.adjustSplitSize(smallScanSize, parallelism, tinySplitSize);
+    assertThat(adjusted4).isEqualTo(tinySplitSize);
+  }
+
private PartitionScanTask taskWithPartition(
PartitionSpec spec, StructLike partition, long sizeBytes) {
PartitionScanTask task = Mockito.mock(PartitionScanTask.class);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 421e7a07a1..85e368d8cf 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -267,4 +267,12 @@ public class SparkReadConf {
.defaultValue(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT)
.parse();
}
+
+  /**
+   * Returns whether the scan split size may be adapted to the available parallelism.
+   *
+   * <p>Reads the {@link TableProperties#ADAPTIVE_SPLIT_SIZE_ENABLED} table property, using
+   * {@link TableProperties#ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT} as the parser's default value.
+   */
+  public boolean adaptiveSplitSizeEnabled() {
+    boolean enabled =
+        confParser
+            .booleanConf()
+            .tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
+            .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
+            .parse();
+    return enabled;
+  }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index 6538268697..141dd4dcba 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -200,7 +200,7 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
CloseableIterable<ScanTaskGroup<T>> plannedTaskGroups =
TableScanUtil.planTaskGroups(
CloseableIterable.withNoopClose(tasks()),
- scan.targetSplitSize(),
+ adjustSplitSize(tasks(), scan.targetSplitSize()),
scan.splitLookback(),
scan.splitOpenFileCost());
this.taskGroups = Lists.newArrayList(plannedTaskGroups);
@@ -214,7 +214,7 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
List<ScanTaskGroup<T>> plannedTaskGroups =
TableScanUtil.planTaskGroups(
tasks(),
- scan.targetSplitSize(),
+ adjustSplitSize(tasks(), scan.targetSplitSize()),
scan.splitLookback(),
scan.splitOpenFileCost(),
groupingKeyType());
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 65c5e04f31..535d43853f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -51,6 +52,7 @@ import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.metric.CustomMetric;
@@ -221,4 +223,14 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
new SkippedDataFiles()
};
}
+
+  /**
+   * Returns the split size to use when planning the given tasks.
+   *
+   * <p>When no split-size read option was supplied and adaptive split sizing is enabled, the
+   * size is delegated to {@link TableScanUtil#adjustSplitSize} using the total size of the tasks
+   * and Spark's default parallelism; otherwise {@code splitSize} is returned unchanged.
+   */
+  protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
+    boolean splitSizeOptionSet = readConf.splitSizeOption() != null;
+    if (splitSizeOptionSet || !readConf.adaptiveSplitSizeEnabled()) {
+      return splitSize;
+    }
+
+    long totalBytes = 0L;
+    for (ScanTask task : tasks) {
+      totalBytes += task.sizeBytes();
+    }
+
+    int parallelism = sparkContext.defaultParallelism();
+    return TableScanUtil.adjustSplitSize(totalBytes, parallelism, splitSize);
+  }
}