This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 869301b96b Core, Spark 3.4: Adjust split size to benefit from parallelism (#7714)
869301b96b is described below

commit 869301b96bd3897ae11b4e8c5701f4091cd7914b
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Jul 28 14:58:16 2023 -0700

    Core, Spark 3.4: Adjust split size to benefit from parallelism (#7714)
---
 core/src/main/java/org/apache/iceberg/TableProperties.java |  3 +++
 .../main/java/org/apache/iceberg/util/TableScanUtil.java   | 13 +++++++++++++
 .../java/org/apache/iceberg/util/TestTableScanUtil.java    | 14 ++++++++++++++
 .../main/java/org/apache/iceberg/spark/SparkReadConf.java  |  8 ++++++++
 .../iceberg/spark/source/SparkPartitioningAwareScan.java   |  4 ++--
 .../java/org/apache/iceberg/spark/source/SparkScan.java    | 12 ++++++++++++
 6 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index b14354def6..a9116bc57f 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -216,6 +216,9 @@ public class TableProperties {
   public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
   public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = true;
+
   public static final String PARQUET_VECTORIZATION_ENABLED = "read.parquet.vectorization.enabled";
   public static final boolean PARQUET_VECTORIZATION_ENABLED_DEFAULT = true;
 
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index af3c28c81d..6e25e380dd 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.util;
 
+import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -42,10 +43,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.types.Types;
 
 public class TableScanUtil {
 
+  private static final long MIN_SPLIT_SIZE = 16 * 1024 * 1024; // 16 MB
+
   private TableScanUtil() {}
 
   public static boolean hasDeletes(CombinedScanTask task) {
@@ -246,6 +250,15 @@ public class TableScanUtil {
     return mergedTasks;
   }
 
+  public static long adjustSplitSize(long scanSize, int parallelism, long splitSize) {
+    // use the configured split size if it produces at least one split per slot
+    // otherwise, adjust the split size to target parallelism with a reasonable minimum
+    // increasing the split size may cause expensive spills and is not done automatically
+    long splitCount = LongMath.divide(scanSize, splitSize, RoundingMode.CEILING);
+    long adjustedSplitSize = Math.max(scanSize / parallelism, Math.min(MIN_SPLIT_SIZE, splitSize));
+    return splitCount < parallelism ? adjustedSplitSize : splitSize;
+  }
+
   private static void validatePlanningArguments(long splitSize, int lookback, long openFileCost) {
     Preconditions.checkArgument(splitSize > 0, "Split size must be > 0: %s", splitSize);
     Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
index ee454de00c..0dff941616 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -240,6 +240,20 @@ public class TestTableScanUtil {
         .hasMessageStartingWith("Cannot find field");
   }
 
+  @Test
+  public void testAdaptiveSplitSize() {
+    long scanSize = 500L * 1024 * 1024 * 1024; // 500 GB
+    int parallelism = 500;
+    long smallDefaultSplitSize = 128 * 1024 * 1024; // 128 MB
+    long largeDefaultSplitSize = 2L * 1024 * 1024 * 1024; // 2 GB
+
+    long adjusted1 = TableScanUtil.adjustSplitSize(scanSize, parallelism, smallDefaultSplitSize);
+    assertThat(adjusted1).isEqualTo(smallDefaultSplitSize);
+
+    long adjusted2 = TableScanUtil.adjustSplitSize(scanSize, parallelism, largeDefaultSplitSize);
+    assertThat(adjusted2).isEqualTo(scanSize / parallelism);
+  }
+
   private PartitionScanTask taskWithPartition(
       PartitionSpec spec, StructLike partition, long sizeBytes) {
     PartitionScanTask task = Mockito.mock(PartitionScanTask.class);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 421e7a07a1..85e368d8cf 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -267,4 +267,12 @@ public class SparkReadConf {
         .defaultValue(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT)
         .parse();
   }
+
+  public boolean adaptiveSplitSizeEnabled() {
+    return confParser
+        .booleanConf()
+        .tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
+        .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
+        .parse();
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index 6538268697..141dd4dcba 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -200,7 +200,7 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
         CloseableIterable<ScanTaskGroup<T>> plannedTaskGroups =
             TableScanUtil.planTaskGroups(
                 CloseableIterable.withNoopClose(tasks()),
-                scan.targetSplitSize(),
+                adjustSplitSize(tasks(), scan.targetSplitSize()),
                 scan.splitLookback(),
                 scan.splitOpenFileCost());
         this.taskGroups = Lists.newArrayList(plannedTaskGroups);
@@ -214,7 +214,7 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
         List<ScanTaskGroup<T>> plannedTaskGroups =
             TableScanUtil.planTaskGroups(
                 tasks(),
-                scan.targetSplitSize(),
+                adjustSplitSize(tasks(), scan.targetSplitSize()),
                 scan.splitLookback(),
                 scan.splitOpenFileCost(),
                 groupingKeyType());
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 65c5e04f31..535d43853f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -51,6 +52,7 @@ import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.metric.CustomMetric;
@@ -221,4 +223,14 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
       new SkippedDataFiles()
     };
   }
+
+  protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
+    if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext.defaultParallelism();
+      return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
+    } else {
+      return splitSize;
+    }
+  }
 }

Reply via email to