This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 15485f5523 Spark: Backport adding branch support to rewrite_data_files 
procedure (#15067)
15485f5523 is described below

commit 15485f5523d08aae2a503c143c51b6df2debb655
Author: Harsh Sharma <[email protected]>
AuthorDate: Wed Jan 21 13:31:01 2026 +0530

    Spark: Backport adding branch support to rewrite_data_files procedure 
(#15067)
    
    backports #14964
---
 .../extensions/TestRewriteDataFilesProcedure.java  | 207 +++++++++++++++++++++
 .../spark/actions/RewriteDataFilesSparkAction.java |  17 +-
 .../procedures/RewriteDataFilesProcedure.java      |  24 ++-
 .../extensions/TestRewriteDataFilesProcedure.java  | 206 ++++++++++++++++++++
 .../spark/actions/RewriteDataFilesSparkAction.java |  17 +-
 .../procedures/RewriteDataFilesProcedure.java      |  24 ++-
 .../extensions/TestRewriteDataFilesProcedure.java  | 206 ++++++++++++++++++++
 .../spark/actions/RewriteDataFilesSparkAction.java |  17 +-
 .../procedures/RewriteDataFilesProcedure.java      |  32 +++-
 9 files changed, 727 insertions(+), 23 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d011ad0628..a2a693c56b 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -45,6 +46,7 @@ import org.apache.iceberg.spark.ExtendedParser;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -1056,4 +1058,209 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
   private List<Object[]> currentData(String table) {
     return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, 
c3").collectAsList());
   }
+
+  @TestTemplate
+  public void testRewriteDataFilesOnBranch() {
+    createTable();
+    insertData(10);
+
+    String branchName = "testBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    List<Object[]> expectedRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    // Get snapshot IDs before rewrite
+    Table table = validationCatalog.loadTable(tableIdent);
+    long mainSnapshotId = table.currentSnapshot().snapshotId();
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+    // Call rewrite_data_files on the branch
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data file",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // Verify branch data is preserved after compaction
+    List<Object[]> actualRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    // Verify branch snapshot changed
+    table.refresh();
+    assertThat(table.refs().get(branchName).snapshotId())
+        .as("Branch snapshot should be updated when files are rewritten")
+        .isNotEqualTo(branchSnapshotId);
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot should remain unchanged")
+        .isEqualTo(mainSnapshotId);
+  }
+
+  @TestTemplate
+  public void testRewriteDataFilesToNullBranchFails() {
+    createTable();
+    insertData(10);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    assertThatThrownBy(() -> 
SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
+        .as("Invalid branch")
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid branch name: null");
+  }
+
+  @TestTemplate
+  public void testRewriteDataFilesOnBranchWithFilter() {
+    createPartitionTable();
+    insertData(10);
+
+    String branchName = "filteredBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    List<Object[]> expectedRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    // Get snapshot IDs before rewrite
+    Table table = validationCatalog.loadTable(tableIdent);
+    long mainSnapshotId = table.currentSnapshot().snapshotId();
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+    // Call rewrite_data_files on the branch with filter (select only 
partition c2 = 'bar')
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', 
where => 'c2 = \"bar\"')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 5 data files from single matching partition"
+            + "(containing c2 = bar) and add 1 data file",
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // Verify branch data is preserved after compaction
+    List<Object[]> actualRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    // Verify branch snapshot changed after rewrite
+    table.refresh();
+    assertThat(table.refs().get(branchName).snapshotId())
+        .as("Branch snapshot should be updated when files are rewritten")
+        .isNotEqualTo(branchSnapshotId);
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot should remain unchanged")
+        .isEqualTo(mainSnapshotId);
+  }
+
+  @TestTemplate
+  public void testBranchCompactionDoesNotAffectMain() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // Create branch from current main state
+    String branchName = "compactionBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    // Add more data to MAIN to make it diverge from branch
+    insertData(tableName, 10);
+
+    // Refresh to get new main snapshot after divergence
+    table.refresh();
+    long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+    List<Object[]> expectedMainRecords = currentData();
+
+    // Get branch data before compaction
+    List<Object[]> expectedBranchRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    long branchSnapshotBeforeCompaction = 
table.refs().get(branchName).snapshotId();
+
+    // Verify that branch and main have diverged
+    assertThat(branchSnapshotBeforeCompaction)
+        .as("Branch and main should have different snapshots")
+        .isNotEqualTo(mainSnapshotAfterDivergence);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data file",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    table.refresh();
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot ID must remain unchanged after branch compaction")
+        .isEqualTo(mainSnapshotAfterDivergence);
+
+    // Verify main data unchanged
+    List<Object[]> actualMainRecords = currentData();
+    assertEquals(
+        "Main data after compaction should not change", expectedMainRecords, 
actualMainRecords);
+
+    // Verify branch data unchanged
+    List<Object[]> actualBranchRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals(
+        "Branch data after compaction should not change",
+        expectedBranchRecords,
+        actualBranchRecords);
+
+    // Verify branch snapshot changed
+    long branchSnapshotAfterCompaction = 
table.refs().get(branchName).snapshotId();
+    assertThat(branchSnapshotAfterCompaction)
+        .as("Branch snapshot must be updated after compaction")
+        .isNotEqualTo(branchSnapshotBeforeCompaction);
+
+    // Verify the new branch snapshot is a child of the previous branch 
snapshot
+    assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
+        .as("New branch snapshot must be a child of the previous branch 
snapshot")
+        .isEqualTo(branchSnapshotBeforeCompaction);
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a42ce0ecf7..2f5cc25074 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
   private boolean removeDanglingDeletes;
   private boolean useStartingSequenceNumber;
   private boolean caseSensitive;
+  private String branch = SnapshotRef.MAIN_BRANCH;
   private BinPackRewriteFilePlanner planner = null;
   private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, 
RewriteFileGroup> runner = null;
 
@@ -157,13 +159,24 @@ public class RewriteDataFilesSparkAction
     return this;
   }
 
+  public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+    Preconditions.checkArgument(targetBranch != null, "Invalid branch name: 
null");
+    this.branch = targetBranch;
+    return this;
+  }
+
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
       return EMPTY_RESULT;
     }
 
-    long startingSnapshotId = table.currentSnapshot().snapshotId();
+    Preconditions.checkArgument(
+        table.snapshot(branch) != null,
+        "Cannot rewrite data files for branch %s: branch does not exist",
+        branch);
+
+    long startingSnapshotId = table.snapshot(branch).snapshotId();
 
     init(startingSnapshotId);
 
@@ -230,7 +243,7 @@ public class RewriteDataFilesSparkAction
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
     return new RewriteDataFilesCommitManager(
-        table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+        table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), 
branch);
   }
 
   private Builder doExecute(
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index f1958a5a17..60ed7b84d4 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.procedures;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -30,6 +31,7 @@ import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -57,10 +59,12 @@ class RewriteDataFilesProcedure extends BaseProcedure {
       ProcedureParameter.optional("options", STRING_MAP);
   private static final ProcedureParameter WHERE_PARAM =
       ProcedureParameter.optional("where", DataTypes.StringType);
+  private static final ProcedureParameter BRANCH_PARAM =
+      ProcedureParameter.optional("branch", DataTypes.StringType);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, 
WHERE_PARAM
+        TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, 
WHERE_PARAM, BRANCH_PARAM
       };
 
   // counts are not nullable since the action result is never null
@@ -109,17 +113,29 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     String sortOrderString = input.asString(SORT_ORDER_PARAM, null);
     Map<String, String> options = input.asStringMap(OPTIONS_PARAM, 
ImmutableMap.of());
     String where = input.asString(WHERE_PARAM, null);
+    String branchParam = input.asString(BRANCH_PARAM, null);
+    if (branchParam == null) {
+      branchParam = loadSparkTable(tableIdent).branch();
+      if (branchParam == null) {
+        branchParam = SnapshotRef.MAIN_BRANCH;
+      }
+    }
+    String branch = branchParam;
 
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          RewriteDataFiles action = 
actions().rewriteDataFiles(table).options(options);
+          RewriteDataFilesSparkAction action =
+              (RewriteDataFilesSparkAction)
+                  
actions().rewriteDataFiles(table).options(options).toBranch(branch);
 
           if (strategy != null || sortOrderString != null) {
-            action = checkAndApplyStrategy(action, strategy, sortOrderString, 
table.schema());
+            action =
+                (RewriteDataFilesSparkAction)
+                    checkAndApplyStrategy(action, strategy, sortOrderString, 
table.schema());
           }
 
-          action = checkAndApplyFilter(action, where, tableIdent);
+          action = (RewriteDataFilesSparkAction) checkAndApplyFilter(action, 
where, tableIdent);
 
           RewriteDataFiles.Result result = action.execute();
 
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 5c3093e707..ee727bd139 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.spark.ExtendedParser;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -1107,4 +1108,209 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
   private List<Object[]> currentData(String table) {
     return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, 
c3").collectAsList());
   }
+
+  @TestTemplate
+  public void testRewriteDataFilesOnBranch() {
+    createTable();
+    insertData(10);
+
+    String branchName = "testBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    List<Object[]> expectedRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    // Get snapshot IDs before rewrite
+    Table table = validationCatalog.loadTable(tableIdent);
+    long mainSnapshotId = table.currentSnapshot().snapshotId();
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+    // Call rewrite_data_files on the branch
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data file",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // Verify branch data is preserved after compaction
+    List<Object[]> actualRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    // Verify branch snapshot changed
+    table.refresh();
+    assertThat(table.refs().get(branchName).snapshotId())
+        .as("Branch snapshot should be updated when files are rewritten")
+        .isNotEqualTo(branchSnapshotId);
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot should remain unchanged")
+        .isEqualTo(mainSnapshotId);
+  }
+
+  @TestTemplate
+  public void testRewriteDataFilesToNullBranchFails() {
+    createTable();
+    insertData(10);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    assertThatThrownBy(() -> 
SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
+        .as("Invalid branch")
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid branch name: null");
+  }
+
+  @TestTemplate
+  public void testRewriteDataFilesOnBranchWithFilter() {
+    createPartitionTable();
+    insertData(10);
+
+    String branchName = "filteredBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    List<Object[]> expectedRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    // Get snapshot IDs before rewrite
+    Table table = validationCatalog.loadTable(tableIdent);
+    long mainSnapshotId = table.currentSnapshot().snapshotId();
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+    // Call rewrite_data_files on the branch with filter (select only 
partition c2 = 'bar')
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', 
where => 'c2 = \"bar\"')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 5 data files from single matching partition"
+            + "(containing c2 = bar) and add 1 data file",
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // Verify branch data is preserved after compaction
+    List<Object[]> actualRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    // Verify branch snapshot changed after rewrite
+    table.refresh();
+    assertThat(table.refs().get(branchName).snapshotId())
+        .as("Branch snapshot should be updated when files are rewritten")
+        .isNotEqualTo(branchSnapshotId);
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot should remain unchanged")
+        .isEqualTo(mainSnapshotId);
+  }
+
+  @TestTemplate
+  public void testBranchCompactionDoesNotAffectMain() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // Create branch from current main state
+    String branchName = "compactionBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    // Add more data to MAIN to make it diverge from branch
+    insertData(tableName, 10);
+
+    // Refresh to get new main snapshot after divergence
+    table.refresh();
+    long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+    List<Object[]> expectedMainRecords = currentData();
+
+    // Get branch data before compaction
+    List<Object[]> expectedBranchRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    long branchSnapshotBeforeCompaction = 
table.refs().get(branchName).snapshotId();
+
+    // Verify that branch and main have diverged
+    assertThat(branchSnapshotBeforeCompaction)
+        .as("Branch and main should have different snapshots")
+        .isNotEqualTo(mainSnapshotAfterDivergence);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data file",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    table.refresh();
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot ID must remain unchanged after branch compaction")
+        .isEqualTo(mainSnapshotAfterDivergence);
+
+    // Verify main data unchanged
+    List<Object[]> actualMainRecords = currentData();
+    assertEquals(
+        "Main data after compaction should not change", expectedMainRecords, 
actualMainRecords);
+
+    // Verify branch data unchanged
+    List<Object[]> actualBranchRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals(
+        "Branch data after compaction should not change",
+        expectedBranchRecords,
+        actualBranchRecords);
+
+    // Verify branch snapshot changed
+    long branchSnapshotAfterCompaction = 
table.refs().get(branchName).snapshotId();
+    assertThat(branchSnapshotAfterCompaction)
+        .as("Branch snapshot must be updated after compaction")
+        .isNotEqualTo(branchSnapshotBeforeCompaction);
+
+    // Verify the new branch snapshot is a child of the previous branch 
snapshot
+    assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
+        .as("New branch snapshot must be a child of the previous branch 
snapshot")
+        .isEqualTo(branchSnapshotBeforeCompaction);
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a42ce0ecf7..2f5cc25074 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
   private boolean removeDanglingDeletes;
   private boolean useStartingSequenceNumber;
   private boolean caseSensitive;
+  private String branch = SnapshotRef.MAIN_BRANCH;
   private BinPackRewriteFilePlanner planner = null;
   private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, 
RewriteFileGroup> runner = null;
 
@@ -157,13 +159,24 @@ public class RewriteDataFilesSparkAction
     return this;
   }
 
+  public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+    Preconditions.checkArgument(targetBranch != null, "Invalid branch name: 
null");
+    this.branch = targetBranch;
+    return this;
+  }
+
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
       return EMPTY_RESULT;
     }
 
-    long startingSnapshotId = table.currentSnapshot().snapshotId();
+    Preconditions.checkArgument(
+        table.snapshot(branch) != null,
+        "Cannot rewrite data files for branch %s: branch does not exist",
+        branch);
+
+    long startingSnapshotId = table.snapshot(branch).snapshotId();
 
     init(startingSnapshotId);
 
@@ -230,7 +243,7 @@ public class RewriteDataFilesSparkAction
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
     return new RewriteDataFilesCommitManager(
-        table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+        table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), 
branch);
   }
 
   private Builder doExecute(
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index f1958a5a17..60ed7b84d4 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.procedures;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -30,6 +31,7 @@ import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -57,10 +59,12 @@ class RewriteDataFilesProcedure extends BaseProcedure {
       ProcedureParameter.optional("options", STRING_MAP);
   private static final ProcedureParameter WHERE_PARAM =
       ProcedureParameter.optional("where", DataTypes.StringType);
+  private static final ProcedureParameter BRANCH_PARAM =
+      ProcedureParameter.optional("branch", DataTypes.StringType);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, 
WHERE_PARAM
+        TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, 
WHERE_PARAM, BRANCH_PARAM
       };
 
   // counts are not nullable since the action result is never null
@@ -109,17 +113,29 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     String sortOrderString = input.asString(SORT_ORDER_PARAM, null);
     Map<String, String> options = input.asStringMap(OPTIONS_PARAM, 
ImmutableMap.of());
     String where = input.asString(WHERE_PARAM, null);
+    String branchParam = input.asString(BRANCH_PARAM, null);
+    if (branchParam == null) {
+      branchParam = loadSparkTable(tableIdent).branch();
+      if (branchParam == null) {
+        branchParam = SnapshotRef.MAIN_BRANCH;
+      }
+    }
+    String branch = branchParam;
 
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          RewriteDataFiles action = 
actions().rewriteDataFiles(table).options(options);
+          RewriteDataFilesSparkAction action =
+              (RewriteDataFilesSparkAction)
+                  
actions().rewriteDataFiles(table).options(options).toBranch(branch);
 
           if (strategy != null || sortOrderString != null) {
-            action = checkAndApplyStrategy(action, strategy, sortOrderString, 
table.schema());
+            action =
+                (RewriteDataFilesSparkAction)
+                    checkAndApplyStrategy(action, strategy, sortOrderString, 
table.schema());
           }
 
-          action = checkAndApplyFilter(action, where, tableIdent);
+          action = (RewriteDataFilesSparkAction) checkAndApplyFilter(action, 
where, tableIdent);
 
           RewriteDataFiles.Result result = action.execute();
 
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index bd6d5abcd0..6b8244a331 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.spark.ExtendedParser;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -1105,4 +1106,209 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
   private List<Object[]> currentData(String table) {
     return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, 
c3").collectAsList());
   }
+
+  @TestTemplate
+  public void testRewriteDataFilesOnBranch() {
+    createTable();
+    insertData(10);
+
+    String branchName = "testBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    List<Object[]> expectedRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    // Get snapshot IDs before rewrite
+    Table table = validationCatalog.loadTable(tableIdent);
+    long mainSnapshotId = table.currentSnapshot().snapshotId();
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+    // Call rewrite_data_files on the branch
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data file",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // Verify branch data is preserved after compaction
+    List<Object[]> actualRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    // Verify branch snapshot changed
+    table.refresh();
+    assertThat(table.refs().get(branchName).snapshotId())
+        .as("Branch snapshot should be updated when files are rewritten")
+        .isNotEqualTo(branchSnapshotId);
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot should remain unchanged")
+        .isEqualTo(mainSnapshotId);
+  }
+
+  @TestTemplate
+  public void testRewriteDataFilesToNullBranchFails() {
+    createTable();
+    insertData(10);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    assertThatThrownBy(() -> 
SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
+        .as("Invalid branch")
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid branch name: null");
+  }
+
+  @TestTemplate
+  public void testRewriteDataFilesOnBranchWithFilter() {
+    createPartitionTable();
+    insertData(10);
+
+    String branchName = "filteredBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    List<Object[]> expectedRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    // Get snapshot IDs before rewrite
+    Table table = validationCatalog.loadTable(tableIdent);
+    long mainSnapshotId = table.currentSnapshot().snapshotId();
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+    // Call rewrite_data_files on the branch with filter (select only partition c2 = 'bar')
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', 
where => 'c2 = \"bar\"')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 5 data files from single matching partition"
+            + "(containing c2 = bar) and add 1 data file",
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // Verify branch data is preserved after compaction
+    List<Object[]> actualRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+
+    // Verify branch snapshot changed after rewrite
+    table.refresh();
+    assertThat(table.refs().get(branchName).snapshotId())
+        .as("Branch snapshot should be updated when files are rewritten")
+        .isNotEqualTo(branchSnapshotId);
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot should remain unchanged")
+        .isEqualTo(mainSnapshotId);
+  }
+
+  @TestTemplate
+  public void testBranchCompactionDoesNotAffectMain() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // Create branch from current main state
+    String branchName = "compactionBranch";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+    // Add more data to MAIN to make it diverge from branch
+    insertData(tableName, 10);
+
+    // Refresh to get new main snapshot after divergence
+    table.refresh();
+    long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+    List<Object[]> expectedMainRecords = currentData();
+
+    // Get branch data before compaction
+    List<Object[]> expectedBranchRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+
+    long branchSnapshotBeforeCompaction = 
table.refs().get(branchName).snapshotId();
+
+    // Verify that branch and main have diverged
+    assertThat(branchSnapshotBeforeCompaction)
+        .as("Branch and main should have different snapshots")
+        .isNotEqualTo(mainSnapshotAfterDivergence);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
+            catalogName, tableName, branchName);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data file",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    table.refresh();
+
+    // Verify main snapshot unchanged
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("Main snapshot ID must remain unchanged after branch compaction")
+        .isEqualTo(mainSnapshotAfterDivergence);
+
+    // Verify main data unchanged
+    List<Object[]> actualMainRecords = currentData();
+    assertEquals(
+        "Main data after compaction should not change", expectedMainRecords, 
actualMainRecords);
+
+    // Verify branch data unchanged
+    List<Object[]> actualBranchRecords =
+        rowsToJava(
+            spark
+                .sql(
+                    String.format(
+                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", 
tableName, branchName))
+                .collectAsList());
+    assertEquals(
+        "Branch data after compaction should not change",
+        expectedBranchRecords,
+        actualBranchRecords);
+
+    // Verify branch snapshot changed
+    long branchSnapshotAfterCompaction = 
table.refs().get(branchName).snapshotId();
+    assertThat(branchSnapshotAfterCompaction)
+        .as("Branch snapshot must be updated after compaction")
+        .isNotEqualTo(branchSnapshotBeforeCompaction);
+
+    // Verify the new branch snapshot is a child of the previous branch snapshot
+    assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
+        .as("New branch snapshot must be a child of the previous branch 
snapshot")
+        .isEqualTo(branchSnapshotBeforeCompaction);
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 61d1fdfe60..3415b6a551 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
   private boolean removeDanglingDeletes;
   private boolean useStartingSequenceNumber;
   private boolean caseSensitive;
+  private String branch = SnapshotRef.MAIN_BRANCH;
   private BinPackRewriteFilePlanner planner = null;
   private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, 
RewriteFileGroup> runner = null;
 
@@ -157,13 +159,24 @@ public class RewriteDataFilesSparkAction
     return this;
   }
 
+  public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+    Preconditions.checkArgument(targetBranch != null, "Invalid branch name: 
null");
+    this.branch = targetBranch;
+    return this;
+  }
+
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
       return EMPTY_RESULT;
     }
 
-    long startingSnapshotId = table.currentSnapshot().snapshotId();
+    Preconditions.checkArgument(
+        table.snapshot(branch) != null,
+        "Cannot rewrite data files for branch %s: branch does not exist",
+        branch);
+
+    long startingSnapshotId = table.snapshot(branch).snapshotId();
 
     init(startingSnapshotId);
 
@@ -230,7 +243,7 @@ public class RewriteDataFilesSparkAction
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
     return new RewriteDataFilesCommitManager(
-        table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+        table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), 
branch);
   }
 
   private Builder doExecute(
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 3e4f9ea587..5ab07f4d0a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -31,6 +32,7 @@ import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ExtendedParser;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -62,10 +64,12 @@ class RewriteDataFilesProcedure extends BaseProcedure {
       optionalInParameter("options", STRING_MAP);
   private static final ProcedureParameter WHERE_PARAM =
       optionalInParameter("where", DataTypes.StringType);
+  private static final ProcedureParameter BRANCH_PARAM =
+      optionalInParameter("branch", DataTypes.StringType);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, 
WHERE_PARAM
+        TABLE_PARAM, STRATEGY_PARAM, SORT_ORDER_PARAM, OPTIONS_PARAM, 
WHERE_PARAM, BRANCH_PARAM
       };
 
   // counts are not nullable since the action result is never null
@@ -114,11 +118,21 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     String sortOrderString = input.asString(SORT_ORDER_PARAM, null);
     Map<String, String> options = input.asStringMap(OPTIONS_PARAM, 
ImmutableMap.of());
     String where = input.asString(WHERE_PARAM, null);
+    // Determine target branch: explicit parameter > table branch > main branch
+    String branchParam = input.asString(BRANCH_PARAM, null);
+    if (branchParam == null) {
+      branchParam = loadSparkTable(tableIdent).branch();
+      if (branchParam == null) {
+        branchParam = SnapshotRef.MAIN_BRANCH;
+      }
+    }
+    String branch = branchParam;
 
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          RewriteDataFiles action = 
actions().rewriteDataFiles(table).options(options);
+          RewriteDataFilesSparkAction action =
+              
actions().rewriteDataFiles(table).options(options).toBranch(branch);
 
           if (strategy != null || sortOrderString != null) {
             action = checkAndApplyStrategy(action, strategy, sortOrderString, 
table.schema());
@@ -132,8 +146,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
         });
   }
 
-  private RewriteDataFiles checkAndApplyFilter(
-      RewriteDataFiles action, String where, Identifier ident) {
+  private RewriteDataFilesSparkAction checkAndApplyFilter(
+      RewriteDataFilesSparkAction action, String where, Identifier ident) {
     if (where != null) {
       Expression expression = filterExpression(ident, where);
       return action.filter(expression);
@@ -141,8 +155,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     return action;
   }
 
-  private RewriteDataFiles checkAndApplyStrategy(
-      RewriteDataFiles action, String strategy, String sortOrderString, Schema 
schema) {
+  private RewriteDataFilesSparkAction checkAndApplyStrategy(
+      RewriteDataFilesSparkAction action, String strategy, String 
sortOrderString, Schema schema) {
     List<Zorder> zOrderTerms = Lists.newArrayList();
     List<ExtendedParser.RawOrderField> sortOrderFields = Lists.newArrayList();
     if (sortOrderString != null) {
@@ -179,13 +193,13 @@ class RewriteDataFilesProcedure extends BaseProcedure {
       }
     }
     if (strategy.equalsIgnoreCase("binpack")) {
-      RewriteDataFiles rewriteDataFiles = action.binPack();
+      RewriteDataFilesSparkAction binPackAction = action.binPack();
       if (sortOrderString != null) {
         // calling below method to throw the error as user has set both binpack strategy and sort
         // order
-        return rewriteDataFiles.sort(buildSortOrder(sortOrderFields, schema));
+        return binPackAction.sort(buildSortOrder(sortOrderFields, schema));
       }
-      return rewriteDataFiles;
+      return binPackAction;
     } else {
       throw new IllegalArgumentException(
           "unsupported strategy: " + strategy + ". Only binpack or sort is 
supported");


Reply via email to