This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 87a988fdda Spark 3.5: Parallelize reading files in snapshot and 
migrate procedures (#10037)
87a988fdda is described below

commit 87a988fdda5fc68fd3b04947e000ef190fa54f61
Author: Manu Zhang <[email protected]>
AuthorDate: Thu Jun 27 00:28:00 2024 +0800

    Spark 3.5: Parallelize reading files in snapshot and migrate procedures 
(#10037)
---
 .../org/apache/iceberg/actions/MigrateTable.java   |  12 ++
 .../org/apache/iceberg/actions/SnapshotTable.java  |  12 ++
 .../apache/iceberg/data/TableMigrationUtil.java    |  57 +++++++--
 docs/docs/spark-procedures.md                      |   4 +-
 .../extensions/TestMigrateTableProcedure.java      |  42 +++++++
 .../extensions/TestSnapshotTableProcedure.java     |  39 ++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 139 ++++++++++++++++++++-
 .../spark/actions/MigrateTableSparkAction.java     |  11 +-
 .../spark/actions/SnapshotTableSparkAction.java    |  11 +-
 .../spark/procedures/MigrateTableProcedure.java    |  10 +-
 .../spark/procedures/SnapshotTableProcedure.java   |   9 +-
 .../spark/actions/TestMigrateTableAction.java      |  68 ++++++++++
 .../spark/actions/TestSnapshotTableAction.java     |  68 ++++++++++
 13 files changed, 459 insertions(+), 23 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java 
b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
index 5438c4b65a..8f9e8d69c9 100644
--- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /** An action that migrates an existing table to Iceberg. */
 public interface MigrateTable extends Action<MigrateTable, 
MigrateTable.Result> {
@@ -60,6 +61,17 @@ public interface MigrateTable extends Action<MigrateTable, 
MigrateTable.Result>
     throw new UnsupportedOperationException("Backup table name cannot be 
specified");
   }
 
+  /**
+   * Sets the executor service to use for parallel file reading. The default 
is not using executor
+   * service.
+   *
+   * @param service executor service
+   * @return this for method chaining
+   */
+  default MigrateTable executeWith(ExecutorService service) {
+    throw new UnsupportedOperationException("Setting executor service is not 
supported");
+  }
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Returns the number of migrated data files. */
diff --git a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java 
b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
index 37c600ab03..a28e94bcdb 100644
--- a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /** An action that creates an independent snapshot of an existing table. */
 public interface SnapshotTable extends Action<SnapshotTable, 
SnapshotTable.Result> {
@@ -57,6 +58,17 @@ public interface SnapshotTable extends Action<SnapshotTable, 
SnapshotTable.Resul
    */
   SnapshotTable tableProperty(String key, String value);
 
+  /**
+   * Sets the executor service to use for parallel file reading. The default 
is not using executor
+   * service.
+   *
+   * @param service executor service
+   * @return this for method chaining
+   */
+  default SnapshotTable executeWith(ExecutorService service) {
+    throw new UnsupportedOperationException("Setting executor service is not 
supported");
+  }
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Returns the number of imported data files. */
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java 
b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 5834a074a1..da6f2bfd11 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,9 +42,8 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.orc.OrcMetrics;
 import org.apache.iceberg.parquet.ParquetUtil;
-import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
-import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 
 public class TableMigrationUtil {
   private static final PathFilter HIDDEN_PATH_FILTER =
@@ -112,7 +109,46 @@ public class TableMigrationUtil {
       MetricsConfig metricsSpec,
       NameMapping mapping,
       int parallelism) {
-    ExecutorService service = null;
+    return listPartition(
+        partition,
+        partitionUri,
+        format,
+        spec,
+        conf,
+        metricsSpec,
+        mapping,
+        migrationService(parallelism));
+  }
+
+  /**
+   * Returns the data files in a partition by listing the partition location. 
Metrics are read from
+   * the files and the file reading is done in parallel by a specified number 
of threads.
+   *
+   * <p>For Parquet and ORC partitions, this will read metrics from the file 
footer. For Avro
+   * partitions, metrics other than row count are set to null.
+   *
+   * <p>Note: certain metrics, like NaN counts, that are only supported by 
Iceberg file writers but
+   * not file footers, will not be populated.
+   *
+   * @param partition map of column names to column values for the partition
+   * @param partitionUri partition location URI
+   * @param format partition format, avro, parquet or orc
+   * @param spec a partition spec
+   * @param conf a Hadoop conf
+   * @param metricsSpec a metrics conf
+   * @param mapping a name mapping
+   * @param service executor service to use for file reading
+   * @return a List of DataFile
+   */
+  public static List<DataFile> listPartition(
+      Map<String, String> partition,
+      String partitionUri,
+      String format,
+      PartitionSpec spec,
+      Configuration conf,
+      MetricsConfig metricsSpec,
+      NameMapping mapping,
+      ExecutorService service) {
     try {
       List<String> partitionValues =
           spec.fields().stream()
@@ -130,8 +166,7 @@ public class TableMigrationUtil {
       Tasks.Builder<Integer> task =
           
Tasks.range(fileStatus.size()).stopOnFailure().throwFailureWhenFinished();
 
-      if (parallelism > 1) {
-        service = migrationService(parallelism);
+      if (service != null) {
         task.executeWith(service);
       }
 
@@ -215,11 +250,7 @@ public class TableMigrationUtil {
         .build();
   }
 
-  private static ExecutorService migrationService(int parallelism) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor)
-            Executors.newFixedThreadPool(
-                parallelism,
-                new 
ThreadFactoryBuilder().setNameFormat("table-migration-%d").build()));
+  public static ExecutorService migrationService(int parallelism) {
+    return parallelism == 1 ? null : 
ThreadPools.newWorkerPool("table-migration", parallelism);
   }
 }
diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
index dc439c04c8..31172fb531 100644
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
@@ -546,6 +546,7 @@ See [`migrate`](#migrate) to replace an existing table with 
an Iceberg table.
 | `table`       | ✔️  | string | Name of the new Iceberg table to create |
 | `location`    |    | string | Table location for the new table (delegated to 
the catalog by default) |
 | `properties`  | ️   | map<string, string> | Properties to add to the newly 
created table |
+| `parallelism` |    | int | Number of threads to use for file reading 
(defaults to 1) |
 
 #### Output
 
@@ -588,6 +589,7 @@ By default, the original table is retained with the name 
`table_BACKUP_`.
 | `properties`  | ️   | map<string, string> | Properties for the new Iceberg 
table |
 | `drop_backup` |   | boolean | When true, the original table will not be 
retained as backup (defaults to false) |
 | `backup_table_name` |  | string | Name of the table that will be retained as 
backup (defaults to `table_BACKUP_`) |
+| `parallelism` |   | int | Number of threads to use for file reading 
(defaults to 1) |
 
 #### Output
 
@@ -629,7 +631,7 @@ will then treat these files as if they are part of the set 
of files  owned by Ic
 | `source_table`          | ✔️        | string              | Table where 
files should come from, paths are also possible in the form of 
\`file_format\`.\`path\` |
 | `partition_filter`      | ️         | map<string, string> | A map of 
partitions in the source table to import from                                   
           |
 | `check_duplicate_files` | ️         | boolean             | Whether to 
prevent files existing in the table from being added (defaults to true)         
         |
-| `parallelism`           |           | int                 | number of 
threads to use for file reading (defaults to 1)                                 
        |
+| `parallelism`           |           | int                 | Number of 
threads to use for file reading (defaults to 1)                                 
        |
 
 Warning : Schema is not validated, adding files with different schema to the 
Iceberg table will cause issues.
 
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 735a3bdee8..23c08b2572 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
@@ -231,4 +232,45 @@ public class TestMigrateTableProcedure extends 
ExtensionsTestBase {
     Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
     assertThat(result).isEqualTo(0L);
   }
+
+  @TestTemplate
+  public void testMigrateWithParallelism() throws IOException {
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+
+    String location = Files.createTempDirectory(temp, 
"junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    List<Object[]> result =
+        sql("CALL %s.system.migrate(table => '%s', parallelism => %d)", 
catalogName, tableName, 2);
+    assertEquals("Procedure output must match", ImmutableList.of(row(2L)), 
result);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(2L, "b")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @TestTemplate
+  public void testMigrateWithInvalidParallelism() throws IOException {
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+
+    String location = Files.createTempDirectory(temp, 
"junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.migrate(table => '%s', parallelism => %d)",
+                    catalogName, tableName, -1))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Parallelism should be larger than 0");
+  }
 }
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
index 3b093947ca..cb18404349 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
@@ -223,4 +224,42 @@ public class TestSnapshotTableProcedure extends 
ExtensionsTestBase {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot handle an empty identifier for argument table");
   }
+
+  @TestTemplate
+  public void testSnapshotWithParallelism() throws IOException {
+    String location = Files.createTempDirectory(temp, 
"junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        sourceName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.snapshot(source_table => '%s', table => '%s', 
parallelism => %d)",
+            catalogName, sourceName, tableName, 2);
+    assertEquals("Procedure output must match", ImmutableList.of(row(2L)), 
result);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(2L, "b")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @TestTemplate
+  public void testSnapshotWithInvalidParallelism() throws IOException {
+    String location = Files.createTempDirectory(temp, 
"junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        sourceName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.snapshot(source_table => '%s', table => 
'%s', parallelism => %d)",
+                    catalogName, sourceName, tableName, -1))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Parallelism should be larger than 0");
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index eb27e1483d..8447dbdcea 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -290,6 +291,24 @@ public class SparkTableUtil {
         parallelism);
   }
 
+  private static List<DataFile> listPartition(
+      SparkPartition partition,
+      PartitionSpec spec,
+      SerializableConfiguration conf,
+      MetricsConfig metricsConfig,
+      NameMapping mapping,
+      ExecutorService service) {
+    return TableMigrationUtil.listPartition(
+        partition.values,
+        partition.uri,
+        partition.format,
+        spec,
+        conf.get(),
+        metricsConfig,
+        mapping,
+        service);
+  }
+
   private static SparkPartition toSparkPartition(
       CatalogTablePartition partition, CatalogTable table) {
     Option<URI> locationUri = partition.storage().locationUri();
@@ -388,6 +407,54 @@ public class SparkTableUtil {
         spark, sourceTableIdent, targetTable, stagingDir, partitionFilter, 
checkDuplicateFiles, 1);
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no 
operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param parallelism number of threads to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      int parallelism) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        TableMigrationUtil.migrationService(parallelism));
+  }
+
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no 
operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param service executor service to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      ExecutorService service) {
+    importSparkTable(
+        spark, sourceTableIdent, targetTable, stagingDir, 
Collections.emptyMap(), false, service);
+  }
+
   /**
    * Import files from an existing Spark table to an Iceberg table.
    *
@@ -411,6 +478,39 @@ public class SparkTableUtil {
       Map<String, String> partitionFilter,
       boolean checkDuplicateFiles,
       int parallelism) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        partitionFilter,
+        checkDuplicateFiles,
+        TableMigrationUtil.migrationService(parallelism));
+  }
+
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no 
operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in 
the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in 
a duplicate data file
+   * @param service executor service to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles,
+      ExecutorService service) {
     SessionCatalog catalog = spark.sessionState().catalog();
 
     String db =
@@ -431,7 +531,7 @@ public class SparkTableUtil {
 
       if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
         importUnpartitionedSparkTable(
-            spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, 
parallelism);
+            spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, 
service);
       } else {
         List<SparkPartition> sourceTablePartitions =
             getPartitions(spark, sourceTableIdent, partitionFilter);
@@ -445,7 +545,7 @@ public class SparkTableUtil {
               spec,
               stagingDir,
               checkDuplicateFiles,
-              parallelism);
+              service);
         }
       }
     } catch (AnalysisException e) {
@@ -504,7 +604,7 @@ public class SparkTableUtil {
       TableIdentifier sourceTableIdent,
       Table targetTable,
       boolean checkDuplicateFiles,
-      int parallelism) {
+      ExecutorService service) {
     try {
       CatalogTable sourceTable = 
spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
       Option<String> format =
@@ -530,7 +630,7 @@ public class SparkTableUtil {
               conf,
               metricsConfig,
               nameMapping,
-              parallelism);
+              service);
 
       if (checkDuplicateFiles) {
         Dataset<Row> importedFiles =
@@ -600,6 +700,35 @@ public class SparkTableUtil {
       String stagingDir,
       boolean checkDuplicateFiles,
       int parallelism) {
+    importSparkPartitions(
+        spark,
+        partitions,
+        targetTable,
+        spec,
+        stagingDir,
+        checkDuplicateFiles,
+        TableMigrationUtil.migrationService(parallelism));
+  }
+
+  /**
+   * Import files from given partitions to an Iceberg table.
+   *
+   * @param spark a Spark session
+   * @param partitions partitions to import
+   * @param targetTable an Iceberg table where to import the data
+   * @param spec a partition spec
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param checkDuplicateFiles if true, throw exception if import results in 
a duplicate data file
+   * @param service executor service to use for file reading
+   */
+  public static void importSparkPartitions(
+      SparkSession spark,
+      List<SparkPartition> partitions,
+      Table targetTable,
+      PartitionSpec spec,
+      String stagingDir,
+      boolean checkDuplicateFiles,
+      ExecutorService service) {
     Configuration conf = spark.sessionState().newHadoopConf();
     SerializableConfiguration serializableConf = new 
SerializableConfiguration(conf);
     int listingParallelism =
@@ -627,7 +756,7 @@ public class SparkTableUtil {
                             serializableConf,
                             metricsConfig,
                             nameMapping,
-                            parallelism)
+                            service)
                         .iterator(),
             Encoders.javaSerialization(DataFile.class));
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index 5f3cdd3f03..bdffeb4654 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
@@ -59,6 +60,7 @@ public class MigrateTableSparkAction extends 
BaseTableCreationSparkAction<Migrat
 
   private Identifier backupIdent;
   private boolean dropBackup = false;
+  private ExecutorService executorService;
 
   MigrateTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier 
sourceTableIdent) {
@@ -108,6 +110,12 @@ public class MigrateTableSparkAction extends 
BaseTableCreationSparkAction<Migrat
     return this;
   }
 
+  @Override
+  public MigrateTableSparkAction executeWith(ExecutorService service) {
+    this.executorService = service;
+    return this;
+  }
+
   @Override
   public MigrateTable.Result execute() {
     String desc = String.format("Migrating table %s", 
destTableIdent().toString());
@@ -137,7 +145,8 @@ public class MigrateTableSparkAction extends 
BaseTableCreationSparkAction<Migrat
       TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), 
backupNamespace);
       String stagingLocation = getMetadataLocation(icebergTable);
       LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), 
stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, 
stagingLocation);
+      SparkTableUtil.importSparkTable(
+          spark(), v1BackupIdent, icebergTable, stagingLocation, 
executorService);
 
       LOG.info("Committing staged changes to {}", destTableIdent());
       stagedTable.commitStagedChanges();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index 8e59c13543..5f7f408cb0 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
@@ -54,6 +55,7 @@ public class SnapshotTableSparkAction extends 
BaseTableCreationSparkAction<Snaps
   private StagingTableCatalog destCatalog;
   private Identifier destTableIdent;
   private String destTableLocation = null;
+  private ExecutorService executorService;
 
   SnapshotTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier 
sourceTableIdent) {
@@ -98,6 +100,12 @@ public class SnapshotTableSparkAction extends 
BaseTableCreationSparkAction<Snaps
     return this;
   }
 
+  @Override
+  public SnapshotTableSparkAction executeWith(ExecutorService service) {
+    this.executorService = service;
+    return this;
+  }
+
   @Override
   public SnapshotTable.Result execute() {
     String desc = String.format("Snapshotting table %s as %s", 
sourceTableIdent(), destTableIdent);
@@ -126,7 +134,8 @@ public class SnapshotTableSparkAction extends 
BaseTableCreationSparkAction<Snaps
       TableIdentifier v1TableIdent = v1SourceTable().identifier();
       String stagingLocation = getMetadataLocation(icebergTable);
       LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), 
stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, 
stagingLocation);
+      SparkTableUtil.importSparkTable(
+          spark(), v1TableIdent, icebergTable, stagingLocation, 
executorService);
 
       LOG.info("Committing staged changes to {}", destTableIdent());
       stagedTable.commitStagedChanges();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index 37b1e3bf01..a0bd04dd99 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -40,7 +40,8 @@ class MigrateTableProcedure extends BaseProcedure {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
         ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
-        ProcedureParameter.optional("backup_table_name", DataTypes.StringType)
+        ProcedureParameter.optional("backup_table_name", DataTypes.StringType),
+        ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -105,6 +106,13 @@ class MigrateTableProcedure extends BaseProcedure {
       migrateTableSparkAction = 
migrateTableSparkAction.backupTableName(backupTableName);
     }
 
+    if (!args.isNullAt(4)) {
+      int parallelism = args.getInt(4);
+      Preconditions.checkArgument(parallelism > 0, "Parallelism should be 
larger than 0");
+      migrateTableSparkAction =
+          migrateTableSparkAction.executeWith(executorService(parallelism, 
"table-migration"));
+    }
+
     MigrateTable.Result result = migrateTableSparkAction.execute();
     return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
index 7a015a51e8..f709f64ebf 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
@@ -38,7 +38,8 @@ class SnapshotTableProcedure extends BaseProcedure {
         ProcedureParameter.required("source_table", DataTypes.StringType),
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("location", DataTypes.StringType),
-        ProcedureParameter.optional("properties", STRING_MAP)
+        ProcedureParameter.optional("properties", STRING_MAP),
+        ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -102,6 +103,12 @@ class SnapshotTableProcedure extends BaseProcedure {
       action.tableLocation(snapshotLocation);
     }
 
+    if (!args.isNullAt(4)) {
+      int parallelism = args.getInt(4);
+      Preconditions.checkArgument(parallelism > 0, "Parallelism should be 
larger than 0");
+      action = action.executeWith(executorService(parallelism, 
"table-snapshot"));
+    }
+
     SnapshotTable.Result result = action.tableProperties(properties).execute();
     return new InternalRow[] {newInternalRow(result.importedDataFilesCount())};
   }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
new file mode 100644
index 0000000000..94afa50cf4
--- /dev/null
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMigrateTableAction extends CatalogTestBase {
+
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+  }
+
+  @TestTemplate
+  public void testMigrateWithParallelTasks() throws IOException {
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+    String location = Files.createTempDirectory(temp, 
"junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    AtomicInteger migrationThreadsIndex = new AtomicInteger(0);
+    SparkActions.get()
+        .migrateTable(tableName)
+        .executeWith(
+            Executors.newFixedThreadPool(
+                4,
+                runnable -> {
+                  Thread thread = new Thread(runnable);
+                  thread.setName("table-migration-" + 
migrationThreadsIndex.getAndIncrement());
+                  thread.setDaemon(true);
+                  return thread;
+                }))
+        .execute();
+    assertThat(migrationThreadsIndex.get()).isEqualTo(2);
+  }
+}
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
new file mode 100644
index 0000000000..3b6869c397
--- /dev/null
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSnapshotTableAction extends CatalogTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s PURGE", sourceName);
+  }
+
+  @TestTemplate
+  public void testSnapshotWithParallelTasks() throws IOException {
+    String location = Files.createTempDirectory(temp, 
"junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        sourceName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+    AtomicInteger snapshotThreadsIndex = new AtomicInteger(0);
+    SparkActions.get()
+        .snapshotTable(sourceName)
+        .as(tableName)
+        .executeWith(
+            Executors.newFixedThreadPool(
+                4,
+                runnable -> {
+                  Thread thread = new Thread(runnable);
+                  thread.setName("table-snapshot-" + 
snapshotThreadsIndex.getAndIncrement());
+                  thread.setDaemon(true);
+                  return thread;
+                }))
+        .execute();
+    assertThat(snapshotThreadsIndex.get()).isEqualTo(2);
+  }
+}


Reply via email to