This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 15485f5523 Spark: Backport adding branch support to rewrite_data_files procedure (#15067)
15485f5523 is described below
commit 15485f5523d08aae2a503c143c51b6df2debb655
Author: Harsh Sharma <[email protected]>
AuthorDate: Wed Jan 21 13:31:01 2026 +0530
Spark: Backport adding branch support to rewrite_data_files procedure (#15067)
backports #14964
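For illustration only (not part of this commit; the class, catalog, table, and branch names below are placeholders), the backported API adds a toBranch(String) setter on RewriteDataFilesSparkAction and an optional 'branch' argument on the rewrite_data_files procedure, and can be exercised like this:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    class RewriteOnBranchExample {
      // Compact the data files reachable from a named branch; main is left untouched.
      static RewriteDataFiles.Result compactBranch(SparkSession spark, Table table, String branch) {
        return SparkActions.get(spark)
            .rewriteDataFiles(table)
            .toBranch(branch) // rejects null with "Invalid branch name: null"
            .execute();       // fails if the branch does not exist on the table
      }
    }

    // Equivalent stored-procedure call:
    //   CALL my_catalog.system.rewrite_data_files(table => 'db.tbl', branch => 'my_branch')

When no branch argument is given, the procedure falls back to the branch of the loaded Spark table, then to the main branch.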
---
.../extensions/TestRewriteDataFilesProcedure.java | 207 +++++++++++++++++++++
.../spark/actions/RewriteDataFilesSparkAction.java | 17 +-
.../procedures/RewriteDataFilesProcedure.java | 24 ++-
.../extensions/TestRewriteDataFilesProcedure.java | 206 ++++++++++++++++++++
.../spark/actions/RewriteDataFilesSparkAction.java | 17 +-
.../procedures/RewriteDataFilesProcedure.java | 24 ++-
.../extensions/TestRewriteDataFilesProcedure.java | 206 ++++++++++++++++++++
.../spark/actions/RewriteDataFilesSparkAction.java | 17 +-
.../procedures/RewriteDataFilesProcedure.java | 32 +++-
9 files changed, 727 insertions(+), 23 deletions(-)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d011ad0628..a2a693c56b 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
@@ -45,6 +46,7 @@ import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -1056,4 +1058,209 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
private List<Object[]> currentData(String table) {
return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, c3").collectAsList());
}
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranch() {
+ createTable();
+ insertData(10);
+
+ String branchName = "testBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ List<Object[]> expectedRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // Verify branch data is preserved after compaction
+ List<Object[]> actualRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+
+ // Verify branch snapshot changed
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesToNullBranchFails() {
+ createTable();
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThatThrownBy(() -> SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
+ .as("Invalid branch")
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid branch name: null");
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranchWithFilter() {
+ createPartitionTable();
+ insertData(10);
+
+ String branchName = "filteredBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ List<Object[]> expectedRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch with filter (select only partition c2 = 'bar')
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', where => 'c2 = \"bar\"')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 5 data files from single matching partition "
+ + "(containing c2 = bar) and add 1 data file",
+ row(5, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // Verify branch data is preserved after compaction
+ List<Object[]> actualRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+
+ // Verify branch snapshot changed after rewrite
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testBranchCompactionDoesNotAffectMain() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ // Create branch from current main state
+ String branchName = "compactionBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ // Add more data to MAIN to make it diverge from branch
+ insertData(tableName, 10);
+
+ // Refresh to get new main snapshot after divergence
+ table.refresh();
+ long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+ List<Object[]> expectedMainRecords = currentData();
+
+ // Get branch data before compaction
+ List<Object[]> expectedBranchRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ long branchSnapshotBeforeCompaction = table.refs().get(branchName).snapshotId();
+
+ // Verify that branch and main have diverged
+ assertThat(branchSnapshotBeforeCompaction)
+ .as("Branch and main should have different snapshots")
+ .isNotEqualTo(mainSnapshotAfterDivergence);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ table.refresh();
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot ID must remain unchanged after branch compaction")
+ .isEqualTo(mainSnapshotAfterDivergence);
+
+ // Verify main data unchanged
+ List<Object[]> actualMainRecords = currentData();
+ assertEquals(
+ "Main data after compaction should not change", expectedMainRecords, actualMainRecords);
+
+ // Verify branch data unchanged
+ List<Object[]> actualBranchRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals(
+ "Branch data after compaction should not change",
+ expectedBranchRecords,
+ actualBranchRecords);
+
+ // Verify branch snapshot changed
+ long branchSnapshotAfterCompaction = table.refs().get(branchName).snapshotId();
+ assertThat(branchSnapshotAfterCompaction)
+ .as("Branch snapshot must be updated after compaction")
+ .isNotEqualTo(branchSnapshotBeforeCompaction);
+
+ // Verify the new branch snapshot is a child of the previous branch snapshot
+ assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
+ .as("New branch snapshot must be a child of the previous branch snapshot")
+ .isEqualTo(branchSnapshotBeforeCompaction);
+ }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a42ce0ecf7..2f5cc25074 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
+ private String branch = SnapshotRef.MAIN_BRANCH;
private BinPackRewriteFilePlanner planner = null;
private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> runner = null;
@@ -157,13 +159,24 @@ public class RewriteDataFilesSparkAction
return this;
}
+ public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+ Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
+ this.branch = targetBranch;
+ return this;
+ }
+
@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
return EMPTY_RESULT;
}
- long startingSnapshotId = table.currentSnapshot().snapshotId();
+ Preconditions.checkArgument(
+ table.snapshot(branch) != null,
+ "Cannot rewrite data files for branch %s: branch does not exist",
+ branch);
+
+ long startingSnapshotId = table.snapshot(branch).snapshotId();
init(startingSnapshotId);
@@ -230,7 +243,7 @@ public class RewriteDataFilesSparkAction
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
return new RewriteDataFilesCommitManager(
- table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+ table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), branch);
}
private Builder doExecute(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index f1958a5a17..60ed7b84d4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.procedures;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -30,6 +31,7 @@ import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -57,10 +59,12 @@ class RewriteDataFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter WHERE_PARAM =
ProcedureParameter.optional("where", DataTypes.StringType);
+ private static final ProcedureParameter BRANCH_PARAM =
+ ProcedureParameter.optional("branch", DataTypes.StringType);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
- TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, WHERE_PARAM
+ TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, WHERE_PARAM, BRANCH_PARAM
};
// counts are not nullable since the action result is never null
@@ -109,17 +113,29 @@ class RewriteDataFilesProcedure extends BaseProcedure {
String sortOrderString = input.asString(SORT_ORDER_PARAM, null);
Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
String where = input.asString(WHERE_PARAM, null);
+ String branchParam = input.asString(BRANCH_PARAM, null);
+ if (branchParam == null) {
+ branchParam = loadSparkTable(tableIdent).branch();
+ if (branchParam == null) {
+ branchParam = SnapshotRef.MAIN_BRANCH;
+ }
+ }
+ String branch = branchParam;
return modifyIcebergTable(
tableIdent,
table -> {
- RewriteDataFiles action = actions().rewriteDataFiles(table).options(options);
+ RewriteDataFilesSparkAction action =
+ (RewriteDataFilesSparkAction)
+ actions().rewriteDataFiles(table).options(options).toBranch(branch);
if (strategy != null || sortOrderString != null) {
- action = checkAndApplyStrategy(action, strategy, sortOrderString, table.schema());
+ action =
+ (RewriteDataFilesSparkAction)
+ checkAndApplyStrategy(action, strategy, sortOrderString, table.schema());
}
- action = checkAndApplyFilter(action, where, tableIdent);
+ action = (RewriteDataFilesSparkAction) checkAndApplyFilter(action, where, tableIdent);
RewriteDataFiles.Result result = action.execute();
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 5c3093e707..ee727bd139 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -1107,4 +1108,209 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
private List<Object[]> currentData(String table) {
return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, c3").collectAsList());
}
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranch() {
+ createTable();
+ insertData(10);
+
+ String branchName = "testBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ List<Object[]> expectedRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // Verify branch data is preserved after compaction
+ List<Object[]> actualRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+
+ // Verify branch snapshot changed
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesToNullBranchFails() {
+ createTable();
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThatThrownBy(() -> SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
+ .as("Invalid branch")
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid branch name: null");
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranchWithFilter() {
+ createPartitionTable();
+ insertData(10);
+
+ String branchName = "filteredBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ List<Object[]> expectedRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch with filter (select only partition c2 = 'bar')
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', where => 'c2 = \"bar\"')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 5 data files from single matching partition "
+ + "(containing c2 = bar) and add 1 data file",
+ row(5, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // Verify branch data is preserved after compaction
+ List<Object[]> actualRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+
+ // Verify branch snapshot changed after rewrite
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testBranchCompactionDoesNotAffectMain() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ // Create branch from current main state
+ String branchName = "compactionBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ // Add more data to MAIN to make it diverge from branch
+ insertData(tableName, 10);
+
+ // Refresh to get new main snapshot after divergence
+ table.refresh();
+ long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+ List<Object[]> expectedMainRecords = currentData();
+
+ // Get branch data before compaction
+ List<Object[]> expectedBranchRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ long branchSnapshotBeforeCompaction = table.refs().get(branchName).snapshotId();
+
+ // Verify that branch and main have diverged
+ assertThat(branchSnapshotBeforeCompaction)
+ .as("Branch and main should have different snapshots")
+ .isNotEqualTo(mainSnapshotAfterDivergence);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ table.refresh();
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot ID must remain unchanged after branch compaction")
+ .isEqualTo(mainSnapshotAfterDivergence);
+
+ // Verify main data unchanged
+ List<Object[]> actualMainRecords = currentData();
+ assertEquals(
+ "Main data after compaction should not change", expectedMainRecords, actualMainRecords);
+
+ // Verify branch data unchanged
+ List<Object[]> actualBranchRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals(
+ "Branch data after compaction should not change",
+ expectedBranchRecords,
+ actualBranchRecords);
+
+ // Verify branch snapshot changed
+ long branchSnapshotAfterCompaction = table.refs().get(branchName).snapshotId();
+ assertThat(branchSnapshotAfterCompaction)
+ .as("Branch snapshot must be updated after compaction")
+ .isNotEqualTo(branchSnapshotBeforeCompaction);
+
+ // Verify the new branch snapshot is a child of the previous branch snapshot
+ assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
+ .as("New branch snapshot must be a child of the previous branch snapshot")
+ .isEqualTo(branchSnapshotBeforeCompaction);
+ }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a42ce0ecf7..2f5cc25074 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
+ private String branch = SnapshotRef.MAIN_BRANCH;
private BinPackRewriteFilePlanner planner = null;
private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> runner = null;
@@ -157,13 +159,24 @@ public class RewriteDataFilesSparkAction
return this;
}
+ public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+ Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
+ this.branch = targetBranch;
+ return this;
+ }
+
@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
return EMPTY_RESULT;
}
- long startingSnapshotId = table.currentSnapshot().snapshotId();
+ Preconditions.checkArgument(
+ table.snapshot(branch) != null,
+ "Cannot rewrite data files for branch %s: branch does not exist",
+ branch);
+
+ long startingSnapshotId = table.snapshot(branch).snapshotId();
init(startingSnapshotId);
@@ -230,7 +243,7 @@ public class RewriteDataFilesSparkAction
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
return new RewriteDataFilesCommitManager(
- table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+ table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), branch);
}
private Builder doExecute(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index f1958a5a17..60ed7b84d4 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.procedures;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -30,6 +31,7 @@ import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -57,10 +59,12 @@ class RewriteDataFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter WHERE_PARAM =
ProcedureParameter.optional("where", DataTypes.StringType);
+ private static final ProcedureParameter BRANCH_PARAM =
+ ProcedureParameter.optional("branch", DataTypes.StringType);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
- TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, WHERE_PARAM
+ TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, WHERE_PARAM, BRANCH_PARAM
};
// counts are not nullable since the action result is never null
@@ -109,17 +113,29 @@ class RewriteDataFilesProcedure extends BaseProcedure {
String sortOrderString = input.asString(SORT_ORDER_PARAM, null);
Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
String where = input.asString(WHERE_PARAM, null);
+ String branchParam = input.asString(BRANCH_PARAM, null);
+ if (branchParam == null) {
+ branchParam = loadSparkTable(tableIdent).branch();
+ if (branchParam == null) {
+ branchParam = SnapshotRef.MAIN_BRANCH;
+ }
+ }
+ String branch = branchParam;
return modifyIcebergTable(
tableIdent,
table -> {
- RewriteDataFiles action = actions().rewriteDataFiles(table).options(options);
+ RewriteDataFilesSparkAction action =
+ (RewriteDataFilesSparkAction)
+ actions().rewriteDataFiles(table).options(options).toBranch(branch);
if (strategy != null || sortOrderString != null) {
- action = checkAndApplyStrategy(action, strategy, sortOrderString, table.schema());
+ action =
+ (RewriteDataFilesSparkAction)
+ checkAndApplyStrategy(action, strategy, sortOrderString, table.schema());
}
- action = checkAndApplyFilter(action, where, tableIdent);
+ action = (RewriteDataFilesSparkAction) checkAndApplyFilter(action, where, tableIdent);
RewriteDataFiles.Result result = action.execute();
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index bd6d5abcd0..6b8244a331 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -1105,4 +1106,209 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
private List<Object[]> currentData(String table) {
return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, c3").collectAsList());
}
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranch() {
+ createTable();
+ insertData(10);
+
+ String branchName = "testBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ List<Object[]> expectedRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // Verify branch data is preserved after compaction
+ List<Object[]> actualRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+
+ // Verify branch snapshot changed
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesToNullBranchFails() {
+ createTable();
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThatThrownBy(() -> SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
+ .as("Invalid branch")
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid branch name: null");
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranchWithFilter() {
+ createPartitionTable();
+ insertData(10);
+
+ String branchName = "filteredBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ List<Object[]> expectedRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch with filter (select only partition c2 = 'bar')
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', where => 'c2 = \"bar\"')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 5 data files from single matching partition "
+ + "(containing c2 = bar) and add 1 data file",
+ row(5, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // Verify branch data is preserved after compaction
+ List<Object[]> actualRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+
+ // Verify branch snapshot changed after rewrite
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testBranchCompactionDoesNotAffectMain() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ // Create branch from current main state
+ String branchName = "compactionBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ // Add more data to MAIN to make it diverge from branch
+ insertData(tableName, 10);
+
+ // Refresh to get new main snapshot after divergence
+ table.refresh();
+ long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+ List<Object[]> expectedMainRecords = currentData();
+
+ // Get branch data before compaction
+ List<Object[]> expectedBranchRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+
+ long branchSnapshotBeforeCompaction = table.refs().get(branchName).snapshotId();
+
+ // Verify that branch and main have diverged
+ assertThat(branchSnapshotBeforeCompaction)
+ .as("Branch and main should have different snapshots")
+ .isNotEqualTo(mainSnapshotAfterDivergence);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+ catalogName, tableName, branchName);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ table.refresh();
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot ID must remain unchanged after branch compaction")
+ .isEqualTo(mainSnapshotAfterDivergence);
+
+ // Verify main data unchanged
+ List<Object[]> actualMainRecords = currentData();
+ assertEquals(
+ "Main data after compaction should not change", expectedMainRecords, actualMainRecords);
+
+ // Verify branch data unchanged
+ List<Object[]> actualBranchRecords =
+ rowsToJava(
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
+ .collectAsList());
+ assertEquals(
+ "Branch data after compaction should not change",
+ expectedBranchRecords,
+ actualBranchRecords);
+
+ // Verify branch snapshot changed
+ long branchSnapshotAfterCompaction = table.refs().get(branchName).snapshotId();
+ assertThat(branchSnapshotAfterCompaction)
+ .as("Branch snapshot must be updated after compaction")
+ .isNotEqualTo(branchSnapshotBeforeCompaction);
+
+ // Verify the new branch snapshot is a child of the previous branch snapshot
+ assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
+ .as("New branch snapshot must be a child of the previous branch snapshot")
+ .isEqualTo(branchSnapshotBeforeCompaction);
+ }
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 61d1fdfe60..3415b6a551 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
+ private String branch = SnapshotRef.MAIN_BRANCH;
private BinPackRewriteFilePlanner planner = null;
private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> runner = null;
@@ -157,13 +159,24 @@ public class RewriteDataFilesSparkAction
return this;
}
+ public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+ Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
+ this.branch = targetBranch;
+ return this;
+ }
+
@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
return EMPTY_RESULT;
}
- long startingSnapshotId = table.currentSnapshot().snapshotId();
+ Preconditions.checkArgument(
+ table.snapshot(branch) != null,
+ "Cannot rewrite data files for branch %s: branch does not exist",
+ branch);
+
+ long startingSnapshotId = table.snapshot(branch).snapshotId();
init(startingSnapshotId);
@@ -230,7 +243,7 @@ public class RewriteDataFilesSparkAction
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
return new RewriteDataFilesCommitManager(
- table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+ table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), branch);
}
private Builder doExecute(
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 3e4f9ea587..5ab07f4d0a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -31,6 +32,7 @@ import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -62,10 +64,12 @@ class RewriteDataFilesProcedure extends BaseProcedure {
optionalInParameter("options", STRING_MAP);
private static final ProcedureParameter WHERE_PARAM =
optionalInParameter("where", DataTypes.StringType);
+ private static final ProcedureParameter BRANCH_PARAM =
+ optionalInParameter("branch", DataTypes.StringType);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
- TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, WHERE_PARAM
+ TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, WHERE_PARAM, BRANCH_PARAM
};
// counts are not nullable since the action result is never null
@@ -114,11 +118,21 @@ class RewriteDataFilesProcedure extends BaseProcedure {
String sortOrderString = input.asString(SORT_ORDER_PARAM, null);
Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
String where = input.asString(WHERE_PARAM, null);
+ // Determine target branch: explicit parameter > table branch > main branch
+ String branchParam = input.asString(BRANCH_PARAM, null);
+ if (branchParam == null) {
+ branchParam = loadSparkTable(tableIdent).branch();
+ if (branchParam == null) {
+ branchParam = SnapshotRef.MAIN_BRANCH;
+ }
+ }
+ String branch = branchParam;
return modifyIcebergTable(
tableIdent,
table -> {
- RewriteDataFiles action = actions().rewriteDataFiles(table).options(options);
+ RewriteDataFilesSparkAction action =
+ actions().rewriteDataFiles(table).options(options).toBranch(branch);
if (strategy != null || sortOrderString != null) {
action = checkAndApplyStrategy(action, strategy, sortOrderString, table.schema());
@@ -132,8 +146,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
});
}
- private RewriteDataFiles checkAndApplyFilter(
- RewriteDataFiles action, String where, Identifier ident) {
+ private RewriteDataFilesSparkAction checkAndApplyFilter(
+ RewriteDataFilesSparkAction action, String where, Identifier ident) {
if (where != null) {
Expression expression = filterExpression(ident, where);
return action.filter(expression);
@@ -141,8 +155,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
return action;
}
- private RewriteDataFiles checkAndApplyStrategy(
- RewriteDataFiles action, String strategy, String sortOrderString, Schema schema) {
+ private RewriteDataFilesSparkAction checkAndApplyStrategy(
+ RewriteDataFilesSparkAction action, String strategy, String sortOrderString, Schema schema) {
List<Zorder> zOrderTerms = Lists.newArrayList();
List<ExtendedParser.RawOrderField> sortOrderFields = Lists.newArrayList();
if (sortOrderString != null) {
@@ -179,13 +193,13 @@ class RewriteDataFilesProcedure extends BaseProcedure {
}
}
if (strategy.equalsIgnoreCase("binpack")) {
- RewriteDataFiles rewriteDataFiles = action.binPack();
+ RewriteDataFilesSparkAction binPackAction = action.binPack();
if (sortOrderString != null) {
// calling below method to throw the error as user has set both binpack strategy and sort
// order
- return rewriteDataFiles.sort(buildSortOrder(sortOrderFields, schema));
+ return binPackAction.sort(buildSortOrder(sortOrderFields, schema));
}
- return rewriteDataFiles;
+ return binPackAction;
} else {
throw new IllegalArgumentException(
"unsupported strategy: " + strategy + ". Only binpack or sort is
supported");