This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 645ef83eec Spark 3.4: Backport Spark actions and procedures for RewriteTablePath (#12111)
645ef83eec is described below
commit 645ef83eec0b993dcc79310de343c04689a5c651
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Sun Jan 26 16:43:44 2025 -0800
Spark 3.4: Backport Spark actions and procedures for RewriteTablePath (#12111)
---
.../extensions/TestRewriteTablePathProcedure.java | 184 ++++
.../iceberg/spark/actions/BaseSparkAction.java | 5 +
.../spark/actions/RewriteTablePathSparkAction.java | 714 +++++++++++++
.../apache/iceberg/spark/actions/SparkActions.java | 5 +
.../procedures/RewriteTablePathProcedure.java | 130 +++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
.../spark/actions/TestRewriteTablePathsAction.java | 1080 ++++++++++++++++++++
7 files changed, 2119 insertions(+)
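
For readers skimming the patch, a minimal sketch of how the backported procedure is meant to be invoked once this lands; the catalog, table, and prefix values below are illustrative placeholders, not values from this commit:

    // Hedged example: assumes an existing SparkSession `spark` and an Iceberg catalog named `my_catalog`.
    // Only `table`, `source_prefix`, and `target_prefix` are required; start_version, end_version, and
    // staging_location are optional, as registered in RewriteTablePathProcedure below.
    spark.sql(
        "CALL my_catalog.system.rewrite_table_path("
            + "table => 'db.tbl', "
            + "source_prefix => 'hdfs://old-cluster/db/tbl', "
            + "target_prefix => 'hdfs://new-cluster/db/tbl')");

The call returns `latest_version` and `file_list_location`; the latter points at the list of rewritten files that still need to be copied to the target prefix.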
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
new file mode 100644
index 0000000000..ae82cf5961
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRewriteTablePathProcedure extends SparkExtensionsTestBase {
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ public String staging = null;
+ public String targetTableDir = null;
+
+ public TestRewriteTablePathProcedure(
+ String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void setupTableLocation() throws Exception {
+ this.staging = temp.newFolder("staging").toURI().toString();
+ this.targetTableDir = temp.newFolder("targetTable").toURI().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteTablePathWithPositionalArgument() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ String metadataJson =
+        (((HasTableOperations) table).operations()).current().metadataFileLocation();
+
+ List<Object[]> result =
+ sql(
+ "CALL %s.system.rewrite_table_path('%s', '%s', '%s')",
+ catalogName, tableIdent, table.location(), targetTableDir);
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0)[0])
+ .as("Should return correct latest version")
+ .isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
+ assertThat(result.get(0)[1])
+ .as("Should return file_list_location")
+ .asString()
+ .startsWith(table.location())
+ .endsWith("file-list");
+ checkFileListLocationCount((String) result.get(0)[1], 1);
+ }
+
+ @Test
+ public void testRewriteTablePathWithNamedArgument() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ String v0Metadata =
+ RewriteTablePathUtil.fileName(
+            (((HasTableOperations) table).operations()).current().metadataFileLocation());
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ String v1Metadata =
+ RewriteTablePathUtil.fileName(
+            (((HasTableOperations) table).operations()).refresh().metadataFileLocation());
+
+ String expectedFileListLocation = staging + "file-list";
+
+ List<Object[]> result =
+ sql(
+ "CALL %s.system.rewrite_table_path("
+ + "table => '%s', "
+ + "target_prefix => '%s', "
+ + "source_prefix => '%s', "
+ + "end_version => '%s', "
+ + "start_version => '%s', "
+ + "staging_location => '%s')",
+ catalogName,
+ tableIdent,
+ this.targetTableDir,
+ table.location(),
+ v1Metadata,
+ v0Metadata,
+ this.staging);
+ assertThat(result).hasSize(1);
+    assertThat(result.get(0)[0]).as("Should return correct latest version").isEqualTo(v1Metadata);
+ assertThat(result.get(0)[1])
+ .as("Should return correct file_list_location")
+ .isEqualTo(expectedFileListLocation);
+ checkFileListLocationCount((String) result.get(0)[1], 4);
+ }
+
+ @Test
+ public void testProcedureWithInvalidInput() {
+
+ assertThatThrownBy(
+            () -> sql("CALL %s.system.rewrite_table_path('%s')", catalogName, tableIdent))
+ .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Missing required parameters: [source_prefix,target_prefix]");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_table_path('%s','%s')",
+ catalogName, tableIdent, this.targetTableDir))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Missing required parameters: [target_prefix]");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_table_path('%s', '%s','%s')",
+                    catalogName, "notExists", this.targetTableDir, this.targetTableDir))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Couldn't load table");
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ String v0Metadata =
+ RewriteTablePathUtil.fileName(
+            (((HasTableOperations) table).operations()).current().metadataFileLocation());
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_table_path("
+ + "table => '%s', "
+ + "source_prefix => '%s', "
+ + "target_prefix => '%s', "
+ + "start_version => '%s')",
+ catalogName,
+ tableIdent,
+ table.location(),
+ this.targetTableDir,
+ "v20.metadata.json"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+            "Cannot find provided version file %s in metadata log.", "v20.metadata.json");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_table_path("
+ + "table => '%s', "
+ + "source_prefix => '%s', "
+ + "target_prefix => '%s', "
+ + "start_version => '%s',"
+ + "end_version => '%s')",
+ catalogName,
+ tableIdent,
+ table.location(),
+ this.targetTableDir,
+ v0Metadata,
+ "v11.metadata.json"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+            "Cannot find provided version file %s in metadata log.", "v11.metadata.json");
+ }
+
+  private void checkFileListLocationCount(String fileListLocation, long expectedFileCount) {
+    long fileCount = spark.read().format("text").load(fileListLocation).count();
+ assertThat(fileCount).isEqualTo(expectedFileCount);
+ }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 13ce67cda1..65c605519d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -140,6 +140,11 @@ abstract class BaseSparkAction<ThisT> {
return new BaseTable(ops, metadataFileLocation);
}
+ protected Table newStaticTable(String metadataFileLocation, FileIO io) {
+ StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io);
+ return new BaseTable(ops, metadataFileLocation);
+ }
+
protected Dataset<FileInfo> contentFileDS(Table table) {
return contentFileDS(table, null);
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
new file mode 100644
index 0000000000..55888f7f5e
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePath>
+ implements RewriteTablePath {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+ private static final String RESULT_LOCATION = "file-list";
+
+ private String sourcePrefix;
+ private String targetPrefix;
+ private String startVersionName;
+ private String endVersionName;
+ private String stagingDir;
+
+ private final Table table;
+ private Broadcast<Table> tableBroadcast = null;
+
+ RewriteTablePathSparkAction(SparkSession spark, Table table) {
+ super(spark);
+ this.table = table;
+ }
+
+ @Override
+ protected RewriteTablePath self() {
+ return this;
+ }
+
+ @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String tPrefix) {
+ Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be empty.", sPrefix);
+ this.sourcePrefix = sPrefix;
+ this.targetPrefix = tPrefix;
+ return this;
+ }
+
+ @Override
+ public RewriteTablePath startVersion(String sVersion) {
+ Preconditions.checkArgument(
+ sVersion != null && !sVersion.trim().isEmpty(),
+ "Start version('%s') cannot be empty.",
+ sVersion);
+ this.startVersionName = sVersion;
+ return this;
+ }
+
+ @Override
+ public RewriteTablePath endVersion(String eVersion) {
+ Preconditions.checkArgument(
+ eVersion != null && !eVersion.trim().isEmpty(),
+ "End version('%s') cannot be empty.",
+ eVersion);
+ this.endVersionName = eVersion;
+ return this;
+ }
+
+ @Override
+ public RewriteTablePath stagingLocation(String stagingLocation) {
+ Preconditions.checkArgument(
+ stagingLocation != null && !stagingLocation.isEmpty(),
+ "Staging location('%s') cannot be empty.",
+ stagingLocation);
+ this.stagingDir = stagingLocation;
+ return this;
+ }
+
+ @Override
+ public Result execute() {
+ validateInputs();
+ JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+ return withJobGroupInfo(info, this::doExecute);
+ }
+
+ private Result doExecute() {
+ String resultLocation = rebuildMetadata();
+ return ImmutableRewriteTablePath.Result.builder()
+ .stagingLocation(stagingDir)
+ .fileListLocation(resultLocation)
+ .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+ .build();
+ }
+
+ private void validateInputs() {
+ Preconditions.checkArgument(
+ sourcePrefix != null && !sourcePrefix.isEmpty(),
+ "Source prefix('%s') cannot be empty.",
+ sourcePrefix);
+ Preconditions.checkArgument(
+ targetPrefix != null && !targetPrefix.isEmpty(),
+ "Target prefix('%s') cannot be empty.",
+ targetPrefix);
+ Preconditions.checkArgument(
+ !sourcePrefix.equals(targetPrefix),
+ "Source prefix cannot be the same as target prefix (%s)",
+ sourcePrefix);
+
+ validateAndSetEndVersion();
+ validateAndSetStartVersion();
+
+ if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + UUID.randomUUID() + "/";
+ } else if (!stagingDir.endsWith("/")) {
+ stagingDir = stagingDir + "/";
+ }
+ }
+
+ private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+
+ if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest table version.");
+ Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should not be null");
+ this.endVersionName = tableMetadata.metadataFileLocation();
+ } else {
+ this.endVersionName = validateVersion(tableMetadata, endVersionName);
+ }
+ }
+
+ private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+
+ if (startVersionName != null) {
+ this.startVersionName = validateVersion(tableMetadata, startVersionName);
+ }
+ }
+
+  private String validateVersion(TableMetadata tableMetadata, String versionFileName) {
+ String versionFile = null;
+    if (versionInFilePath(tableMetadata.metadataFileLocation(), versionFileName)) {
+ versionFile = tableMetadata.metadataFileLocation();
+ }
+
+ for (MetadataLogEntry log : tableMetadata.previousFiles()) {
+ if (versionInFilePath(log.file(), versionFileName)) {
+ versionFile = log.file();
+ }
+ }
+
+ Preconditions.checkArgument(
+ versionFile != null,
+ "Cannot find provided version file %s in metadata log.",
+ versionFileName);
+ Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", versionFile);
+ return versionFile;
+ }
+
+ private boolean versionInFilePath(String path, String version) {
+ return RewriteTablePathUtil.fileName(path).equals(version);
+ }
+
+ private String jobDesc() {
+ if (startVersionName != null) {
+ return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+ + "up to version '%s'.",
+ sourcePrefix, targetPrefix, table.name(), endVersionName);
+ } else {
+ return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+ + "from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, endVersionName);
+ }
+ }
+
+ /**
+ * Rebuild metadata in a staging location, with paths rewritten.
+ *
+ * <ul>
+ * <li>Rebuild version files to staging
+ * <li>Rebuild manifest list files to staging
+ * <li>Rebuild manifest to staging
+ * <li>Get all files needed to move
+ * </ul>
+ */
+ private String rebuildMetadata() {
+ TableMetadata startMetadata =
+ startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
+ .operations()
+ .current()
+ : null;
+ TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();
+
+ Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
+ "Statistic files are not supported yet.");
+
+ // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
+    Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
+
+    Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
+ Set<Snapshot> validSnapshots =
+ Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+
+ // rebuild manifest-list files
+ RewriteResult<ManifestFile> rewriteManifestListResult =
+ validSnapshots.stream()
+            .map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite))
+ .reduce(new RewriteResult<>(), RewriteResult::append);
+
+ // rebuild manifest files
+    RewriteContentFileResult rewriteManifestResult =
+        rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());
+
+ // rebuild position delete files
+ Set<DeleteFile> deleteFiles =
+ rewriteManifestResult.toRewrite().stream()
+ .filter(e -> e instanceof DeleteFile)
+ .map(e -> (DeleteFile) e)
+ .collect(Collectors.toSet());
+ rewritePositionDeletes(endMetadata, deleteFiles);
+
+ Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+ copyPlan.addAll(rewriteVersionResult.copyPlan());
+ copyPlan.addAll(rewriteManifestListResult.copyPlan());
+ copyPlan.addAll(rewriteManifestResult.copyPlan());
+
+ return saveFileList(copyPlan);
+ }
+
+ private String saveFileList(Set<Pair<String, String>> filesToMove) {
+ List<Tuple2<String, String>> fileList =
+ filesToMove.stream()
+ .map(p -> Tuple2.apply(p.first(), p.second()))
+ .collect(Collectors.toList());
+ Dataset<Tuple2<String, String>> fileListDataset =
+        spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+ String fileListPath = stagingDir + RESULT_LOCATION;
+ fileListDataset
+ .repartition(1)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .format("csv")
+ .save(fileListPath);
+ return fileListPath;
+ }
+
+  private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+ if (startMetadata == null) {
+ return allSnapshots;
+ } else {
+ Set<Long> startSnapshotIds =
+          startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return allSnapshots.stream()
+ .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+ .collect(Collectors.toSet());
+ }
+ }
+
+  private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
+ RewriteResult<Snapshot> result = new RewriteResult<>();
+ result.toRewrite().addAll(endMetadata.snapshots());
+ result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+
+ List<MetadataLogEntry> versions = endMetadata.previousFiles();
+ for (int i = versions.size() - 1; i >= 0; i--) {
+ String versionFilePath = versions.get(i).file();
+ if (versionFilePath.equals(startVersionName)) {
+ break;
+ }
+
+ Preconditions.checkArgument(
+ fileExist(versionFilePath),
+ String.format("Version file %s doesn't exist", versionFilePath));
+ TableMetadata tableMetadata =
+ new StaticTableOperations(versionFilePath, table.io()).current();
+
+ result.toRewrite().addAll(tableMetadata.snapshots());
+      result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
+ }
+
+ return result;
+ }
+
+  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
+    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
+    TableMetadata newTableMetadata =
+        RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
+    TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
+    return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, targetPrefix));
+ }
+
+ /**
+ * Rewrite a manifest list representing a snapshot.
+ *
+ * @param snapshot snapshot represented by the manifest list
+ * @param tableMetadata metadata of table
+ * @param manifestsToRewrite filter of manifests to rewrite.
+   * @return a result including a copy plan for the manifests contained in the manifest list, as
+ * well as for the manifest list itself
+ */
+ private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
+ RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+ String path = snapshot.manifestListLocation();
+ String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+ RewriteResult<ManifestFile> rewriteResult =
+ RewriteTablePathUtil.rewriteManifestList(
+ snapshot,
+ table.io(),
+ tableMetadata,
+ manifestsToRewrite,
+ sourcePrefix,
+ targetPrefix,
+ stagingDir,
+ outputPath);
+
+ result.append(rewriteResult);
+ // add the manifest list copy plan itself to the result
+    result.copyPlan().add(Pair.of(outputPath, newPath(path, sourcePrefix, targetPrefix)));
+ return result;
+ }
+
+ private Set<String> manifestsToRewrite(
+ Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+ try {
+ Table endStaticTable = newStaticTable(endVersionName, table.io());
+      Dataset<Row> lastVersionFiles = manifestDS(endStaticTable).select("path");
+ if (startMetadata == null) {
+        return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+ } else {
+ Set<Long> deltaSnapshotIds =
+            deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return Sets.newHashSet(
+ lastVersionFiles
+ .distinct()
+ .filter(
+ functions
+ .column(ManifestFile.SNAPSHOT_ID.name())
+ .isInCollection(deltaSnapshotIds))
+ .as(Encoders.STRING())
+ .collectAsList());
+ }
+ } catch (Exception e) {
+ throw new UnsupportedOperationException(
+          "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. "
+ + "Please choose an earlier version without invalid snapshots.",
+ e);
+ }
+ }
+
+  public static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
+ public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
+ this.copyPlan().addAll(r1.copyPlan());
+ this.toRewrite().addAll(r1.toRewrite());
+ return this;
+ }
+
+    public RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) {
+ this.copyPlan().addAll(r1.copyPlan());
+ this.toRewrite().addAll(r1.toRewrite());
+ return this;
+ }
+
+    public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
+ this.copyPlan().addAll(r1.copyPlan());
+ this.toRewrite().addAll(r1.toRewrite());
+ return this;
+ }
+ }
+
+  /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
+ private RewriteContentFileResult rewriteManifests(
+ TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
+ if (toRewrite.isEmpty()) {
+ return new RewriteContentFileResult();
+ }
+
+    Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
+ Dataset<ManifestFile> manifestDS =
+        spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder);
+
+ return manifestDS
+ .repartition(toRewrite.size())
+ .map(
+ toManifests(
+ tableBroadcast(),
+ stagingDir,
+ tableMetadata.formatVersion(),
+ sourcePrefix,
+ targetPrefix),
+ Encoders.bean(RewriteContentFileResult.class))
+        // duplicates are expected here as the same data file can have different statuses
+ // (e.g. added and deleted)
+        .reduce((ReduceFunction<RewriteContentFileResult>) RewriteContentFileResult::append);
+ }
+
+  private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
+ Broadcast<Table> table,
+ String stagingLocation,
+ int format,
+ String sourcePrefix,
+ String targetPrefix) {
+
+ return manifestFile -> {
+ RewriteContentFileResult result = new RewriteContentFileResult();
+ switch (manifestFile.content()) {
+ case DATA:
+ result.appendDataFile(
+ writeDataManifest(
+                  manifestFile, table, stagingLocation, format, sourcePrefix, targetPrefix));
+ break;
+ case DELETES:
+ result.appendDeleteFile(
+ writeDeleteManifest(
+                  manifestFile, table, stagingLocation, format, sourcePrefix, targetPrefix));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported manifest type: " + manifestFile.content());
+ }
+ return result;
+ };
+ }
+
+ private static RewriteResult<DataFile> writeDataManifest(
+ ManifestFile manifestFile,
+ Broadcast<Table> table,
+ String stagingLocation,
+ int format,
+ String sourcePrefix,
+ String targetPrefix) {
+ try {
+      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+ FileIO io = table.getValue().io();
+ OutputFile outputFile = io.newOutputFile(stagingPath);
+ Map<Integer, PartitionSpec> specsById = table.getValue().specs();
+ return RewriteTablePathUtil.rewriteDataManifest(
+          manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ private static RewriteResult<DeleteFile> writeDeleteManifest(
+ ManifestFile manifestFile,
+ Broadcast<Table> table,
+ String stagingLocation,
+ int format,
+ String sourcePrefix,
+ String targetPrefix) {
+ try {
+      String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
+ FileIO io = table.getValue().io();
+ OutputFile outputFile = io.newOutputFile(stagingPath);
+ Map<Integer, PartitionSpec> specsById = table.getValue().specs();
+ return RewriteTablePathUtil.rewriteDeleteManifest(
+ manifestFile,
+ outputFile,
+ io,
+ format,
+ specsById,
+ sourcePrefix,
+ targetPrefix,
+ stagingLocation);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+  private void rewritePositionDeletes(TableMetadata metadata, Set<DeleteFile> toRewrite) {
+ if (toRewrite.isEmpty()) {
+ return;
+ }
+
+    Encoder<DeleteFile> deleteFileEncoder = Encoders.javaSerialization(DeleteFile.class);
+ Dataset<DeleteFile> deleteFileDs =
+        spark().createDataset(Lists.newArrayList(toRewrite), deleteFileEncoder);
+
+    PositionDeleteReaderWriter posDeleteReaderWriter = new SparkPositionDeleteReaderWriter();
+ deleteFileDs
+ .repartition(toRewrite.size())
+ .foreach(
+ rewritePositionDelete(
+              tableBroadcast(), sourcePrefix, targetPrefix, stagingDir, posDeleteReaderWriter));
+ }
+
+  private static class SparkPositionDeleteReaderWriter implements PositionDeleteReaderWriter {
+ @Override
+ public CloseableIterable<Record> reader(
+ InputFile inputFile, FileFormat format, PartitionSpec spec) {
+ return positionDeletesReader(inputFile, format, spec);
+ }
+
+ @Override
+ public PositionDeleteWriter<Record> writer(
+ OutputFile outputFile,
+ FileFormat format,
+ PartitionSpec spec,
+ StructLike partition,
+ Schema rowSchema)
+ throws IOException {
+      return positionDeletesWriter(outputFile, format, spec, partition, rowSchema);
+ }
+ }
+
+ private ForeachFunction<DeleteFile> rewritePositionDelete(
+ Broadcast<Table> tableArg,
+ String sourcePrefixArg,
+ String targetPrefixArg,
+ String stagingLocationArg,
+ PositionDeleteReaderWriter posDeleteReaderWriter) {
+ return deleteFile -> {
+ FileIO io = tableArg.getValue().io();
+      String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), stagingLocationArg);
+ OutputFile outputFile = io.newOutputFile(newPath);
+      PartitionSpec spec = tableArg.getValue().specs().get(deleteFile.specId());
+ RewriteTablePathUtil.rewritePositionDeleteFile(
+ deleteFile,
+ outputFile,
+ io,
+ spec,
+ sourcePrefixArg,
+ targetPrefixArg,
+ posDeleteReaderWriter);
+ };
+ }
+
+ private static CloseableIterable<Record> positionDeletesReader(
+ InputFile inputFile, FileFormat format, PartitionSpec spec) {
+ Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
+ switch (format) {
+ case AVRO:
+ return Avro.read(inputFile)
+ .project(deleteSchema)
+ .reuseContainers()
+ .createReaderFunc(DataReader::create)
+ .build();
+
+ case PARQUET:
+ return Parquet.read(inputFile)
+ .project(deleteSchema)
+ .reuseContainers()
+ .createReaderFunc(
+              fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+ .build();
+
+ case ORC:
+ return ORC.read(inputFile)
+ .project(deleteSchema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
+ .build();
+
+ default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+ }
+ }
+
+ private static PositionDeleteWriter<Record> positionDeletesWriter(
+ OutputFile outputFile,
+ FileFormat format,
+ PartitionSpec spec,
+ StructLike partition,
+ Schema rowSchema)
+ throws IOException {
+ switch (format) {
+ case AVRO:
+ return Avro.writeDeletes(outputFile)
+ .createWriterFunc(DataWriter::create)
+ .withPartition(partition)
+ .rowSchema(rowSchema)
+ .withSpec(spec)
+ .buildPositionWriter();
+ case PARQUET:
+ return Parquet.writeDeletes(outputFile)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .withPartition(partition)
+ .rowSchema(rowSchema)
+ .withSpec(spec)
+ .buildPositionWriter();
+ case ORC:
+ return ORC.writeDeletes(outputFile)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .withPartition(partition)
+ .rowSchema(rowSchema)
+ .withSpec(spec)
+ .buildPositionWriter();
+ default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+ }
+ }
+
+ private Set<Snapshot> snapshotSet(TableMetadata metadata) {
+ if (metadata == null) {
+ return Sets.newHashSet();
+ } else {
+ return Sets.newHashSet(metadata.snapshots());
+ }
+ }
+
+ private boolean fileExist(String path) {
+ if (path == null || path.trim().isEmpty()) {
+ return false;
+ }
+ return table.io().newInputFile(path).exists();
+ }
+
+  private static String newPath(String path, String sourcePrefix, String targetPrefix) {
+ return RewriteTablePathUtil.combinePaths(
+ targetPrefix, RewriteTablePathUtil.relativize(path, sourcePrefix));
+ }
+
+ private String getMetadataLocation(Table tbl) {
+ String currentMetadataPath =
+        ((HasTableOperations) tbl).operations().current().metadataFileLocation();
+ int lastIndex = currentMetadataPath.lastIndexOf(File.separator);
+ String metadataDir = "";
+ if (lastIndex != -1) {
+ metadataDir = currentMetadataPath.substring(0, lastIndex + 1);
+ }
+
+ Preconditions.checkArgument(
+        !metadataDir.isEmpty(), "Failed to get the metadata file root directory");
+ return metadataDir;
+ }
+
+ @VisibleForTesting
+ Broadcast<Table> tableBroadcast() {
+ if (tableBroadcast == null) {
+      this.tableBroadcast = sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
+ }
+
+ return tableBroadcast;
+ }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index ba9fa2e7b4..aa4ef987e7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -108,4 +108,9 @@ public class SparkActions implements ActionsProvider {
public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
return new RemoveDanglingDeletesSparkAction(spark, table);
}
+
+ @Override
+ public RewriteTablePathSparkAction rewriteTablePath(Table table) {
+ return new RewriteTablePathSparkAction(spark, table);
+ }
}
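
The same rewrite can also be driven programmatically through the action registered above. A minimal, hedged sketch; the table handle, prefixes, and staging path are placeholders, not values from this commit:

    // Assumes `table` is an org.apache.iceberg.Table loaded elsewhere; all literal values are illustrative.
    RewriteTablePath.Result result =
        SparkActions.get()
            .rewriteTablePath(table)
            .rewriteLocationPrefix("s3://old-bucket/db/tbl", "s3://new-bucket/db/tbl")
            .stagingLocation("s3://staging-bucket/tbl-staging/")
            .execute();
    // result.latestVersion() is the file name of the rewritten latest metadata.json;
    // result.fileListLocation() points at the CSV of (staged file, target location) pairs
    // written by saveFileList() in RewriteTablePathSparkAction above.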
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
new file mode 100644
index 0000000000..b936dcfced
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.spark.actions.RewriteTablePathSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class RewriteTablePathProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter TABLE_PARAM =
+ ProcedureParameter.required("table", DataTypes.StringType);
+ private static final ProcedureParameter SOURCE_PREFIX_PARAM =
+ ProcedureParameter.required("source_prefix", DataTypes.StringType);
+ private static final ProcedureParameter TARGET_PREFIX_PARAM =
+ ProcedureParameter.required("target_prefix", DataTypes.StringType);
+ private static final ProcedureParameter START_VERSION_PARAM =
+ ProcedureParameter.optional("start_version", DataTypes.StringType);
+ private static final ProcedureParameter END_VERSION_PARM =
+ ProcedureParameter.optional("end_version", DataTypes.StringType);
+ private static final ProcedureParameter STAGING_LOCATION_PARAM =
+ ProcedureParameter.optional("staging_location", DataTypes.StringType);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ TABLE_PARAM,
+ SOURCE_PREFIX_PARAM,
+ TARGET_PREFIX_PARAM,
+ START_VERSION_PARAM,
+ END_VERSION_PARM,
+ STAGING_LOCATION_PARAM
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+              new StructField("latest_version", DataTypes.StringType, true, Metadata.empty()),
+              new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty())
+ });
+
+ public static SparkProcedures.ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<RewriteTablePathProcedure>() {
+ @Override
+ protected RewriteTablePathProcedure doBuild() {
+ return new RewriteTablePathProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteTablePathProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ String sourcePrefix = input.asString(SOURCE_PREFIX_PARAM);
+ String targetPrefix = input.asString(TARGET_PREFIX_PARAM);
+ String startVersion = input.asString(START_VERSION_PARAM, null);
+ String endVersion = input.asString(END_VERSION_PARM, null);
+ String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
+
+ return withIcebergTable(
+ tableIdent,
+ table -> {
+          RewriteTablePathSparkAction action = SparkActions.get().rewriteTablePath(table);
+
+ if (startVersion != null) {
+ action.startVersion(startVersion);
+ }
+ if (endVersion != null) {
+ action.endVersion(endVersion);
+ }
+ if (stagingLocation != null) {
+ action.stagingLocation(stagingLocation);
+ }
+
+          return toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute());
+ });
+ }
+
+ private InternalRow[] toOutputRows(RewriteTablePath.Result result) {
+ return new InternalRow[] {
+ newInternalRow(
+ UTF8String.fromString(result.latestVersion()),
+ UTF8String.fromString(result.fileListLocation()))
+ };
+ }
+
+ @Override
+ public String description() {
+ return "RewriteTablePathProcedure";
+ }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index d636a21ddc..3539704430 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -62,6 +62,7 @@ public class SparkProcedures {
mapBuilder.put("rewrite_position_delete_files",
RewritePositionDeleteFilesProcedure::builder);
mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
+ mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
return mapBuilder.build();
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
new file mode 100644
index 0000000000..2766078517
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -0,0 +1,1080 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.BlockInfoManager;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.BroadcastBlockId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Tuple2;
+
+public class TestRewriteTablePathsAction extends SparkTestBase {
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ protected ActionsProvider actions() {
+ return SparkActions.get();
+ }
+
+  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+ protected static final Schema SCHEMA =
+ new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()));
+
+ protected String tableLocation = null;
+ public String staging = null;
+ public String tableDir = null;
+ public String newTableDir = null;
+ public String targetTableDir = null;
+ private Table table = null;
+
+ private final String ns = "testns";
+ private final String backupNs = "backupns";
+
+ @Before
+ public void setupTableLocation() throws Exception {
+ this.tableLocation = temp.newFolder().toURI().toString();
+ this.staging = temp.newFolder("staging").toURI().toString();
+ this.tableDir = temp.newFolder("table").toURI().toString();
+ this.newTableDir = temp.newFolder("newTable").toURI().toString();
+ this.targetTableDir = temp.newFolder("targetTable").toURI().toString();
+ this.table = createATableWith2Snapshots(tableLocation);
+ createNameSpaces();
+ }
+
+ @After
+ public void cleanupTableSetup() throws Exception {
+ dropNameSpaces();
+ }
+
+ private Table createATableWith2Snapshots(String location) {
+ return createTableWithSnapshots(location, 2);
+ }
+
+ private Table createTableWithSnapshots(String location, int snapshotNumber) {
+    return createTableWithSnapshots(location, snapshotNumber, Maps.newHashMap());
+ }
+
+ protected Table createTableWithSnapshots(
+ String location, int snapshotNumber, Map<String, String> properties) {
+    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location);
+
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+ for (int i = 0; i < snapshotNumber; i++) {
+      df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+ }
+
+ return newTable;
+ }
+
+ private void createNameSpaces() {
+ sql("CREATE DATABASE IF NOT EXISTS %s", ns);
+ sql("CREATE DATABASE IF NOT EXISTS %s", backupNs);
+ }
+
+ private void dropNameSpaces() {
+ sql("DROP DATABASE IF EXISTS %s CASCADE", ns);
+ sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
+ }
+
+ @Test
+ public void testRewritePath() throws Exception {
+ String targetTableLocation = targetTableLocation();
+
+ // check the data file location before the rebuild
+ List<String> validDataFiles =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#files")
+ .select("file_path")
+ .as(Encoders.STRING())
+ .collectAsList();
+ assertThat(validDataFiles.size()).isEqualTo(2);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(tableLocation, targetTableLocation)
+ .endVersion("v3.metadata.json")
+ .execute();
+
+ assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+ checkFileNum(3, 2, 2, 9, result);
+
+ // copy the metadata files and data files
+ copyTableFiles(result);
+
+ // verify the data file path after the rebuild
+ List<String> validDataFilesAfterRebuilt =
+ spark
+ .read()
+ .format("iceberg")
+ .load(targetTableLocation + "#files")
+ .select("file_path")
+ .as(Encoders.STRING())
+ .collectAsList();
+ assertThat(validDataFilesAfterRebuilt)
+ .hasSize(2)
+ .allMatch(item -> item.startsWith(targetTableLocation));
+
+ // verify data rows
+ List<Object[]> actual = rows(targetTableLocation);
+ List<Object[]> expected = rows(tableLocation);
+ assertEquals("Rows should match after copy", expected, actual);
+ }
+
+ @Test
+ public void testSameLocations() {
+ assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(tableLocation, tableLocation)
+ .endVersion("v1.metadata.json")
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Source prefix cannot be the same as target prefix");
+ }
+
+ @Test
+ public void testStartVersion() throws Exception {
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(tableLocation, targetTableLocation())
+ .startVersion("v2.metadata.json")
+ .execute();
+
+ checkFileNum(1, 1, 1, 4, result);
+
+    List<Tuple2<String, String>> paths = readPathPairList(result.fileListLocation());
+
+    String currentSnapshotId = String.valueOf(table.currentSnapshot().snapshotId());
+    assertThat(paths.stream().filter(c -> c._2().contains(currentSnapshotId)).count())
+ .withFailMessage("Should have the current snapshot file")
+ .isEqualTo(1);
+
+    String parentSnapshotId = String.valueOf(table.currentSnapshot().parentId());
+    assertThat(paths.stream().filter(c -> c._2().contains(parentSnapshotId)).count())
+ .withFailMessage("Should NOT have the parent snapshot file")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testTableWith3Snapshots() throws Exception {
+ String location = newTableLocation();
+ Table tableWith3Snaps = createTableWithSnapshots(location, 3);
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(tableWith3Snaps)
+            .rewriteLocationPrefix(location, temp.newFolder().toURI().toString())
+ .startVersion("v2.metadata.json")
+ .execute();
+
+ checkFileNum(2, 2, 2, 8, result);
+
+ // start from the first version
+ RewriteTablePath.Result result1 =
+ actions()
+ .rewriteTablePath(tableWith3Snaps)
+            .rewriteLocationPrefix(location, temp.newFolder().toURI().toString())
+ .startVersion("v1.metadata.json")
+ .execute();
+
+ checkFileNum(3, 3, 3, 12, result1);
+ }
+
+ @Test
+ public void testFullTableRewritePath() throws Exception {
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(tableLocation, targetTableLocation())
+ .execute();
+
+ checkFileNum(3, 2, 2, 9, result);
+ }
+
+ @Test
+ public void testDeleteDataFile() throws Exception {
+ List<String> validDataFiles =
+ spark
+ .read()
+ .format("iceberg")
+ .load(table.location() + "#files")
+ .select("file_path")
+ .as(Encoders.STRING())
+ .collectAsList();
+
+    table.newDelete().deleteFile(validDataFiles.stream().findFirst().get()).commit();
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .stagingLocation(stagingLocation())
+ .execute();
+
+ checkFileNum(4, 3, 3, 12, result);
+
+ // copy the metadata files and data files
+ copyTableFiles(result);
+
+ // verify data rows
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(targetTableLocation());
+ assertThat(resultDF.as(Encoders.bean(ThreeColumnRecord.class)).count())
+        .withFailMessage("There are only one row left since we deleted a data file")
+ .isEqualTo(1);
+ }
+
+ @Test
+ public void testPositionDeletes() throws Exception {
+ List<Pair<CharSequence, Long>> deletes =
+ Lists.newArrayList(
+ Pair.of(
+                table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+ 0L));
+
+    File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+ DeleteFile positionDeletes =
+ FileHelpers.writeDeleteFile(
+                table, table.io().newOutputFile(file.toURI().toString()), deletes)
+ .first();
+
+ table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .execute();
+
+    // We have one more snapshot, an additional manifest list, and a new (delete) manifest,
+ // and an additional position delete
+ checkFileNum(4, 3, 3, 13, result);
+
+ // copy the metadata files and data files
+ copyTableFiles(result);
+
+ // Positional delete affects a single row, so only one row must remain
+    assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+ }
+
+ @Test
+ public void testPositionDeleteWithRow() throws Exception {
+ String dataFileLocation =
+        table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
+ List<PositionDelete<?>> deletes = Lists.newArrayList();
+ OutputFile deleteFile =
+ table
+ .io()
+ .newOutputFile(
+              new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"))
+ .toURI()
+ .toString());
+    deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", "AAAA"));
+    DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes);
+ table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .execute();
+
+    // We have one more snapshot, an additional manifest list, and a new (delete) manifest,
+ // and an additional position delete
+ checkFileNum(4, 3, 3, 13, result);
+
+ // copy the metadata files and data files
+ copyTableFiles(result);
+
+ // check copied position delete row
+    Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2];
+ assertEquals(
+        "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow);
+
+ // Positional delete affects a single row, so only one row must remain
+    assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+ }
+
+ @Test
+ public void testPositionDeletesAcrossFiles() throws Exception {
+ Stream<DataFile> allFiles =
+ StreamSupport.stream(table.snapshots().spliterator(), false)
+            .flatMap(s -> StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
+ List<Pair<CharSequence, Long>> deletes =
+        allFiles.map(f -> Pair.of((CharSequence) f.location(), 0L)).collect(Collectors.toList());
+
+ // a single position delete with two entries
+ assertThat(deletes.size()).isEqualTo(2);
+
+    File file = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet"));
+ DeleteFile positionDeletes =
+ FileHelpers.writeDeleteFile(
+                table, table.io().newOutputFile(file.toURI().toString()), deletes)
+ .first();
+
+ table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(0);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .execute();
+
+    // We have one more snapshot, an additional manifest list, and a new (delete) manifest,
+ // and an additional position delete
+ checkFileNum(4, 3, 3, 13, result);
+
+ // copy the metadata files and data files
+ copyTableFiles(result);
+
+    assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(0);
+ }
+
+ @Test
+ public void testEqualityDeletes() throws Exception {
+ Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);
+
+ // Add more varied data
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(
+ new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
+ new ThreeColumnRecord(3, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(4, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(5, "DDDDDDDDDD", "DDDD"));
+ spark
+ .createDataFrame(records, ThreeColumnRecord.class)
+ .coalesce(1)
+ .select("c1", "c2", "c3")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(newTableLocation());
+
+ Schema deleteRowSchema = sourceTable.schema().select("c2");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes =
+ Lists.newArrayList(
+            dataDelete.copy("c2", "AAAAAAAAAA"), dataDelete.copy("c2", "CCCCCCCCCC"));
+    File file = new File(removePrefix(sourceTable.location()) + "/data/deeply/nested/file.parquet");
+ DeleteFile equalityDeletes =
+ FileHelpers.writeDeleteFile(
+ sourceTable,
+ sourceTable.io().newOutputFile(file.toURI().toString()),
+ TestHelpers.Row.of(0),
+ dataDeletes,
+ deleteRowSchema);
+ sourceTable.newRowDelta().addDeletes(equalityDeletes).commit();
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .execute();
+
+    // We have four metadata files: for the table creation, for the initial snapshot, for the
+    // second append here, and for commit with equality deletes. Thus, we have three manifest lists.
+    // We have a data file for each snapshot (two with data, one with equality deletes)
+ checkFileNum(4, 3, 3, 13, result);
+
+ // copy the metadata files and data files
+ copyTableFiles(result);
+
+ // Equality deletes affect three rows, so just two rows must remain
+    assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2);
+ }
+
+ @Test
+  public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
+ String location = newTableLocation();
+ Table sourceTable = createTableWithSnapshots(location, 2);
+ // expire the first snapshot
+    Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io());
+ actions()
+ .expireSnapshots(sourceTable)
+ .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
+ .execute();
+
+ // create 100 more snapshots
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+ for (int i = 0; i < 100; i++) {
+ df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+ }
+ sourceTable.refresh();
+
+ // v1/v2/v3.metadata.json has been deleted in v104.metadata.json, and there is no way to find
+ // the first snapshot from the version file history
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(location, targetTableLocation())
+ .execute();
+
+ checkFileNum(101, 101, 101, 406, result);
+ }
+
+ @Test
+ public void testRewritePathWithoutSnapshot() throws Exception {
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(tableLocation, newTableLocation())
+ .endVersion("v1.metadata.json")
+ .execute();
+
+ // the only rebuilt file is v1.metadata.json since it contains no snapshot
+ checkFileNum(1, 0, 0, 1, result);
+ }
+
+ @Test
+ public void testExpireSnapshotBeforeRewrite() throws Exception {
+ // expire one snapshot
+ actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .execute();
+
+ checkFileNum(4, 1, 2, 9, result);
+ }
+
+ @Test
+ public void testStartSnapshotWithoutValidSnapshot() throws Exception {
+ // expire one snapshot
+ actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+ assertThat(((List) table.snapshots()).size())
+ .withFailMessage("1 out of 2 snapshots should have been removed")
+ .isEqualTo(1);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .stagingLocation(stagingLocation())
+ .startVersion("v2.metadata.json")
+ .execute();
+
+ // 2 metadata.json, 1 manifest list file, 1 manifest files
+ checkFileNum(2, 1, 1, 5, result);
+ }
+
+ @Test
+ public void testMoveTheVersionExpireSnapshot() throws Exception {
+ // expire one snapshot
+ actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+ // only move version v4, which is the version generated by snapshot expiration
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .stagingLocation(stagingLocation())
+ .startVersion("v3.metadata.json")
+ .execute();
+
+ // only v4.metadata.json needs to move
+ checkFileNum(1, 0, 0, 1, result);
+ }
+
+ @Test
+ public void testMoveVersionWithInvalidSnapshots() throws Exception {
+ // expire one snapshot
+ actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+ assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), newTableLocation())
+ .stagingLocation(stagingLocation())
+ .endVersion("v3.metadata.json")
+ .execute())
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining(
+ "Unable to build the manifest files dataframe. The end version in
use may contain invalid snapshots. "
+ + "Please choose an earlier version without invalid
snapshots.");
+ }
+
+ @Test
+ public void testRollBack() throws Exception {
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+ // roll back to the first snapshot(v2)
+ table.manageSnapshots().setCurrentSnapshot(table.currentSnapshot().parentId()).commit();
+
+ // add a new snapshot
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+ df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());
+
+ table.refresh();
+
+ // roll back to the second snapshot(v3)
+ table.manageSnapshots().setCurrentSnapshot(secondSnapshotId).commit();
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), newTableLocation())
+ .stagingLocation(stagingLocation())
+ .execute();
+ checkFileNum(6, 3, 3, 15, result);
+ }
+
+ @Test
+ public void testWriteAuditPublish() throws Exception {
+ // enable WAP
+ table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
+ spark.conf().set("spark.wap.id", "1");
+
+ // add a new snapshot without changing the current snapshot of the table
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+ df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());
+
+ table.refresh();
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), newTableLocation())
+ .stagingLocation(stagingLocation())
+ .execute();
+
+ // There are 3 snapshots in total, although the current snapshot is the second one.
+ checkFileNum(5, 3, 3, 14, result);
+ }
+
+ @Test
+ public void testSchemaChange() throws Exception {
+ // change the schema
+ table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
+
+ // copy table
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(table.location(), newTableLocation())
+ .stagingLocation(stagingLocation())
+ .execute();
+
+ // check the result
+ checkFileNum(4, 2, 2, 10, result);
+ }
+
+ @Test
+ public void testSnapshotIdInheritanceEnabled() throws Exception {
+ String sourceTableLocation = newTableLocation();
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true");
+
+ Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, properties);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .stagingLocation(stagingLocation())
+ .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+ .execute();
+
+ checkFileNum(3, 2, 2, 9, result);
+ }
+
+ @Test
+ public void testMetadataCompression() throws Exception {
+ String sourceTableLocation = newTableLocation();
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, properties);
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+ .endVersion("v2.gz.metadata.json")
+ .execute();
+
+ checkFileNum(2, 1, 1, 5, result);
+
+ result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+ .startVersion("v1.gz.metadata.json")
+ .execute();
+
+ checkFileNum(2, 2, 2, 8, result);
+ }
+
+ @Test
+ public void testInvalidArgs() {
+ RewriteTablePath actions = actions().rewriteTablePath(table);
+
+ assertThatThrownBy(() -> actions.rewriteLocationPrefix("", null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Source prefix('') cannot be empty");
+
+ assertThatThrownBy(() -> actions.rewriteLocationPrefix(null, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Source prefix('null') cannot be empty");
+
+ assertThatThrownBy(() -> actions.stagingLocation(""))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Staging location('') cannot be empty");
+
+ assertThatThrownBy(() -> actions.stagingLocation(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Staging location('null') cannot be empty");
+
+ assertThatThrownBy(() -> actions.startVersion(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Start version('null') cannot be empty");
+
+ assertThatThrownBy(() -> actions.endVersion(" "))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("End version(' ') cannot be empty");
+
+ assertThatThrownBy(() -> actions.endVersion(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("End version('null') cannot be empty");
+ }
+
+ @Test
+ public void testStatisticFile() throws IOException {
+ String sourceTableLocation = newTableLocation();
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("format-version", "2");
+ String tableName = "v2tblwithstats";
+ Table sourceTable =
+ createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
+
+ TableMetadata metadata = currentMetadata(sourceTable);
+ TableMetadata withStatistics =
+ TableMetadata.buildFrom(metadata)
+ .setStatistics(
+ 43,
+ new GenericStatisticsFile(
+ 43, "/some/path/to/stats/file", 128, 27, ImmutableList.of()))
+ .build();
+
+ OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
+ TableMetadataParser.overwrite(withStatistics, file);
+
+ assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Statistic files are not supported yet");
+ }
+
+ @Test
+ public void testMetadataCompressionWithMetastoreTable() throws Exception {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ Table sourceTable =
+ createMetastoreTable(
+ newTableLocation(), properties, "default", "testMetadataCompression", 2);
+
+ TableMetadata currentMetadata = currentMetadata(sourceTable);
+
+ // set the second version as the endVersion
+ String endVersion = fileName(currentMetadata.previousFiles().get(1).file());
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .endVersion(endVersion)
+ .execute();
+
+ checkFileNum(2, 1, 1, 5, result);
+
+ // set the first version as the lastCopiedVersion
+ String firstVersion = fileName(currentMetadata.previousFiles().get(0).file());
+ result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .startVersion(firstVersion)
+ .execute();
+
+ checkFileNum(2, 2, 2, 8, result);
+ }
+
+ // Metastore table tests
+ @Test
+ public void testMetadataLocationChange() throws Exception {
+ Table sourceTable =
+ createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1);
+ String metadataFilePath = currentMetadata(sourceTable).metadataFileLocation();
+
+ String newMetadataDir = "new-metadata-dir";
+ sourceTable
+ .updateProperties()
+ .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + newMetadataDir)
+ .commit();
+
+ spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+ sourceTable.refresh();
+
+ // copy table
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .execute();
+
+ checkFileNum(4, 2, 2, 10, result);
+
+ // pick up a version from the old metadata dir as the end version
+ RewriteTablePath.Result result1 =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .endVersion(fileName(metadataFilePath))
+ .execute();
+
+ checkFileNum(2, 1, 1, 5, result1);
+
+ // pick up a version from the old metadata dir as the last copied version
+ RewriteTablePath.Result result2 =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .startVersion(fileName(metadataFilePath))
+ .execute();
+
+ checkFileNum(2, 1, 1, 5, result2);
+ }
+
+ @Test
+ public void testDeleteFrom() throws Exception {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("format-version", "2");
+ properties.put("write.delete.mode", "merge-on-read");
+ String tableName = "v2tbl";
+ Table sourceTable =
+ createMetastoreTable(newTableLocation(), properties, "default", tableName, 0);
+ // ingest data
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(
+ new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+ new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
+ new ThreeColumnRecord(3, "AAAAAAAAAA", "AAAA"));
+
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+ df.select("c1", "c2", "c3")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .saveAsTable("hive.default." + tableName);
+ sourceTable.refresh();
+
+ // generate position delete files
+ spark.sql(String.format("delete from hive.default.%s where c1 = 1", tableName));
+ sourceTable.refresh();
+
+ List<Object[]> originalData =
+ rowsToJava(
+ spark
+ .read()
+ .format("iceberg")
+ .load("hive.default." + tableName)
+ .sort("c1", "c2", "c3")
+ .collectAsList());
+ // two rows
+ assertThat(originalData.size()).isEqualTo(2);
+
+ // copy table and check the results
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(sourceTable)
+ .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+ .execute();
+
+ checkFileNum(3, 2, 2, 9, result);
+ // one data and one metadata file
+ copyTableFiles(result);
+
+ // register table
+ String metadataLocation = currentMetadata(sourceTable).metadataFileLocation();
+ String versionFile = fileName(metadataLocation);
+ String targetTableName = "copiedV2Table";
+ TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName);
+ catalog.registerTable(tableIdentifier, targetTableLocation() + "/metadata/" + versionFile);
+
+ List<Object[]> copiedData =
+ rowsToJava(
+ spark
+ .read()
+ .format("iceberg")
+ .load("hive.default." + targetTableName)
+ .sort("c1", "c2", "c3")
+ .collectAsList());
+
+ assertEquals("Rows must match", originalData, copiedData);
+ }
+
+ @Test
+ public void testKryoDeserializeBroadcastValues() {
+ sparkContext.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ RewriteTablePathSparkAction action =
+ (RewriteTablePathSparkAction) actions().rewriteTablePath(table);
+ Broadcast<Table> tableBroadcast = action.tableBroadcast();
+ // force deserializing broadcast values
+ removeBroadcastValuesFromLocalBlockManager(tableBroadcast.id());
+ assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
+ }
+
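+ // Asserts the composition of the copy plan written by the rewrite action: the number of
+ // rebuilt metadata.json files, manifest lists ("snap-" files), manifest files ("-m0.avro"),
+ // and the total number of entries in the file list at result.fileListLocation().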
+ protected void checkFileNum(
+ int versionFileCount,
+ int manifestListCount,
+ int manifestFileCount,
+ int totalCount,
+ RewriteTablePath.Result result) {
+ List<String> filesToMove =
+ spark
+ .read()
+ .format("text")
+ .load(result.fileListLocation())
+ .as(Encoders.STRING())
+ .collectAsList();
+ assertThat(filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count())
+ .withFailMessage("Wrong rebuilt version file count")
+ .isEqualTo(versionFileCount);
+ assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count())
+ .withFailMessage("Wrong rebuilt Manifest list file count")
+ .isEqualTo(manifestListCount);
+ assertThat(filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count())
+ .withFailMessage("Wrong rebuilt Manifest file count")
+ .isEqualTo(manifestFileCount);
+ assertThat(filesToMove.size()).withFailMessage("Wrong total file count").isEqualTo(totalCount);
+ }
+
+ protected String newTableLocation() throws IOException {
+ return newTableDir;
+ }
+
+ protected String targetTableLocation() throws IOException {
+ return targetTableDir;
+ }
+
+ protected String stagingLocation() throws IOException {
+ return staging;
+ }
+
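+ // Materializes the copy plan: reads the (source, target) path pairs from the result's file
+ // list and copies each file to its target location.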
+ private void copyTableFiles(RewriteTablePath.Result result) throws Exception {
+ List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation());
+
+ for (Tuple2<String, String> pathPair : filesToMove) {
+ FileUtils.copyFile(new File(URI.create(pathPair._1())), new File(URI.create(pathPair._2())));
+ }
+ }
+
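+ // Strips everything up to and including the last ':' (e.g. "file:/tmp/x" -> "/tmp/x") so a
+ // table location URI can be used as a plain local filesystem path.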
+ private String removePrefix(String path) {
+ return path.substring(path.lastIndexOf(":") + 1);
+ }
+
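+ // Loads a read-only view of the table from a specific metadata.json, bypassing the catalog.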
+ protected Table newStaticTable(String metadataFileLocation, FileIO io) {
+ StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io);
+ return new BaseTable(ops, metadataFileLocation);
+ }
+
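+ // The rewrite result's file list is a CSV of (source path, target path) pairs; read it back
+ // as Tuple2 rows so the files can be copied to the target location.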
+ private List<Tuple2<String, String>> readPathPairList(String path) {
+ Encoder<Tuple2<String, String>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING());
+ return spark
+ .read()
+ .format("csv")
+ .schema(encoder.schema())
+ .load(path)
+ .as(encoder)
+ .collectAsList();
+ }
+
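+ // Creates an Iceberg table in the "hive" Spark catalog (configured here on the fly) with the
+ // given properties and location, then appends snapshotNumber single-row snapshots.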
+ private Table createMetastoreTable(
+ String location,
+ Map<String, String> properties,
+ String namespace,
+ String tableName,
+ int snapshotNumber) {
+ spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
+ spark.conf().set("spark.sql.catalog.hive.type", "hive");
+ spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
+ spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");
+
+ StringBuilder propertiesStr = new StringBuilder();
+ properties.forEach((k, v) -> propertiesStr.append("'" + k + "'='" + v + "',"));
+ String tblProperties =
+ propertiesStr.substring(0, propertiesStr.length() > 0 ? propertiesStr.length() - 1 : 0);
+
+ sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName);
+ if (tblProperties.isEmpty()) {
+ String sqlStr =
+ String.format(
+ "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
+ if (!location.isEmpty()) {
+ sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
+ }
+ sql(sqlStr);
+ } else {
+ String sqlStr =
+ String.format(
+ "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)",
namespace, tableName);
+ if (!location.isEmpty()) {
+ sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr,
location);
+ }
+
+ sqlStr = String.format("%s TBLPROPERTIES (%s)", sqlStr, tblProperties);
+ sql(sqlStr);
+ }
+
+ for (int i = 0; i < snapshotNumber; i++) {
+ sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')",
namespace, tableName, i);
+ }
+ return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+ }
+
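+ // Returns the last path segment; note this splits on File.separator, so it assumes '/'-style
+ // separators in the metadata locations used by these tests.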
+ private static String fileName(String path) {
+ String filename = path;
+ int lastIndex = path.lastIndexOf(File.separator);
+ if (lastIndex != -1) {
+ filename = path.substring(lastIndex + 1);
+ }
+ return filename;
+ }
+
+ private TableMetadata currentMetadata(Table tbl) {
+ return ((HasTableOperations) tbl).operations().current();
+ }
+
+ private List<Object[]> rows(String location) {
+ return rowsToJava(spark.read().format("iceberg").load(location).collectAsList());
+ }
+
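+ // Builds a single position delete entry (data file path + row position), optionally carrying
+ // a copy of the deleted row that matches the given table schema.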
+ private PositionDelete<GenericRecord> positionDelete(
+ Schema tableSchema, CharSequence path, Long position, Object... values) {
+ PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+ GenericRecord nested = GenericRecord.create(tableSchema);
+ for (int i = 0; i < values.length; i++) {
+ nested.set(i, values[i]);
+ }
+ posDelete.set(path, position, nested);
+ return posDelete;
+ }
+
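+ // Clears the driver-side broadcast cache and evicts the broadcast block from the local
+ // BlockManager so the next getValue() call has to re-read and deserialize the value.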
+ private void removeBroadcastValuesFromLocalBlockManager(long id) {
+ BlockId blockId = new BroadcastBlockId(id, "");
+ SparkEnv env = SparkEnv.get();
+ env.broadcastManager().cachedValues().clear();
+ BlockManager blockManager = env.blockManager();
+ BlockInfoManager blockInfoManager = blockManager.blockInfoManager();
+ blockInfoManager.lockForWriting(blockId, true);
+ blockInfoManager.removeBlock(blockId);
+ blockManager.memoryStore().remove(blockId);
+ }
+}