This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 645ef83eec Spark 3.4: Backport Spark actions and procedures for 
RewriteTablePath (#12111)
645ef83eec is described below

commit 645ef83eec0b993dcc79310de343c04689a5c651
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Sun Jan 26 16:43:44 2025 -0800

    Spark 3.4: Backport Spark actions and procedures for RewriteTablePath 
(#12111)
---
 .../extensions/TestRewriteTablePathProcedure.java  |  184 ++++
 .../iceberg/spark/actions/BaseSparkAction.java     |    5 +
 .../spark/actions/RewriteTablePathSparkAction.java |  714 +++++++++++++
 .../apache/iceberg/spark/actions/SparkActions.java |    5 +
 .../procedures/RewriteTablePathProcedure.java      |  130 +++
 .../iceberg/spark/procedures/SparkProcedures.java  |    1 +
 .../spark/actions/TestRewriteTablePathsAction.java | 1080 ++++++++++++++++++++
 7 files changed, 2119 insertions(+)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
new file mode 100644
index 0000000000..ae82cf5961
--- /dev/null
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRewriteTablePathProcedure extends SparkExtensionsTestBase {
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  public String staging = null;
+  public String targetTableDir = null;
+
+  public TestRewriteTablePathProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    this.staging = temp.newFolder("staging").toURI().toString();
+    this.targetTableDir = temp.newFolder("targetTable").toURI().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testRewriteTablePathWithPositionalArgument() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String metadataJson =
+        (((HasTableOperations) 
table).operations()).current().metadataFileLocation();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.rewrite_table_path('%s', '%s', '%s')",
+            catalogName, tableIdent, table.location(), targetTableDir);
+    assertThat(result).hasSize(1);
+    assertThat(result.get(0)[0])
+        .as("Should return correct latest version")
+        .isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
+    assertThat(result.get(0)[1])
+        .as("Should return file_list_location")
+        .asString()
+        .startsWith(table.location())
+        .endsWith("file-list");
+    checkFileListLocationCount((String) result.get(0)[1], 1);
+  }
+
+  @Test
+  public void testRewriteTablePathWithNamedArgument() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String v0Metadata =
+        RewriteTablePathUtil.fileName(
+            (((HasTableOperations) 
table).operations()).current().metadataFileLocation());
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    String v1Metadata =
+        RewriteTablePathUtil.fileName(
+            (((HasTableOperations) 
table).operations()).refresh().metadataFileLocation());
+
+    String expectedFileListLocation = staging + "file-list";
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.rewrite_table_path("
+                + "table => '%s', "
+                + "target_prefix => '%s', "
+                + "source_prefix => '%s', "
+                + "end_version => '%s', "
+                + "start_version => '%s', "
+                + "staging_location => '%s')",
+            catalogName,
+            tableIdent,
+            this.targetTableDir,
+            table.location(),
+            v1Metadata,
+            v0Metadata,
+            this.staging);
+    assertThat(result).hasSize(1);
+    assertThat(result.get(0)[0]).as("Should return correct latest 
version").isEqualTo(v1Metadata);
+    assertThat(result.get(0)[1])
+        .as("Should return correct file_list_location")
+        .isEqualTo(expectedFileListLocation);
+    checkFileListLocationCount((String) result.get(0)[1], 4);
+  }
+
+  @Test
+  public void testProcedureWithInvalidInput() {
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.rewrite_table_path('%s')", catalogName, 
tableIdent))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Missing required parameters: 
[source_prefix,target_prefix]");
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path('%s','%s')",
+                    catalogName, tableIdent, this.targetTableDir))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Missing required parameters: [target_prefix]");
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path('%s', '%s','%s')",
+                    catalogName, "notExists", this.targetTableDir, 
this.targetTableDir))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Couldn't load table");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    String v0Metadata =
+        RewriteTablePathUtil.fileName(
+            (((HasTableOperations) 
table).operations()).current().metadataFileLocation());
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path("
+                        + "table => '%s', "
+                        + "source_prefix => '%s', "
+                        + "target_prefix => '%s', "
+                        + "start_version => '%s')",
+                    catalogName,
+                    tableIdent,
+                    table.location(),
+                    this.targetTableDir,
+                    "v20.metadata.json"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot find provided version file %s in metadata log.", 
"v20.metadata.json");
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_table_path("
+                        + "table => '%s', "
+                        + "source_prefix => '%s', "
+                        + "target_prefix => '%s', "
+                        + "start_version => '%s',"
+                        + "end_version => '%s')",
+                    catalogName,
+                    tableIdent,
+                    table.location(),
+                    this.targetTableDir,
+                    v0Metadata,
+                    "v11.metadata.json"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot find provided version file %s in metadata log.", 
"v11.metadata.json");
+  }
+
+  private void checkFileListLocationCount(String fileListLocation, long 
expectedFileCount) {
+    long fileCount = 
spark.read().format("text").load(fileListLocation).count();
+    assertThat(fileCount).isEqualTo(expectedFileCount);
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 13ce67cda1..65c605519d 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -140,6 +140,11 @@ abstract class BaseSparkAction<ThisT> {
     return new BaseTable(ops, metadataFileLocation);
   }
 
+  protected Table newStaticTable(String metadataFileLocation, FileIO io) {
+    StaticTableOperations ops = new 
StaticTableOperations(metadataFileLocation, io);
+    return new BaseTable(ops, metadataFileLocation);
+  }
+
   protected Dataset<FileInfo> contentFileDS(Table table) {
     return contentFileDS(table, null);
   }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
new file mode 100644
index 0000000000..55888f7f5e
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+  private Broadcast<Table> tableBroadcast = null;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String 
tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be 
empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + 
UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest 
table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should 
not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String 
versionFileName) {
+    String versionFile = null;
+    if (versionInFilePath(tableMetadata.metadataFileLocation(), 
versionFileName)) {
+      versionFile = tableMetadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : tableMetadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        versionFile = log.file();
+      }
+    }
+
+    Preconditions.checkArgument(
+        versionFile != null,
+        "Cannot find provided version file %s in metadata log.",
+        versionFileName);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", 
versionFile);
+    return versionFile;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return RewriteTablePathUtil.fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, 
endVersionName);
+    }
+  }
+
+  /**
+   * Rebuild metadata in a staging location, with paths rewritten.
+   *
+   * <ul>
+   *   <li>Rebuild version files to staging
+   *   <li>Rebuild manifest list files to staging
+   *   <li>Rebuild manifest to staging
+   *   <li>Get all files needed to move
+   * </ul>
+   */
+  private String rebuildMetadata() {
+    TableMetadata startMetadata =
+        startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, 
table.io()))
+                .operations()
+                .current()
+            : null;
+    TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, 
table.io())).operations().current();
+
+    Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || 
endMetadata.statisticsFiles().isEmpty(),
+        "Statistic files are not supported yet.");
+
+    // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = 
rewriteVersionFiles(endMetadata);
+    Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, 
rewriteVersionResult.toRewrite());
+
+    Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, 
startMetadata);
+    Set<Snapshot> validSnapshots =
+        Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+
+    // rebuild manifest-list files
+    RewriteResult<ManifestFile> rewriteManifestListResult =
+        validSnapshots.stream()
+            .map(snapshot -> rewriteManifestList(snapshot, endMetadata, 
manifestsToRewrite))
+            .reduce(new RewriteResult<>(), RewriteResult::append);
+
+    // rebuild manifest files
+    RewriteContentFileResult rewriteManifestResult =
+        rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());
+
+    // rebuild position delete files
+    Set<DeleteFile> deleteFiles =
+        rewriteManifestResult.toRewrite().stream()
+            .filter(e -> e instanceof DeleteFile)
+            .map(e -> (DeleteFile) e)
+            .collect(Collectors.toSet());
+    rewritePositionDeletes(endMetadata, deleteFiles);
+
+    Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+    copyPlan.addAll(rewriteVersionResult.copyPlan());
+    copyPlan.addAll(rewriteManifestListResult.copyPlan());
+    copyPlan.addAll(rewriteManifestResult.copyPlan());
+
+    return saveFileList(copyPlan);
+  }
+
+  private String saveFileList(Set<Pair<String, String>> filesToMove) {
+    List<Tuple2<String, String>> fileList =
+        filesToMove.stream()
+            .map(p -> Tuple2.apply(p.first(), p.second()))
+            .collect(Collectors.toList());
+    Dataset<Tuple2<String, String>> fileListDataset =
+        spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), 
Encoders.STRING()));
+    String fileListPath = stagingDir + RESULT_LOCATION;
+    fileListDataset
+        .repartition(1)
+        .write()
+        .mode(SaveMode.Overwrite)
+        .format("csv")
+        .save(fileListPath);
+    return fileListPath;
+  }
+
+  private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, 
Set<Snapshot> allSnapshots) {
+    if (startMetadata == null) {
+      return allSnapshots;
+    } else {
+      Set<Long> startSnapshotIds =
+          
startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+      return allSnapshots.stream()
+          .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata 
endMetadata) {
+    RewriteResult<Snapshot> result = new RewriteResult<>();
+    result.toRewrite().addAll(endMetadata.snapshots());
+    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+
+    List<MetadataLogEntry> versions = endMetadata.previousFiles();
+    for (int i = versions.size() - 1; i >= 0; i--) {
+      String versionFilePath = versions.get(i).file();
+      if (versionFilePath.equals(startVersionName)) {
+        break;
+      }
+
+      Preconditions.checkArgument(
+          fileExist(versionFilePath),
+          String.format("Version file %s doesn't exist", versionFilePath));
+      TableMetadata tableMetadata =
+          new StaticTableOperations(versionFilePath, table.io()).current();
+
+      result.toRewrite().addAll(tableMetadata.snapshots());
+      result.copyPlan().add(rewriteVersionFile(tableMetadata, 
versionFilePath));
+    }
+
+    return result;
+  }
+
+  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, 
String versionFilePath) {
+    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, 
stagingDir);
+    TableMetadata newTableMetadata =
+        RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, 
targetPrefix);
+    TableMetadataParser.overwrite(newTableMetadata, 
table.io().newOutputFile(stagingPath));
+    return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, 
targetPrefix));
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param tableMetadata metadata of table
+   * @param manifestsToRewrite filter of manifests to rewrite.
+   * @return a result including a copy plan for the manifests contained in the 
manifest list, as
+   *     well as for the manifest list itself
+   */
+  private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> 
manifestsToRewrite) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+    String path = snapshot.manifestListLocation();
+    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    RewriteResult<ManifestFile> rewriteResult =
+        RewriteTablePathUtil.rewriteManifestList(
+            snapshot,
+            table.io(),
+            tableMetadata,
+            manifestsToRewrite,
+            sourcePrefix,
+            targetPrefix,
+            stagingDir,
+            outputPath);
+
+    result.append(rewriteResult);
+    // add the manifest list copy plan itself to the result
+    result.copyPlan().add(Pair.of(outputPath, newPath(path, sourcePrefix, 
targetPrefix)));
+    return result;
+  }
+
+  private Set<String> manifestsToRewrite(
+      Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+    try {
+      Table endStaticTable = newStaticTable(endVersionName, table.io());
+      Dataset<Row> lastVersionFiles = 
manifestDS(endStaticTable).select("path");
+      if (startMetadata == null) {
+        return 
Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+      } else {
+        Set<Long> deltaSnapshotIds =
+            
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+        return Sets.newHashSet(
+            lastVersionFiles
+                .distinct()
+                .filter(
+                    functions
+                        .column(ManifestFile.SNAPSHOT_ID.name())
+                        .isInCollection(deltaSnapshotIds))
+                .as(Encoders.STRING())
+                .collectAsList());
+      }
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          "Unable to build the manifest files dataframe. The end version in 
use may contain invalid snapshots. "
+              + "Please choose an earlier version without invalid snapshots.",
+          e);
+    }
+  }
+
+  public static class RewriteContentFileResult extends 
RewriteResult<ContentFile<?>> {
+    public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    public RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) 
{
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+
+    public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> 
r1) {
+      this.copyPlan().addAll(r1.copyPlan());
+      this.toRewrite().addAll(r1.toRewrite());
+      return this;
+    }
+  }
+
+  /** Rewrite manifest files in a distributed manner and return rewritten data 
files path pairs. */
+  private RewriteContentFileResult rewriteManifests(
+      TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
+    if (toRewrite.isEmpty()) {
+      return new RewriteContentFileResult();
+    }
+
+    Encoder<ManifestFile> manifestFileEncoder = 
Encoders.javaSerialization(ManifestFile.class);
+    Dataset<ManifestFile> manifestDS =
+        spark().createDataset(Lists.newArrayList(toRewrite), 
manifestFileEncoder);
+
+    return manifestDS
+        .repartition(toRewrite.size())
+        .map(
+            toManifests(
+                tableBroadcast(),
+                stagingDir,
+                tableMetadata.formatVersion(),
+                sourcePrefix,
+                targetPrefix),
+            Encoders.bean(RewriteContentFileResult.class))
+        // duplicates are expected here as the same data file can have 
different statuses
+        // (e.g. added and deleted)
+        .reduce((ReduceFunction<RewriteContentFileResult>) 
RewriteContentFileResult::append);
+  }
+
+  private static MapFunction<ManifestFile, RewriteContentFileResult> 
toManifests(
+      Broadcast<Table> table,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+
+    return manifestFile -> {
+      RewriteContentFileResult result = new RewriteContentFileResult();
+      switch (manifestFile.content()) {
+        case DATA:
+          result.appendDataFile(
+              writeDataManifest(
+                  manifestFile, table, stagingLocation, format, sourcePrefix, 
targetPrefix));
+          break;
+        case DELETES:
+          result.appendDeleteFile(
+              writeDeleteManifest(
+                  manifestFile, table, stagingLocation, format, sourcePrefix, 
targetPrefix));
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported manifest type: " + manifestFile.content());
+      }
+      return result;
+    };
+  }
+
+  private static RewriteResult<DataFile> writeDataManifest(
+      ManifestFile manifestFile,
+      Broadcast<Table> table,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      FileIO io = table.getValue().io();
+      OutputFile outputFile = io.newOutputFile(stagingPath);
+      Map<Integer, PartitionSpec> specsById = table.getValue().specs();
+      return RewriteTablePathUtil.rewriteDataManifest(
+          manifestFile, outputFile, io, format, specsById, sourcePrefix, 
targetPrefix);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private static RewriteResult<DeleteFile> writeDeleteManifest(
+      ManifestFile manifestFile,
+      Broadcast<Table> table,
+      String stagingLocation,
+      int format,
+      String sourcePrefix,
+      String targetPrefix) {
+    try {
+      String stagingPath = 
RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+      FileIO io = table.getValue().io();
+      OutputFile outputFile = io.newOutputFile(stagingPath);
+      Map<Integer, PartitionSpec> specsById = table.getValue().specs();
+      return RewriteTablePathUtil.rewriteDeleteManifest(
+          manifestFile,
+          outputFile,
+          io,
+          format,
+          specsById,
+          sourcePrefix,
+          targetPrefix,
+          stagingLocation);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private void rewritePositionDeletes(TableMetadata metadata, Set<DeleteFile> 
toRewrite) {
+    if (toRewrite.isEmpty()) {
+      return;
+    }
+
+    Encoder<DeleteFile> deleteFileEncoder = 
Encoders.javaSerialization(DeleteFile.class);
+    Dataset<DeleteFile> deleteFileDs =
+        spark().createDataset(Lists.newArrayList(toRewrite), 
deleteFileEncoder);
+
+    PositionDeleteReaderWriter posDeleteReaderWriter = new 
SparkPositionDeleteReaderWriter();
+    deleteFileDs
+        .repartition(toRewrite.size())
+        .foreach(
+            rewritePositionDelete(
+                tableBroadcast(), sourcePrefix, targetPrefix, stagingDir, 
posDeleteReaderWriter));
+  }
+
+  private static class SparkPositionDeleteReaderWriter implements 
PositionDeleteReaderWriter {
+    @Override
+    public CloseableIterable<Record> reader(
+        InputFile inputFile, FileFormat format, PartitionSpec spec) {
+      return positionDeletesReader(inputFile, format, spec);
+    }
+
+    @Override
+    public PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException {
+      return positionDeletesWriter(outputFile, format, spec, partition, 
rowSchema);
+    }
+  }
+
+  private ForeachFunction<DeleteFile> rewritePositionDelete(
+      Broadcast<Table> tableArg,
+      String sourcePrefixArg,
+      String targetPrefixArg,
+      String stagingLocationArg,
+      PositionDeleteReaderWriter posDeleteReaderWriter) {
+    return deleteFile -> {
+      FileIO io = tableArg.getValue().io();
+      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), 
stagingLocationArg);
+      OutputFile outputFile = io.newOutputFile(newPath);
+      PartitionSpec spec = 
tableArg.getValue().specs().get(deleteFile.specId());
+      RewriteTablePathUtil.rewritePositionDeleteFile(
+          deleteFile,
+          outputFile,
+          io,
+          spec,
+          sourcePrefixArg,
+          targetPrefixArg,
+          posDeleteReaderWriter);
+    };
+  }
+
+  private static CloseableIterable<Record> positionDeletesReader(
+      InputFile inputFile, FileFormat format, PartitionSpec spec) {
+    Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
+    switch (format) {
+      case AVRO:
+        return Avro.read(inputFile)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(DataReader::create)
+            .build();
+
+      case PARQUET:
+        return Parquet.read(inputFile)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(
+                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, 
fileSchema))
+            .build();
+
+      case ORC:
+        return ORC.read(inputFile)
+            .project(deleteSchema)
+            .createReaderFunc(fileSchema -> 
GenericOrcReader.buildReader(deleteSchema, fileSchema))
+            .build();
+
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + 
format);
+    }
+  }
+
+  private static PositionDeleteWriter<Record> positionDeletesWriter(
+      OutputFile outputFile,
+      FileFormat format,
+      PartitionSpec spec,
+      StructLike partition,
+      Schema rowSchema)
+      throws IOException {
+    switch (format) {
+      case AVRO:
+        return Avro.writeDeletes(outputFile)
+            .createWriterFunc(DataWriter::create)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      case PARQUET:
+        return Parquet.writeDeletes(outputFile)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      case ORC:
+        return ORC.writeDeletes(outputFile)
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + 
format);
+    }
+  }
+
+  private Set<Snapshot> snapshotSet(TableMetadata metadata) {
+    if (metadata == null) {
+      return Sets.newHashSet();
+    } else {
+      return Sets.newHashSet(metadata.snapshots());
+    }
+  }
+
+  private boolean fileExist(String path) {
+    if (path == null || path.trim().isEmpty()) {
+      return false;
+    }
+    return table.io().newInputFile(path).exists();
+  }
+
+  private static String newPath(String path, String sourcePrefix, String 
targetPrefix) {
+    return RewriteTablePathUtil.combinePaths(
+        targetPrefix, RewriteTablePathUtil.relativize(path, sourcePrefix));
+  }
+
+  private String getMetadataLocation(Table tbl) {
+    String currentMetadataPath =
+        ((HasTableOperations) 
tbl).operations().current().metadataFileLocation();
+    int lastIndex = currentMetadataPath.lastIndexOf(File.separator);
+    String metadataDir = "";
+    if (lastIndex != -1) {
+      metadataDir = currentMetadataPath.substring(0, lastIndex + 1);
+    }
+
+    Preconditions.checkArgument(
+        !metadataDir.isEmpty(), "Failed to get the metadata file root 
directory");
+    return metadataDir;
+  }
+
+  @VisibleForTesting
+  Broadcast<Table> tableBroadcast() {
+    if (tableBroadcast == null) {
+      this.tableBroadcast = 
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
+    }
+
+    return tableBroadcast;
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index ba9fa2e7b4..aa4ef987e7 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -108,4 +108,9 @@ public class SparkActions implements ActionsProvider {
   public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
     return new RemoveDanglingDeletesSparkAction(spark, table);
   }
+
+  @Override
+  public RewriteTablePathSparkAction rewriteTablePath(Table table) {
+    return new RewriteTablePathSparkAction(spark, table);
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
new file mode 100644
index 0000000000..b936dcfced
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.spark.actions.RewriteTablePathSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class RewriteTablePathProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter SOURCE_PREFIX_PARAM =
+      ProcedureParameter.required("source_prefix", DataTypes.StringType);
+  private static final ProcedureParameter TARGET_PREFIX_PARAM =
+      ProcedureParameter.required("target_prefix", DataTypes.StringType);
+  private static final ProcedureParameter START_VERSION_PARAM =
+      ProcedureParameter.optional("start_version", DataTypes.StringType);
+  private static final ProcedureParameter END_VERSION_PARM =
+      ProcedureParameter.optional("end_version", DataTypes.StringType);
+  private static final ProcedureParameter STAGING_LOCATION_PARAM =
+      ProcedureParameter.optional("staging_location", DataTypes.StringType);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        TABLE_PARAM,
+        SOURCE_PREFIX_PARAM,
+        TARGET_PREFIX_PARAM,
+        START_VERSION_PARAM,
+        END_VERSION_PARM,
+        STAGING_LOCATION_PARAM
+      };
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("latest_version", DataTypes.StringType, true, 
Metadata.empty()),
+            new StructField("file_list_location", DataTypes.StringType, true, 
Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RewriteTablePathProcedure>() {
+      @Override
+      protected RewriteTablePathProcedure doBuild() {
+        return new RewriteTablePathProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewriteTablePathProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    String sourcePrefix = input.asString(SOURCE_PREFIX_PARAM);
+    String targetPrefix = input.asString(TARGET_PREFIX_PARAM);
+    String startVersion = input.asString(START_VERSION_PARAM, null);
+    String endVersion = input.asString(END_VERSION_PARM, null);
+    String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
+
+    return withIcebergTable(
+        tableIdent,
+        table -> {
+          RewriteTablePathSparkAction action = 
SparkActions.get().rewriteTablePath(table);
+
+          if (startVersion != null) {
+            action.startVersion(startVersion);
+          }
+          if (endVersion != null) {
+            action.endVersion(endVersion);
+          }
+          if (stagingLocation != null) {
+            action.stagingLocation(stagingLocation);
+          }
+
+          return toOutputRows(action.rewriteLocationPrefix(sourcePrefix, 
targetPrefix).execute());
+        });
+  }
+
+  private InternalRow[] toOutputRows(RewriteTablePath.Result result) {
+    return new InternalRow[] {
+      newInternalRow(
+          UTF8String.fromString(result.latestVersion()),
+          UTF8String.fromString(result.fileListLocation()))
+    };
+  }
+
+  @Override
+  public String description() {
+    return "RewriteTablePathProcedure";
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index d636a21ddc..3539704430 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -62,6 +62,7 @@ public class SparkProcedures {
     mapBuilder.put("rewrite_position_delete_files", 
RewritePositionDeleteFilesProcedure::builder);
     mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
     mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
+    mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
     return mapBuilder.build();
   }
 
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
new file mode 100644
index 0000000000..2766078517
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -0,0 +1,1080 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.BlockInfoManager;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.BroadcastBlockId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Tuple2;
+
+public class TestRewriteTablePathsAction extends SparkTestBase {
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  protected ActionsProvider actions() {
+    return SparkActions.get();
+  }
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  protected static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected String tableLocation = null;
+  public String staging = null;
+  public String tableDir = null;
+  public String newTableDir = null;
+  public String targetTableDir = null;
+  private Table table = null;
+
+  private final String ns = "testns";
+  private final String backupNs = "backupns";
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    this.tableLocation = temp.newFolder().toURI().toString();
+    this.staging = temp.newFolder("staging").toURI().toString();
+    this.tableDir = temp.newFolder("table").toURI().toString();
+    this.newTableDir = temp.newFolder("newTable").toURI().toString();
+    this.targetTableDir = temp.newFolder("targetTable").toURI().toString();
+    this.table = createATableWith2Snapshots(tableLocation);
+    createNameSpaces();
+  }
+
+  @After
+  public void cleanupTableSetup() throws Exception {
+    dropNameSpaces();
+  }
+
+  private Table createATableWith2Snapshots(String location) {
+    return createTableWithSnapshots(location, 2);
+  }
+
+  private Table createTableWithSnapshots(String location, int snapshotNumber) {
+    return createTableWithSnapshots(location, snapshotNumber, 
Maps.newHashMap());
+  }
+
+  protected Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties) {
+    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    }
+
+    return newTable;
+  }
+
+  private void createNameSpaces() {
+    sql("CREATE DATABASE IF NOT EXISTS %s", ns);
+    sql("CREATE DATABASE IF NOT EXISTS %s", backupNs);
+  }
+
+  private void dropNameSpaces() {
+    sql("DROP DATABASE IF EXISTS %s CASCADE", ns);
+    sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
+  }
+
+  @Test
+  public void testRewritePath() throws Exception {
+    String targetTableLocation = targetTableLocation();
+
+    // check the data file location before the rebuild
+    List<String> validDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFiles.size()).isEqualTo(2);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation)
+            .endVersion("v3.metadata.json")
+            .execute();
+
+    assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+    checkFileNum(3, 2, 2, 9, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // verify the data file path after the rebuild
+    List<String> validDataFilesAfterRebuilt =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFilesAfterRebuilt)
+        .hasSize(2)
+        .allMatch(item -> item.startsWith(targetTableLocation));
+
+    // verify data rows
+    List<Object[]> actual = rows(targetTableLocation);
+    List<Object[]> expected = rows(tableLocation);
+    assertEquals("Rows should match after copy", expected, actual);
+  }
+
+  @Test
+  public void testSameLocations() {
+    assertThatThrownBy(
+            () ->
+                actions()
+                    .rewriteTablePath(table)
+                    .rewriteLocationPrefix(tableLocation, tableLocation)
+                    .endVersion("v1.metadata.json")
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Source prefix cannot be the same as target 
prefix");
+  }
+
+  @Test
+  public void testStartVersion() throws Exception {
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation())
+            .startVersion("v2.metadata.json")
+            .execute();
+
+    checkFileNum(1, 1, 1, 4, result);
+
+    List<Tuple2<String, String>> paths = 
readPathPairList(result.fileListLocation());
+
+    String currentSnapshotId = 
String.valueOf(table.currentSnapshot().snapshotId());
+    assertThat(paths.stream().filter(c -> 
c._2().contains(currentSnapshotId)).count())
+        .withFailMessage("Should have the current snapshot file")
+        .isEqualTo(1);
+
+    String parentSnapshotId = 
String.valueOf(table.currentSnapshot().parentId());
+    assertThat(paths.stream().filter(c -> 
c._2().contains(parentSnapshotId)).count())
+        .withFailMessage("Should NOT have the parent snapshot file")
+        .isEqualTo(0);
+  }
+
+  @Test
+  public void testTableWith3Snapshots() throws Exception {
+    String location = newTableLocation();
+    Table tableWith3Snaps = createTableWithSnapshots(location, 3);
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(tableWith3Snaps)
+            .rewriteLocationPrefix(location, 
temp.newFolder().toURI().toString())
+            .startVersion("v2.metadata.json")
+            .execute();
+
+    checkFileNum(2, 2, 2, 8, result);
+
+    // start from the first version
+    RewriteTablePath.Result result1 =
+        actions()
+            .rewriteTablePath(tableWith3Snaps)
+            .rewriteLocationPrefix(location, 
temp.newFolder().toURI().toString())
+            .startVersion("v1.metadata.json")
+            .execute();
+
+    checkFileNum(3, 3, 3, 12, result1);
+  }
+
+  @Test
+  public void testFullTableRewritePath() throws Exception {
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation())
+            .execute();
+
+    checkFileNum(3, 2, 2, 9, result);
+  }
+
+  @Test
+  public void testDeleteDataFile() throws Exception {
+    List<String> validDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(table.location() + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+
+    
table.newDelete().deleteFile(validDataFiles.stream().findFirst().get()).commit();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+
+    checkFileNum(4, 3, 3, 12, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // verify data rows
+    Dataset<Row> resultDF = 
spark.read().format("iceberg").load(targetTableLocation());
+    assertThat(resultDF.as(Encoders.bean(ThreeColumnRecord.class)).count())
+        .withFailMessage("There are only one row left since we deleted a data 
file")
+        .isEqualTo(1);
+  }
+
+  @Test
+  public void testPositionDeletes() throws Exception {
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(
+                
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+                0L));
+
+    File file = new File(removePrefix(table.location() + 
"/data/deeply/nested/deletes.parquet"));
+    DeleteFile positionDeletes =
+        FileHelpers.writeDeleteFile(
+                table, table.io().newOutputFile(file.toURI().toString()), 
deletes)
+            .first();
+
+    table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    
assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .execute();
+
+    // We have one more snapshot, an additional manifest list, and a new 
(delete) manifest,
+    // and an additional position delete
+    checkFileNum(4, 3, 3, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // Positional delete affects a single row, so only one row must remain
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+  }
+
+  @Test
+  public void testPositionDeleteWithRow() throws Exception {
+    String dataFileLocation =
+        
table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    OutputFile deleteFile =
+        table
+            .io()
+            .newOutputFile(
+                new File(removePrefix(table.location() + 
"/data/deeply/nested/deletes.parquet"))
+                    .toURI()
+                    .toString());
+    deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", 
"AAAA"));
+    DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, 
deleteFile, null, deletes);
+    table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    
assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .execute();
+
+    // We have one more snapshot, an additional manifest list, and a new 
(delete) manifest,
+    // and an additional position delete
+    checkFileNum(4, 3, 3, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // check copied position delete row
+    Object[] deletedRow = (Object[]) rows(targetTableLocation() + 
"#position_deletes").get(0)[2];
+    assertEquals(
+        "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", 
"AAAA"}, deletedRow);
+
+    // Positional delete affects a single row, so only one row must remain
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+  }
+
+  @Test
+  public void testPositionDeletesAcrossFiles() throws Exception {
+    Stream<DataFile> allFiles =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .flatMap(s -> 
StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
+    List<Pair<CharSequence, Long>> deletes =
+        allFiles.map(f -> Pair.of((CharSequence) f.location(), 
0L)).collect(Collectors.toList());
+
+    // a single position delete with two entries
+    assertThat(deletes.size()).isEqualTo(2);
+
+    File file = new File(removePrefix(table.location() + 
"/data/deeply/nested/file.parquet"));
+    DeleteFile positionDeletes =
+        FileHelpers.writeDeleteFile(
+                table, table.io().newOutputFile(file.toURI().toString()), 
deletes)
+            .first();
+
+    table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    
assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(0);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .execute();
+
+    // We have one more snapshot, an additional manifest list, and a new 
(delete) manifest,
+    // and an additional position delete
+    checkFileNum(4, 3, 3, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(0);
+  }
+
+  @Test
+  public void testEqualityDeletes() throws Exception {
+    Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
+
+    // Add more varied data
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(
+            new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
+            new ThreeColumnRecord(3, "BBBBBBBBBB", "BBBB"),
+            new ThreeColumnRecord(4, "CCCCCCCCCC", "CCCC"),
+            new ThreeColumnRecord(5, "DDDDDDDDDD", "DDDD"));
+    spark
+        .createDataFrame(records, ThreeColumnRecord.class)
+        .coalesce(1)
+        .select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(newTableLocation());
+
+    Schema deleteRowSchema = sourceTable.schema().select("c2");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("c2", "AAAAAAAAAA"), dataDelete.copy("c2", 
"CCCCCCCCCC"));
+    File file = new File(removePrefix(sourceTable.location()) + 
"/data/deeply/nested/file.parquet");
+    DeleteFile equalityDeletes =
+        FileHelpers.writeDeleteFile(
+            sourceTable,
+            sourceTable.io().newOutputFile(file.toURI().toString()),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+    sourceTable.newRowDelta().addDeletes(equalityDeletes).commit();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .execute();
+
+    // We have four metadata files: for the table creation, for the initial 
snapshot, for the
+    // second append here, and for commit with equality deletes. Thus, we have 
three manifest lists.
+    // We have a data file for each snapshot (two with data, one with equality 
deletes)
+    checkFileNum(4, 3, 3, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // Equality deletes affect three rows, so just two rows must remain
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2);
+  }
+
+  @Test
+  public void testFullTableRewritePathWithDeletedVersionFiles() throws 
Exception {
+    String location = newTableLocation();
+    Table sourceTable = createTableWithSnapshots(location, 2);
+    // expire the first snapshot
+    Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", 
table.io());
+    actions()
+        .expireSnapshots(sourceTable)
+        .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
+        .execute();
+
+    // create 100 more snapshots
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+    for (int i = 0; i < 100; i++) {
+      df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    }
+    sourceTable.refresh();
+
+    // v1/v2/v3.metadata.json has been deleted in v104.metadata.json, and 
there is no way to find
+    // the first snapshot
+    // from the version file history
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(location, targetTableLocation())
+            .execute();
+
+    checkFileNum(101, 101, 101, 406, result);
+  }
+
+  @Test
+  public void testRewritePathWithoutSnapshot() throws Exception {
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, newTableLocation())
+            .endVersion("v1.metadata.json")
+            .execute();
+
+    // the only rebuilt file is v1.metadata.json since it contains no snapshot
+    checkFileNum(1, 0, 0, 1, result);
+  }
+
+  @Test
+  public void testExpireSnapshotBeforeRewrite() throws Exception {
+    // expire one snapshot
+    
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .execute();
+
+    checkFileNum(4, 1, 2, 9, result);
+  }
+
+  @Test
+  public void testStartSnapshotWithoutValidSnapshot() throws Exception {
+    // expire one snapshot
+    
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+    assertThat(((List) table.snapshots()).size())
+        .withFailMessage("1 out 2 snapshot has been removed")
+        .isEqualTo(1);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .stagingLocation(stagingLocation())
+            .startVersion("v2.metadata.json")
+            .execute();
+
+    // 2 metadata.json, 1 manifest list file, 1 manifest files
+    checkFileNum(2, 1, 1, 5, result);
+  }
+
+  @Test
+  public void testMoveTheVersionExpireSnapshot() throws Exception {
+    // expire one snapshot
+    
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+    // only move version v4, which is the version generated by snapshot 
expiration
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .stagingLocation(stagingLocation())
+            .startVersion("v3.metadata.json")
+            .execute();
+
+    // only v4.metadata.json needs to move
+    checkFileNum(1, 0, 0, 1, result);
+  }
+
+  @Test
+  public void testMoveVersionWithInvalidSnapshots() throws Exception {
+    // expire one snapshot
+    
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+    assertThatThrownBy(
+            () ->
+                actions()
+                    .rewriteTablePath(table)
+                    .rewriteLocationPrefix(table.location(), 
newTableLocation())
+                    .stagingLocation(stagingLocation())
+                    .endVersion("v3.metadata.json")
+                    .execute())
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining(
+            "Unable to build the manifest files dataframe. The end version in 
use may contain invalid snapshots. "
+                + "Please choose an earlier version without invalid 
snapshots.");
+  }
+
+  @Test
+  public void testRollBack() throws Exception {
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // roll back to the first snapshot(v2)
+    
table.manageSnapshots().setCurrentSnapshot(table.currentSnapshot().parentId()).commit();
+
+    // add a new snapshot
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(table.location());
+
+    table.refresh();
+
+    // roll back to the second snapshot(v3)
+    table.manageSnapshots().setCurrentSnapshot(secondSnapshotId).commit();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), newTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+    checkFileNum(6, 3, 3, 15, result);
+  }
+
+  @Test
+  public void testWriteAuditPublish() throws Exception {
+    // enable WAP
+    table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, 
"true").commit();
+    spark.conf().set("spark.wap.id", "1");
+
+    // add a new snapshot without changing the current snapshot of the table
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(table.location());
+
+    table.refresh();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), newTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+
+    // There are 3 snapshots in total, although the current snapshot is the 
second one.
+    checkFileNum(5, 3, 3, 14, result);
+  }
+
+  @Test
+  public void testSchemaChange() throws Exception {
+    // change the schema
+    table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
+
+    // copy table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), newTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+
+    // check the result
+    checkFileNum(4, 2, 2, 10, result);
+  }
+
+  @Test
+  public void testSnapshotIdInheritanceEnabled() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true");
+
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, 
properties);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    checkFileNum(3, 2, 2, 9, result);
+  }
+
+  @Test
+  public void testMetadataCompression() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, 
properties);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .endVersion("v2.gz.metadata.json")
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result);
+
+    result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .startVersion("v1.gz.metadata.json")
+            .execute();
+
+    checkFileNum(2, 2, 2, 8, result);
+  }
+
+  @Test
+  public void testInvalidArgs() {
+    RewriteTablePath actions = actions().rewriteTablePath(table);
+
+    assertThatThrownBy(() -> actions.rewriteLocationPrefix("", null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Source prefix('') cannot be empty");
+
+    assertThatThrownBy(() -> actions.rewriteLocationPrefix(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Source prefix('null') cannot be empty");
+
+    assertThatThrownBy(() -> actions.stagingLocation(""))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Staging location('') cannot be empty");
+
+    assertThatThrownBy(() -> actions.stagingLocation(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Staging location('null') cannot be empty");
+
+    assertThatThrownBy(() -> actions.startVersion(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Start version('null') cannot be empty");
+
+    assertThatThrownBy(() -> actions.endVersion(" "))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("End version(' ') cannot be empty");
+
+    assertThatThrownBy(() -> actions.endVersion(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("End version('null') cannot be empty");
+  }
+
+  @Test
+  public void testStatisticFile() throws IOException {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("format-version", "2");
+    String tableName = "v2tblwithstats";
+    Table sourceTable =
+        createMetastoreTable(sourceTableLocation, properties, "default", 
tableName, 0);
+
+    TableMetadata metadata = currentMetadata(sourceTable);
+    TableMetadata withStatistics =
+        TableMetadata.buildFrom(metadata)
+            .setStatistics(
+                43,
+                new GenericStatisticsFile(
+                    43, "/some/path/to/stats/file", 128, 27, 
ImmutableList.of()))
+            .build();
+
+    OutputFile file = 
sourceTable.io().newOutputFile(metadata.metadataFileLocation());
+    TableMetadataParser.overwrite(withStatistics, file);
+
+    assertThatThrownBy(
+            () ->
+                actions()
+                    .rewriteTablePath(sourceTable)
+                    .rewriteLocationPrefix(sourceTableLocation, 
targetTableLocation())
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Statistic files are not supported yet");
+  }
+
+  @Test
+  public void testMetadataCompressionWithMetastoreTable() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    Table sourceTable =
+        createMetastoreTable(
+            newTableLocation(), properties, "default", 
"testMetadataCompression", 2);
+
+    TableMetadata currentMetadata = currentMetadata(sourceTable);
+
+    // set the second version as the endVersion
+    String endVersion = 
fileName(currentMetadata.previousFiles().get(1).file());
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .endVersion(endVersion)
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result);
+
+    // set the first version as the lastCopiedVersion
+    String firstVersion = 
fileName(currentMetadata.previousFiles().get(0).file());
+    result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .startVersion(firstVersion)
+            .execute();
+
+    checkFileNum(2, 2, 2, 8, result);
+  }
+
+  // Metastore table tests
+  @Test
+  public void testMetadataLocationChange() throws Exception {
+    Table sourceTable =
+        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", 
"tbl", 1);
+    String metadataFilePath = 
currentMetadata(sourceTable).metadataFileLocation();
+
+    String newMetadataDir = "new-metadata-dir";
+    sourceTable
+        .updateProperties()
+        .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + 
newMetadataDir)
+        .commit();
+
+    spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+    sourceTable.refresh();
+
+    // copy table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .execute();
+
+    checkFileNum(4, 2, 2, 10, result);
+
+    // pick up a version from the old metadata dir as the end version
+    RewriteTablePath.Result result1 =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .endVersion(fileName(metadataFilePath))
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result1);
+
+    // pick up a version from the old metadata dir as the last copied version
+    RewriteTablePath.Result result2 =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .startVersion(fileName(metadataFilePath))
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result2);
+  }
+
+  @Test
+  public void testDeleteFrom() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("format-version", "2");
+    properties.put("write.delete.mode", "merge-on-read");
+    String tableName = "v2tbl";
+    Table sourceTable =
+        createMetastoreTable(newTableLocation(), properties, "default", 
tableName, 0);
+    // ingest data
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+            new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
+            new ThreeColumnRecord(3, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .saveAsTable("hive.default." + tableName);
+    sourceTable.refresh();
+
+    // generate position delete files
+    spark.sql(String.format("delete from hive.default.%s where c1 = 1", 
tableName));
+    sourceTable.refresh();
+
+    List<Object[]> originalData =
+        rowsToJava(
+            spark
+                .read()
+                .format("iceberg")
+                .load("hive.default." + tableName)
+                .sort("c1", "c2", "c3")
+                .collectAsList());
+    // two rows
+    assertThat(originalData.size()).isEqualTo(2);
+
+    // copy table and check the results
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .execute();
+
+    checkFileNum(3, 2, 2, 9, result);
+    // one data and one metadata file
+    copyTableFiles(result);
+
+    // register table
+    String metadataLocation = 
currentMetadata(sourceTable).metadataFileLocation();
+    String versionFile = fileName(metadataLocation);
+    String targetTableName = "copiedV2Table";
+    TableIdentifier tableIdentifier = TableIdentifier.of("default", 
targetTableName);
+    catalog.registerTable(tableIdentifier, targetTableLocation() + 
"/metadata/" + versionFile);
+
+    List<Object[]> copiedData =
+        rowsToJava(
+            spark
+                .read()
+                .format("iceberg")
+                .load("hive.default." + targetTableName)
+                .sort("c1", "c2", "c3")
+                .collectAsList());
+
+    assertEquals("Rows must match", originalData, copiedData);
+  }
+
+  @Test
+  public void testKryoDeserializeBroadcastValues() {
+    sparkContext.getConf().set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+    RewriteTablePathSparkAction action =
+        (RewriteTablePathSparkAction) actions().rewriteTablePath(table);
+    Broadcast<Table> tableBroadcast = action.tableBroadcast();
+    // force deserializing broadcast values
+    removeBroadcastValuesFromLocalBlockManager(tableBroadcast.id());
+    assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
+  }
+
+  protected void checkFileNum(
+      int versionFileCount,
+      int manifestListCount,
+      int manifestFileCount,
+      int totalCount,
+      RewriteTablePath.Result result) {
+    List<String> filesToMove =
+        spark
+            .read()
+            .format("text")
+            .load(result.fileListLocation())
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(filesToMove.stream().filter(f -> 
f.endsWith(".metadata.json")).count())
+        .withFailMessage("Wrong rebuilt version file count")
+        .isEqualTo(versionFileCount);
+    assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count())
+        .withFailMessage("Wrong rebuilt Manifest list file count")
+        .isEqualTo(manifestListCount);
+    assertThat(filesToMove.stream().filter(f -> 
f.endsWith("-m0.avro")).count())
+        .withFailMessage("Wrong rebuilt Manifest file file count")
+        .isEqualTo(manifestFileCount);
+    assertThat(filesToMove.size()).withFailMessage("Wrong total file 
count").isEqualTo(totalCount);
+  }
+
+  protected String newTableLocation() throws IOException {
+    return newTableDir;
+  }
+
+  protected String targetTableLocation() throws IOException {
+    return targetTableDir;
+  }
+
+  protected String stagingLocation() throws IOException {
+    return staging;
+  }
+
+  private void copyTableFiles(RewriteTablePath.Result result) throws Exception 
{
+    List<Tuple2<String, String>> filesToMove = 
readPathPairList(result.fileListLocation());
+
+    for (Tuple2<String, String> pathPair : filesToMove) {
+      FileUtils.copyFile(new File(URI.create(pathPair._1())), new 
File(URI.create(pathPair._2())));
+    }
+  }
+
+  private String removePrefix(String path) {
+    return path.substring(path.lastIndexOf(":") + 1);
+  }
+
+  protected Table newStaticTable(String metadataFileLocation, FileIO io) {
+    StaticTableOperations ops = new 
StaticTableOperations(metadataFileLocation, io);
+    return new BaseTable(ops, metadataFileLocation);
+  }
+
+  private List<Tuple2<String, String>> readPathPairList(String path) {
+    Encoder<Tuple2<String, String>> encoder = 
Encoders.tuple(Encoders.STRING(), Encoders.STRING());
+    return spark
+        .read()
+        .format("csv")
+        .schema(encoder.schema())
+        .load(path)
+        .as(encoder)
+        .collectAsList();
+  }
+
+  private Table createMetastoreTable(
+      String location,
+      Map<String, String> properties,
+      String namespace,
+      String tableName,
+      int snapshotNumber) {
+    spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
+    spark.conf().set("spark.sql.catalog.hive.type", "hive");
+    spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
+    spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");
+
+    StringBuilder propertiesStr = new StringBuilder();
+    properties.forEach((k, v) -> propertiesStr.append("'" + k + "'='" + v + 
"',"));
+    String tblProperties =
+        propertiesStr.substring(0, propertiesStr.length() > 0 ? 
propertiesStr.length() - 1 : 0);
+
+    sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName);
+    if (tblProperties.isEmpty()) {
+      String sqlStr =
+          String.format(
+              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", 
namespace, tableName);
+      if (!location.isEmpty()) {
+        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, 
location);
+      }
+      sql(sqlStr);
+    } else {
+      String sqlStr =
+          String.format(
+              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", 
namespace, tableName);
+      if (!location.isEmpty()) {
+        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, 
location);
+      }
+
+      sqlStr = String.format("%s TBLPROPERTIES (%s)", sqlStr, tblProperties);
+      sql(sqlStr);
+    }
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", 
namespace, tableName, i);
+    }
+    return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+  }
+
+  private static String fileName(String path) {
+    String filename = path;
+    int lastIndex = path.lastIndexOf(File.separator);
+    if (lastIndex != -1) {
+      filename = path.substring(lastIndex + 1);
+    }
+    return filename;
+  }
+
+  private TableMetadata currentMetadata(Table tbl) {
+    return ((HasTableOperations) tbl).operations().current();
+  }
+
+  private List<Object[]> rows(String location) {
+    return 
rowsToJava(spark.read().format("iceberg").load(location).collectAsList());
+  }
+
+  private PositionDelete<GenericRecord> positionDelete(
+      Schema tableSchema, CharSequence path, Long position, Object... values) {
+    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+    GenericRecord nested = GenericRecord.create(tableSchema);
+    for (int i = 0; i < values.length; i++) {
+      nested.set(i, values[i]);
+    }
+    posDelete.set(path, position, nested);
+    return posDelete;
+  }
+
+  private void removeBroadcastValuesFromLocalBlockManager(long id) {
+    BlockId blockId = new BroadcastBlockId(id, "");
+    SparkEnv env = SparkEnv.get();
+    env.broadcastManager().cachedValues().clear();
+    BlockManager blockManager = env.blockManager();
+    BlockInfoManager blockInfoManager = blockManager.blockInfoManager();
+    blockInfoManager.lockForWriting(blockId, true);
+    blockInfoManager.removeBlock(blockId);
+    blockManager.memoryStore().remove(blockId);
+  }
+}

Reply via email to