This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 87a988fdda Spark 3.5: Parallelize reading files in snapshot and
migrate procedures (#10037)
87a988fdda is described below
commit 87a988fdda5fc68fd3b04947e000ef190fa54f61
Author: Manu Zhang <[email protected]>
AuthorDate: Thu Jun 27 00:28:00 2024 +0800
Spark 3.5: Parallelize reading files in snapshot and migrate procedures
(#10037)
---
.../org/apache/iceberg/actions/MigrateTable.java | 12 ++
.../org/apache/iceberg/actions/SnapshotTable.java | 12 ++
.../apache/iceberg/data/TableMigrationUtil.java | 57 +++++++--
docs/docs/spark-procedures.md | 4 +-
.../extensions/TestMigrateTableProcedure.java | 42 +++++++
.../extensions/TestSnapshotTableProcedure.java | 39 ++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 139 ++++++++++++++++++++-
.../spark/actions/MigrateTableSparkAction.java | 11 +-
.../spark/actions/SnapshotTableSparkAction.java | 11 +-
.../spark/procedures/MigrateTableProcedure.java | 10 +-
.../spark/procedures/SnapshotTableProcedure.java | 9 +-
.../spark/actions/TestMigrateTableAction.java | 68 ++++++++++
.../spark/actions/TestSnapshotTableAction.java | 68 ++++++++++
13 files changed, 459 insertions(+), 23 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
index 5438c4b65a..8f9e8d69c9 100644
--- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.actions;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
/** An action that migrates an existing table to Iceberg. */
public interface MigrateTable extends Action<MigrateTable,
MigrateTable.Result> {
@@ -60,6 +61,17 @@ public interface MigrateTable extends Action<MigrateTable,
MigrateTable.Result>
throw new UnsupportedOperationException("Backup table name cannot be
specified");
}
+  /**
+   * Sets the executor service used to read data files in parallel.
+   *
+   * <p>Implementations that support this read files without a dedicated executor service unless
+   * one is provided here.
+   *
+   * @param service executor service to use for file reading
+   * @return this for method chaining
+   * @throws UnsupportedOperationException if the implementation does not support a custom
+   *     executor service (this is what the default implementation does)
+   */
+  default MigrateTable executeWith(ExecutorService service) {
+    throw new UnsupportedOperationException("Setting executor service is not supported");
+  }
+
/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of migrated data files. */
diff --git a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
index 37c600ab03..a28e94bcdb 100644
--- a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.actions;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
/** An action that creates an independent snapshot of an existing table. */
public interface SnapshotTable extends Action<SnapshotTable,
SnapshotTable.Result> {
@@ -57,6 +58,17 @@ public interface SnapshotTable extends Action<SnapshotTable,
SnapshotTable.Resul
*/
SnapshotTable tableProperty(String key, String value);
+  /**
+   * Sets the executor service used to read data files in parallel.
+   *
+   * <p>Implementations that support this read files without a dedicated executor service unless
+   * one is provided here.
+   *
+   * @param service executor service to use for file reading
+   * @return this for method chaining
+   * @throws UnsupportedOperationException if the implementation does not support a custom
+   *     executor service (this is what the default implementation does)
+   */
+  default SnapshotTable executeWith(ExecutorService service) {
+    throw new UnsupportedOperationException("Setting executor service is not supported");
+  }
+
/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of imported data files. */
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 5834a074a1..da6f2bfd11 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -44,9 +42,8 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
-import
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
-import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
public class TableMigrationUtil {
private static final PathFilter HIDDEN_PATH_FILTER =
@@ -112,7 +109,46 @@ public class TableMigrationUtil {
MetricsConfig metricsSpec,
NameMapping mapping,
int parallelism) {
- ExecutorService service = null;
+ return listPartition(
+ partition,
+ partitionUri,
+ format,
+ spec,
+ conf,
+ metricsSpec,
+ mapping,
+ migrationService(parallelism));
+ }
+
+ /**
+ * Returns the data files in a partition by listing the partition location. Metrics are read from
+ * the files; when {@code service} is not null, the files are read in parallel using that
+ * executor service.
+ *
+ * <p>For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro
+ * partitions, metrics other than row count are set to null.
+ *
+ * <p>Note: certain metrics, like NaN counts, that are only supported by
Iceberg file writers but
+ * not file footers, will not be populated.
+ *
+ * @param partition map of column names to column values for the partition
+ * @param partitionUri partition location URI
+ * @param format partition format, avro, parquet or orc
+ * @param spec a partition spec
+ * @param conf a Hadoop conf
+ * @param metricsSpec a metrics conf
+ * @param mapping a name mapping
+ * @param service executor service to use for file reading, or null to read files without an
+ * executor service
+ * @return a List of DataFile
+ */
+ public static List<DataFile> listPartition(
+ Map<String, String> partition,
+ String partitionUri,
+ String format,
+ PartitionSpec spec,
+ Configuration conf,
+ MetricsConfig metricsSpec,
+ NameMapping mapping,
+ ExecutorService service) {
try {
List<String> partitionValues =
spec.fields().stream()
@@ -130,8 +166,7 @@ public class TableMigrationUtil {
Tasks.Builder<Integer> task =
Tasks.range(fileStatus.size()).stopOnFailure().throwFailureWhenFinished();
- if (parallelism > 1) {
- service = migrationService(parallelism);
+ if (service != null) {
task.executeWith(service);
}
@@ -215,11 +250,7 @@ public class TableMigrationUtil {
.build();
}
- private static ExecutorService migrationService(int parallelism) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor)
- Executors.newFixedThreadPool(
- parallelism,
- new
ThreadFactoryBuilder().setNameFormat("table-migration-%d").build()));
+  /**
+   * Returns an executor service for reading data files with the given parallelism.
+   *
+   * @param parallelism number of threads to use for file reading
+   * @return a worker pool with {@code parallelism} threads, or {@code null} when {@code
+   *     parallelism} is 1 or less, in which case callers read files without an executor service
+   */
+  public static ExecutorService migrationService(int parallelism) {
+    // Any value <= 1 means "no parallelism": 0 and negative values fall back to reading files
+    // without an executor instead of asking ThreadPools for a pool of non-positive size.
+    return parallelism <= 1 ? null : ThreadPools.newWorkerPool("table-migration", parallelism);
}
}
diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
index dc439c04c8..31172fb531 100644
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
@@ -546,6 +546,7 @@ See [`migrate`](#migrate) to replace an existing table with
an Iceberg table.
| `table` | ✔️ | string | Name of the new Iceberg table to create |
| `location` | | string | Table location for the new table (delegated to
the catalog by default) |
| `properties` | ️ | map<string, string> | Properties to add to the newly
created table |
+| `parallelism` | | int | Number of threads to use for file reading
(defaults to 1) |
#### Output
@@ -588,6 +589,7 @@ By default, the original table is retained with the name
`table_BACKUP_`.
| `properties` | ️ | map<string, string> | Properties for the new Iceberg
table |
| `drop_backup` | | boolean | When true, the original table will not be
retained as backup (defaults to false) |
| `backup_table_name` | | string | Name of the table that will be retained as
backup (defaults to `table_BACKUP_`) |
+| `parallelism` | | int | Number of threads to use for file reading
(defaults to 1) |
#### Output
@@ -629,7 +631,7 @@ will then treat these files as if they are part of the set
of files owned by Ic
| `source_table` | ✔️ | string | Table where
files should come from, paths are also possible in the form of
\`file_format\`.\`path\` |
| `partition_filter` | ️ | map<string, string> | A map of
partitions in the source table to import from
|
| `check_duplicate_files` | ️ | boolean | Whether to
prevent files existing in the table from being added (defaults to true)
|
-| `parallelism` | | int | number of
threads to use for file reading (defaults to 1)
|
+| `parallelism` | | int | Number of
threads to use for file reading (defaults to 1)
|
Warning : Schema is not validated, adding files with different schema to the
Iceberg table will cause issues.
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 735a3bdee8..23c08b2572 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
@@ -231,4 +232,45 @@ public class TestMigrateTableProcedure extends
ExtensionsTestBase {
Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName,
tableName);
assertThat(result).isEqualTo(0L);
}
+
+ /**
+ * Migrates a Parquet table written by two inserts through the migrate procedure with
+ * parallelism 2, then checks the reported migrated data file count and the table contents.
+ */
+ @TestTemplate
+ public void testMigrateWithParallelism() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+
+ String location = Files.createTempDirectory(temp,
"junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> result =
+ sql("CALL %s.system.migrate(table => '%s', parallelism => %d)",
catalogName, tableName, 2);
+ // the procedure returns the number of migrated data files (one per INSERT here)
+ assertEquals("Procedure output must match", ImmutableList.of(row(2L)),
result);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1L, "a"), row(2L, "b")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * A negative parallelism must be rejected by the migrate procedure's argument check with an
+ * IllegalArgumentException.
+ */
+ @TestTemplate
+ public void testMigrateWithInvalidParallelism() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+
+ String location = Files.createTempDirectory(temp,
"junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.migrate(table => '%s', parallelism => %d)",
+ catalogName, tableName, -1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Parallelism should be larger than 0");
+ }
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
index 3b093947ca..cb18404349 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
@@ -223,4 +224,42 @@ public class TestSnapshotTableProcedure extends
ExtensionsTestBase {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
}
+
+ /**
+ * Snapshots a Parquet table written by two inserts through the snapshot procedure with
+ * parallelism 2, then checks the reported imported data file count and the snapshot contents.
+ */
+ @TestTemplate
+ public void testSnapshotWithParallelism() throws IOException {
+ String location = Files.createTempDirectory(temp,
"junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ sourceName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+ List<Object[]> result =
+ sql(
+ "CALL %s.system.snapshot(source_table => '%s', table => '%s',
parallelism => %d)",
+ catalogName, sourceName, tableName, 2);
+ // the procedure returns the number of imported data files (one per INSERT here)
+ assertEquals("Procedure output must match", ImmutableList.of(row(2L)),
result);
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1L, "a"), row(2L, "b")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * A negative parallelism must be rejected by the snapshot procedure's argument check with an
+ * IllegalArgumentException.
+ */
+ @TestTemplate
+ public void testSnapshotWithInvalidParallelism() throws IOException {
+ String location = Files.createTempDirectory(temp,
"junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ sourceName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.snapshot(source_table => '%s', table =>
'%s', parallelism => %d)",
+ catalogName, sourceName, tableName, -1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Parallelism should be larger than 0");
+ }
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index eb27e1483d..8447dbdcea 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -290,6 +291,24 @@ public class SparkTableUtil {
parallelism);
}
+  /**
+   * Lists the data files of a single Spark partition by delegating to {@code
+   * TableMigrationUtil.listPartition}, reading files with the given executor service (which may
+   * be null).
+   */
+  private static List<DataFile> listPartition(
+      SparkPartition partition,
+      PartitionSpec spec,
+      SerializableConfiguration conf,
+      MetricsConfig metricsConfig,
+      NameMapping mapping,
+      ExecutorService service) {
+    Configuration hadoopConf = conf.get();
+    return TableMigrationUtil.listPartition(
+        partition.values,
+        partition.uri,
+        partition.format,
+        spec,
+        hadoopConf,
+        metricsConfig,
+        mapping,
+        service);
+  }
+
private static SparkPartition toSparkPartition(
CatalogTablePartition partition, CatalogTable table) {
Option<URI> locationUri = partition.storage().locationUri();
@@ -388,6 +407,54 @@ public class SparkTableUtil {
spark, sourceTableIdent, targetTable, stagingDir, partitionFilter,
checkDuplicateFiles, 1);
}
+  /**
+   * Imports the files of an existing Spark table into an Iceberg table, reading files with the
+   * given number of threads.
+   *
+   * <p>Table metadata is obtained through the Spark session. No concurrent operation on the source
+   * or target table is expected, so this method is not thread-safe.
+   *
+   * <p>NOTE(review): the pool obtained from {@code TableMigrationUtil.migrationService} is not
+   * shut down in this method; confirm it is released downstream.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param parallelism number of threads to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      int parallelism) {
+    ExecutorService readService = TableMigrationUtil.migrationService(parallelism);
+    importSparkTable(spark, sourceTableIdent, targetTable, stagingDir, readService);
+  }
+
+ /**
+ * Import files from an existing Spark table to an Iceberg table.
+ *
+ * <p>The import uses the Spark session to get table metadata. It assumes no
operation is going on
+ * the original and target table and thus is not thread-safe.
+ *
+ * @param spark a Spark session
+ * @param sourceTableIdent an identifier of the source Spark table
+ * @param targetTable an Iceberg table where to import the data
+ * @param stagingDir a staging directory to store temporary manifest files
+ * @param service executor service to use for file reading
+ */
+ public static void importSparkTable(
+ SparkSession spark,
+ TableIdentifier sourceTableIdent,
+ Table targetTable,
+ String stagingDir,
+ ExecutorService service) {
+ importSparkTable(
+ spark, sourceTableIdent, targetTable, stagingDir,
Collections.emptyMap(), false, service);
+ }
+
/**
* Import files from an existing Spark table to an Iceberg table.
*
@@ -411,6 +478,39 @@ public class SparkTableUtil {
Map<String, String> partitionFilter,
boolean checkDuplicateFiles,
int parallelism) {
+ importSparkTable(
+ spark,
+ sourceTableIdent,
+ targetTable,
+ stagingDir,
+ partitionFilter,
+ checkDuplicateFiles,
+ TableMigrationUtil.migrationService(parallelism));
+ }
+
+ /**
+ * Import files from an existing Spark table to an Iceberg table.
+ *
+ * <p>The import uses the Spark session to get table metadata. It assumes no
operation is going on
+ * the original and target table and thus is not thread-safe.
+ *
+ * @param spark a Spark session
+ * @param sourceTableIdent an identifier of the source Spark table
+ * @param targetTable an Iceberg table where to import the data
+ * @param stagingDir a staging directory to store temporary manifest files
+ * @param partitionFilter only import partitions whose values match those in
the map, can be
+ * partially defined
+ * @param checkDuplicateFiles if true, throw exception if import results in
a duplicate data file
+ * @param service executor service to use for file reading
+ */
+ public static void importSparkTable(
+ SparkSession spark,
+ TableIdentifier sourceTableIdent,
+ Table targetTable,
+ String stagingDir,
+ Map<String, String> partitionFilter,
+ boolean checkDuplicateFiles,
+ ExecutorService service) {
SessionCatalog catalog = spark.sessionState().catalog();
String db =
@@ -431,7 +531,7 @@ public class SparkTableUtil {
if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
importUnpartitionedSparkTable(
- spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles,
parallelism);
+ spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles,
service);
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
@@ -445,7 +545,7 @@ public class SparkTableUtil {
spec,
stagingDir,
checkDuplicateFiles,
- parallelism);
+ service);
}
}
} catch (AnalysisException e) {
@@ -504,7 +604,7 @@ public class SparkTableUtil {
TableIdentifier sourceTableIdent,
Table targetTable,
boolean checkDuplicateFiles,
- int parallelism) {
+ ExecutorService service) {
try {
CatalogTable sourceTable =
spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
Option<String> format =
@@ -530,7 +630,7 @@ public class SparkTableUtil {
conf,
metricsConfig,
nameMapping,
- parallelism);
+ service);
if (checkDuplicateFiles) {
Dataset<Row> importedFiles =
@@ -600,6 +700,35 @@ public class SparkTableUtil {
String stagingDir,
boolean checkDuplicateFiles,
int parallelism) {
+ importSparkPartitions(
+ spark,
+ partitions,
+ targetTable,
+ spec,
+ stagingDir,
+ checkDuplicateFiles,
+ TableMigrationUtil.migrationService(parallelism));
+ }
+
+ /**
+ * Import files from given partitions to an Iceberg table.
+ *
+ * @param spark a Spark session
+ * @param partitions partitions to import
+ * @param targetTable an Iceberg table where to import the data
+ * @param spec a partition spec
+ * @param stagingDir a staging directory to store temporary manifest files
+ * @param checkDuplicateFiles if true, throw exception if import results in
a duplicate data file
+ * @param service executor service to use for file reading
+ */
+ public static void importSparkPartitions(
+ SparkSession spark,
+ List<SparkPartition> partitions,
+ Table targetTable,
+ PartitionSpec spec,
+ String stagingDir,
+ boolean checkDuplicateFiles,
+ ExecutorService service) {
Configuration conf = spark.sessionState().newHadoopConf();
SerializableConfiguration serializableConf = new
SerializableConfiguration(conf);
int listingParallelism =
@@ -627,7 +756,7 @@ public class SparkTableUtil {
serializableConf,
metricsConfig,
nameMapping,
- parallelism)
+ service)
.iterator(),
Encoders.javaSerialization(DataFile.class));
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index 5f3cdd3f03..bdffeb4654 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
@@ -59,6 +60,7 @@ public class MigrateTableSparkAction extends
BaseTableCreationSparkAction<Migrat
private Identifier backupIdent;
private boolean dropBackup = false;
+ private ExecutorService executorService;
MigrateTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier
sourceTableIdent) {
@@ -108,6 +110,12 @@ public class MigrateTableSparkAction extends
BaseTableCreationSparkAction<Migrat
return this;
}
+ // Stores the caller's executor service; it is later passed to SparkTableUtil.importSparkTable
+ // for file reading. Left unset, the field stays null and no executor service is used.
+ @Override
+ public MigrateTableSparkAction executeWith(ExecutorService service) {
+ this.executorService = service;
+ return this;
+ }
+
@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s",
destTableIdent().toString());
@@ -137,7 +145,8 @@ public class MigrateTableSparkAction extends
BaseTableCreationSparkAction<Migrat
TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(),
backupNamespace);
String stagingLocation = getMetadataLocation(icebergTable);
LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(),
stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable,
stagingLocation);
+ SparkTableUtil.importSparkTable(
+ spark(), v1BackupIdent, icebergTable, stagingLocation,
executorService);
LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index 8e59c13543..5f7f408cb0 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
@@ -54,6 +55,7 @@ public class SnapshotTableSparkAction extends
BaseTableCreationSparkAction<Snaps
private StagingTableCatalog destCatalog;
private Identifier destTableIdent;
private String destTableLocation = null;
+ private ExecutorService executorService;
SnapshotTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier
sourceTableIdent) {
@@ -98,6 +100,12 @@ public class SnapshotTableSparkAction extends
BaseTableCreationSparkAction<Snaps
return this;
}
+ @Override
+ public SnapshotTableSparkAction executeWith(ExecutorService service) {
+ this.executorService = service;
+ return this;
+ }
+
@Override
public SnapshotTable.Result execute() {
String desc = String.format("Snapshotting table %s as %s",
sourceTableIdent(), destTableIdent);
@@ -126,7 +134,8 @@ public class SnapshotTableSparkAction extends
BaseTableCreationSparkAction<Snaps
TableIdentifier v1TableIdent = v1SourceTable().identifier();
String stagingLocation = getMetadataLocation(icebergTable);
LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(),
stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable,
stagingLocation);
+ SparkTableUtil.importSparkTable(
+ spark(), v1TableIdent, icebergTable, stagingLocation,
executorService);
LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index 37b1e3bf01..a0bd04dd99 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -40,7 +40,8 @@ class MigrateTableProcedure extends BaseProcedure {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("properties", STRING_MAP),
ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
- ProcedureParameter.optional("backup_table_name", DataTypes.StringType)
+ ProcedureParameter.optional("backup_table_name", DataTypes.StringType),
+ ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
};
private static final StructType OUTPUT_TYPE =
@@ -105,6 +106,13 @@ class MigrateTableProcedure extends BaseProcedure {
migrateTableSparkAction =
migrateTableSparkAction.backupTableName(backupTableName);
}
+ if (!args.isNullAt(4)) {
+ int parallelism = args.getInt(4);
+ Preconditions.checkArgument(parallelism > 0, "Parallelism should be
larger than 0");
+ migrateTableSparkAction =
+ migrateTableSparkAction.executeWith(executorService(parallelism,
"table-migration"));
+ }
+
MigrateTable.Result result = migrateTableSparkAction.execute();
return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
index 7a015a51e8..f709f64ebf 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
@@ -38,7 +38,8 @@ class SnapshotTableProcedure extends BaseProcedure {
ProcedureParameter.required("source_table", DataTypes.StringType),
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("location", DataTypes.StringType),
- ProcedureParameter.optional("properties", STRING_MAP)
+ ProcedureParameter.optional("properties", STRING_MAP),
+ ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
};
private static final StructType OUTPUT_TYPE =
@@ -102,6 +103,12 @@ class SnapshotTableProcedure extends BaseProcedure {
action.tableLocation(snapshotLocation);
}
+ if (!args.isNullAt(4)) {
+ int parallelism = args.getInt(4);
+ Preconditions.checkArgument(parallelism > 0, "Parallelism should be
larger than 0");
+ action = action.executeWith(executorService(parallelism,
"table-snapshot"));
+ }
+
SnapshotTable.Result result = action.tableProperties(properties).execute();
return new InternalRow[] {newInternalRow(result.importedDataFilesCount())};
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
new file mode 100644
index 0000000000..94afa50cf4
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Tests for {@link MigrateTableSparkAction} when a caller-supplied executor service is used. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMigrateTableAction extends CatalogTestBase {
+
+  /** Drops the migrated table and the backup table the migration keeps. */
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+  }
+
+  /**
+   * Migrates a Parquet table written by two inserts using a caller-supplied pool and checks that
+   * the pool's thread factory was invoked twice.
+   *
+   * <p>NOTE(review): the expected count of 2 assumes each INSERT writes exactly one data file and
+   * that the pool starts one thread per file-reading task.
+   */
+  @TestTemplate
+  public void testMigrateWithParallelTasks() throws IOException {
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    AtomicInteger migrationThreadsIndex = new AtomicInteger(0);
+    ExecutorService service =
+        Executors.newFixedThreadPool(
+            4,
+            runnable -> {
+              Thread thread = new Thread(runnable);
+              thread.setName("table-migration-" + migrationThreadsIndex.getAndIncrement());
+              thread.setDaemon(true);
+              return thread;
+            });
+    try {
+      SparkActions.get().migrateTable(tableName).executeWith(service).execute();
+    } finally {
+      // This test created the pool, so release its threads; shutdown() has no additional effect
+      // if the pool was already shut down.
+      service.shutdown();
+    }
+
+    assertThat(migrationThreadsIndex.get()).isEqualTo(2);
+  }
+}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
new file mode 100644
index 0000000000..3b6869c397
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Tests for {@link SnapshotTableSparkAction} when a caller-supplied executor service is used. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSnapshotTableAction extends CatalogTestBase {
+  private static final String SOURCE_NAME = "spark_catalog.default.source";
+
+  /** Drops the snapshot table and purges the source table. */
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s PURGE", SOURCE_NAME);
+  }
+
+  /**
+   * Snapshots a Parquet table written by two inserts using a caller-supplied pool and checks that
+   * the pool's thread factory was invoked twice.
+   *
+   * <p>NOTE(review): the expected count of 2 assumes each INSERT writes exactly one data file and
+   * that the pool starts one thread per file-reading task.
+   */
+  @TestTemplate
+  public void testSnapshotWithParallelTasks() throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+
+    AtomicInteger snapshotThreadsIndex = new AtomicInteger(0);
+    ExecutorService service =
+        Executors.newFixedThreadPool(
+            4,
+            runnable -> {
+              Thread thread = new Thread(runnable);
+              thread.setName("table-snapshot-" + snapshotThreadsIndex.getAndIncrement());
+              thread.setDaemon(true);
+              return thread;
+            });
+    try {
+      SparkActions.get().snapshotTable(SOURCE_NAME).as(tableName).executeWith(service).execute();
+    } finally {
+      // This test created the pool, so release its threads; shutdown() has no additional effect
+      // if the pool was already shut down.
+      service.shutdown();
+    }
+
+    assertThat(snapshotThreadsIndex.get()).isEqualTo(2);
+  }
+}