This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 386f0ea6ab Flink: Backport: Add branch support to RewriteDataFiles maintenance task (#15672) (#15690)
386f0ea6ab is described below

commit 386f0ea6abe6cd988178ee922b7cceaaf0d2d46b
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Mar 19 23:49:42 2026 +0100

    Flink: Backport: Add branch support to RewriteDataFiles maintenance task (#15672) (#15690)
---
 .../flink/maintenance/api/RewriteDataFiles.java    | 20 +++++++++-
 .../operator/DataFileRewriteCommitter.java         | 19 +++++++--
 .../operator/DataFileRewritePlanner.java           | 24 +++++++++---
 .../operator/DataFileRewriteRunner.java            |  2 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java |  5 ++-
 .../maintenance/api/TestRewriteDataFiles.java      | 35 +++++++++++++++++
 .../flink/maintenance/operator/RewriteUtil.java    |  4 +-
 .../operator/TestDataFileRewriteCommitter.java     |  4 +-
 .../operator/TestDataFileRewritePlanner.java       | 45 +++++++++++++++++++++-
 .../operator/TestDataFileRewriteRunner.java        |  4 +-
 .../flink/maintenance/api/RewriteDataFiles.java    | 20 +++++++++-
 .../operator/DataFileRewriteCommitter.java         | 19 +++++++--
 .../operator/DataFileRewritePlanner.java           | 24 +++++++++---
 .../operator/DataFileRewriteRunner.java            |  2 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java |  5 ++-
 .../maintenance/api/TestRewriteDataFiles.java      | 35 +++++++++++++++++
 .../flink/maintenance/operator/RewriteUtil.java    |  4 +-
 .../operator/TestDataFileRewriteCommitter.java     |  4 +-
 .../operator/TestDataFileRewritePlanner.java       | 45 +++++++++++++++++++++-
 .../operator/TestDataFileRewriteRunner.java        |  4 +-
 20 files changed, 288 insertions(+), 36 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index 3b64a79eee..9aeee75b14 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
 import org.apache.iceberg.expressions.Expression;
@@ -59,6 +60,7 @@ public class RewriteDataFiles {
     private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
     private long maxRewriteBytes = Long.MAX_VALUE;
     private Expression filter = Expressions.alwaysTrue();
+    private String branch = SnapshotRef.MAIN_BRANCH;
 
     @Override
     String maintenanceTaskName() {
@@ -218,6 +220,18 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Sets the branch to compact. When set, the planner reads from the branch's snapshot and
+     * commits are made to this branch.
+     *
+     * @param newBranch the branch name
+     * @return this for method chaining
+     */
+    public Builder branch(String newBranch) {
+      this.branch = newBranch;
+      return this;
+    }
+
     /**
      * Configures the properties for the rewriter.
      *
@@ -262,7 +276,8 @@ public class RewriteDataFiles {
                       partialProgressEnabled ? partialProgressMaxCommits : 1,
                       maxRewriteBytes,
                       rewriteOptions,
-                      filter))
+                      filter,
+                      branch))
               .name(operatorName(PLANNER_TASK_NAME))
               .uid(PLANNER_TASK_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
@@ -282,7 +297,8 @@ public class RewriteDataFiles {
               .transform(
                   operatorName(COMMIT_TASK_NAME),
                   TypeInformation.of(Trigger.class),
-                  new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader()))
+                  new DataFileRewriteCommitter(
+                      tableName(), taskName(), index(), tableLoader(), branch))
               .uid(COMMIT_TASK_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
               .forceNonParallel();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
index 135d3d9b42..1125e5d9b6 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
@@ -29,12 +29,14 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
 import org.apache.iceberg.actions.RewriteFileGroup;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
   private final String taskName;
   private final int taskIndex;
   private final TableLoader tableLoader;
+  private final String branch;
 
   private transient Table table;
   private transient CommitService commitService;
@@ -62,15 +65,17 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
   private transient Counter removedDataFileSizeCounter;
 
   public DataFileRewriteCommitter(
-      String tableName, String taskName, int taskIndex, TableLoader tableLoader) {
+      String tableName, String taskName, int taskIndex, TableLoader tableLoader, String branch) {
     Preconditions.checkNotNull(tableName, "Table name should no be null");
     Preconditions.checkNotNull(taskName, "Task name should no be null");
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(branch, "Branch should not be null");
 
     this.tableName = tableName;
     this.taskName = taskName;
     this.taskIndex = taskIndex;
     this.tableLoader = tableLoader;
+    this.branch = branch;
   }
 
   @Override
@@ -103,7 +108,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
 
         FlinkRewriteDataFilesCommitManager commitManager =
             new FlinkRewriteDataFilesCommitManager(
-                table, executedGroup.snapshotId(), streamRecord.getTimestamp());
+                table, executedGroup.snapshotId(), streamRecord.getTimestamp(), branch);
         this.commitService = commitManager.service(executedGroup.groupsPerCommit());
         commitService.start();
       }
@@ -164,8 +169,14 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
   private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager {
     private final long timestamp;
 
-    FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) {
-      super(table, startingSnapshotId);
+    FlinkRewriteDataFilesCommitManager(
+        Table table, long startingSnapshotId, long timestamp, String branch) {
+      super(
+          table,
+          startingSnapshotId,
+          RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT,
+          ImmutableMap.of(),
+          branch);
       this.timestamp = timestamp;
     }
 
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 6751caeb28..feb2dd26c8 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Collector;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.FileRewritePlan;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -62,6 +63,7 @@ public class DataFileRewritePlanner
   private final Map<String, String> rewriterOptions;
   private transient Counter errorCounter;
   private final Expression filter;
+  private final String branch;
 
   public DataFileRewritePlanner(
       String tableName,
@@ -71,12 +73,14 @@ public class DataFileRewritePlanner
       int newPartialProgressMaxCommits,
       long maxRewriteBytes,
       Map<String, String> rewriterOptions,
-      Expression filter) {
+      Expression filter,
+      String branch) {
 
     Preconditions.checkNotNull(tableName, "Table name should no be null");
     Preconditions.checkNotNull(taskName, "Task name should no be null");
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
     Preconditions.checkNotNull(rewriterOptions, "Options map should no be null");
+    Preconditions.checkNotNull(branch, "Branch should no be null");
 
     this.tableName = tableName;
     this.taskName = taskName;
@@ -86,6 +90,7 @@ public class DataFileRewritePlanner
     this.maxRewriteBytes = maxRewriteBytes;
     this.rewriterOptions = rewriterOptions;
     this.filter = filter;
+    this.branch = branch;
   }
 
   @Override
@@ -108,7 +113,8 @@ public class DataFileRewritePlanner
     try {
       SerializableTable table =
           (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
-      if (table.currentSnapshot() == null) {
+      Snapshot snapshot = table.snapshot(branch);
+      if (snapshot == null) {
         LOG.info(
             DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
             tableName,
@@ -118,7 +124,8 @@ public class DataFileRewritePlanner
         return;
       }
 
-      BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, filter);
+      BinPackRewriteFilePlanner planner =
+          new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), false);
       planner.init(rewriterOptions);
 
       FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
@@ -164,7 +171,7 @@ public class DataFileRewritePlanner
             taskIndex,
             ctx.timestamp(),
             group);
-        out.collect(new PlannedGroup(table, groupsPerCommit, group));
+        out.collect(new PlannedGroup(table, groupsPerCommit, group, branch));
       }
     } catch (Exception e) {
       LOG.warn(
@@ -189,11 +196,14 @@ public class DataFileRewritePlanner
     private final SerializableTable table;
     private final int groupsPerCommit;
     private final RewriteFileGroup group;
+    private final String branch;
 
-    private PlannedGroup(SerializableTable table, int groupsPerCommit, RewriteFileGroup group) {
+    private PlannedGroup(
+        SerializableTable table, int groupsPerCommit, RewriteFileGroup group, String branch) {
       this.table = table;
       this.groupsPerCommit = groupsPerCommit;
       this.group = group;
+      this.branch = branch;
     }
 
     SerializableTable table() {
@@ -207,5 +217,9 @@ public class DataFileRewritePlanner
     RewriteFileGroup group() {
       return group;
     }
+
+    String branch() {
+      return branch;
+    }
   }
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index 1e8db128e9..f7a53c7fca 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -124,7 +124,7 @@ public class DataFileRewriteRunner
         value.group().setOutputFiles(dataFiles);
         out.collect(
             new ExecutedGroup(
-                value.table().currentSnapshot().snapshotId(),
+                value.table().snapshot(value.branch()).snapshotId(),
                 value.groupsPerCommit(),
                 value.group()));
         if (LOG.isDebugEnabled()) {
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 0d3e4a34d9..b1a58a6d3a 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -769,7 +769,10 @@ public class IcebergSink
       if (flinkWriteConf.compactMode()) {
         RewriteDataFilesConfig rewriteDataFilesConfig =
             flinkMaintenanceConfig.createRewriteDataFilesConfig();
-        maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+        maintenanceTasks.add(
+            RewriteDataFiles.builder()
+                .branch(flinkWriteConf.branch())
+                .config(rewriteDataFilesConfig));
       }
 
       if (flinkWriteConf.expireSnapshotsMode()) {
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 707038c925..bb53b52656 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -529,6 +529,41 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             createRecord(4, "d")));
   }
 
+  @Test
+  void testBranch() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    // Create branch based on above inserts
+    String branchName = "test-branch";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Insert another file on main only (main has 3 files, branch stays at 2)
+    insert(table, 3, "c");
+
+    appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true).branch(branchName));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    table.refresh();
+
+    // Branch should be compacted from 2 files to 1
+    assertThat(
+            table.snapshot(branchName).dataManifests(table.io()).stream()
+                .flatMap(
+                    m ->
+                        StreamSupport.stream(
+                            ManifestFiles.read(m, table.io(), table.specs()).spliterator(), false))
+                .count())
+        .isEqualTo(1);
+    SimpleDataUtil.assertTableRecords(
+        table, ImmutableList.of(createRecord(1, "a"), createRecord(2, "b")), branchName);
+
+    // Main should be untouched with 3 files
+    assertFileNum(table, 3, 0);
+  }
+
   private void appendRewriteDataFiles() {
     appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
   }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 95992ccd97..9b19a50b09 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.TableLoader;
@@ -55,7 +56,8 @@ class RewriteUtil {
                     11,
                     10_000_000L,
                     rewriterOptions,
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
index 9e8f2ec921..cdd40eb2a2 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -225,7 +226,8 @@ class TestDataFileRewriteCommitter extends OperatorTestBase {
             OperatorTestBase.DUMMY_TABLE_NAME,
             OperatorTestBase.DUMMY_TABLE_NAME,
             0,
-            tableLoader()));
+            tableLoader(),
+            SnapshotRef.MAIN_BRANCH));
   }
 
   private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo(
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index cb1a41bb43..16d524f05c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
@@ -106,7 +107,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
                     11,
                     1L,
                     ImmutableMap.of(MIN_INPUT_FILES, "2"),
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       // Cause an exception
@@ -172,7 +174,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
                     11,
                     maxRewriteBytes,
                     ImmutableMap.of(MIN_INPUT_FILES, "2"),
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
@@ -202,6 +205,44 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
     assertThat(planWithMaxFileGroupCount).hasSize(3);
   }
 
+  @Test
+  void testBranch() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    String branchName = "test-branch";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Insert more data on main only
+    insert(table, 3, "c");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewritePlanner(
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    0,
+                    tableLoader(),
+                    11,
+                    10_000_000L,
+                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    Expressions.alwaysTrue(),
+                    branchName))) {
+      testHarness.open();
+
+      trigger(testHarness);
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      List<DataFileRewritePlanner.PlannedGroup> planned = testHarness.extractOutputValues();
+      assertThat(planned).hasSize(1);
+      // Branch has 2 files, main has 3
+      assertThat(planned.get(0).group().fileScanTasks()).hasSize(2);
+      assertThat(planned.get(0).branch()).isEqualTo(branchName);
+    }
+  }
+
   void assertRewriteFileGroup(
       DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
     assertThat(plannedGroup.table().currentSnapshot().snapshotId())
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 4e21c7a956..9202a1df92 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
@@ -308,7 +309,8 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
                         "2",
                         TARGET_FILE_SIZE_BYTES,
                         String.valueOf(targetFileSize)),
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index 3b64a79eee..9aeee75b14 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
 import org.apache.iceberg.expressions.Expression;
@@ -59,6 +60,7 @@ public class RewriteDataFiles {
     private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
     private long maxRewriteBytes = Long.MAX_VALUE;
     private Expression filter = Expressions.alwaysTrue();
+    private String branch = SnapshotRef.MAIN_BRANCH;
 
     @Override
     String maintenanceTaskName() {
@@ -218,6 +220,18 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Sets the branch to compact. When set, the planner reads from the branch's snapshot and
+     * commits are made to this branch.
+     *
+     * @param newBranch the branch name
+     * @return this for method chaining
+     */
+    public Builder branch(String newBranch) {
+      this.branch = newBranch;
+      return this;
+    }
+
     /**
      * Configures the properties for the rewriter.
      *
@@ -262,7 +276,8 @@ public class RewriteDataFiles {
                       partialProgressEnabled ? partialProgressMaxCommits : 1,
                       maxRewriteBytes,
                       rewriteOptions,
-                      filter))
+                      filter,
+                      branch))
               .name(operatorName(PLANNER_TASK_NAME))
               .uid(PLANNER_TASK_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
@@ -282,7 +297,8 @@ public class RewriteDataFiles {
               .transform(
                   operatorName(COMMIT_TASK_NAME),
                   TypeInformation.of(Trigger.class),
-                  new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader()))
+                  new DataFileRewriteCommitter(
+                      tableName(), taskName(), index(), tableLoader(), branch))
               .uid(COMMIT_TASK_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
               .forceNonParallel();
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
index 135d3d9b42..1125e5d9b6 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
@@ -29,12 +29,14 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
 import org.apache.iceberg.actions.RewriteFileGroup;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
   private final String taskName;
   private final int taskIndex;
   private final TableLoader tableLoader;
+  private final String branch;
 
   private transient Table table;
   private transient CommitService commitService;
@@ -62,15 +65,17 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
   private transient Counter removedDataFileSizeCounter;
 
   public DataFileRewriteCommitter(
-      String tableName, String taskName, int taskIndex, TableLoader tableLoader) {
+      String tableName, String taskName, int taskIndex, TableLoader tableLoader, String branch) {
     Preconditions.checkNotNull(tableName, "Table name should no be null");
     Preconditions.checkNotNull(taskName, "Task name should no be null");
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(branch, "Branch should not be null");
 
     this.tableName = tableName;
     this.taskName = taskName;
     this.taskIndex = taskIndex;
     this.tableLoader = tableLoader;
+    this.branch = branch;
   }
 
   @Override
@@ -103,7 +108,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
 
         FlinkRewriteDataFilesCommitManager commitManager =
             new FlinkRewriteDataFilesCommitManager(
-                table, executedGroup.snapshotId(), streamRecord.getTimestamp());
+                table, executedGroup.snapshotId(), streamRecord.getTimestamp(), branch);
         this.commitService = commitManager.service(executedGroup.groupsPerCommit());
         commitService.start();
       }
@@ -164,8 +169,14 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
   private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager {
     private final long timestamp;
 
-    FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) {
-      super(table, startingSnapshotId);
+    FlinkRewriteDataFilesCommitManager(
+        Table table, long startingSnapshotId, long timestamp, String branch) {
+      super(
+          table,
+          startingSnapshotId,
+          RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT,
+          ImmutableMap.of(),
+          branch);
       this.timestamp = timestamp;
     }
 
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index c50060e16a..a9360374df 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Collector;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.FileRewritePlan;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -62,6 +63,7 @@ public class DataFileRewritePlanner
   private final Map<String, String> rewriterOptions;
   private transient Counter errorCounter;
   private final Expression filter;
+  private final String branch;
 
   public DataFileRewritePlanner(
       String tableName,
@@ -71,12 +73,14 @@ public class DataFileRewritePlanner
       int newPartialProgressMaxCommits,
       long maxRewriteBytes,
       Map<String, String> rewriterOptions,
-      Expression filter) {
+      Expression filter,
+      String branch) {
 
     Preconditions.checkNotNull(tableName, "Table name should no be null");
     Preconditions.checkNotNull(taskName, "Task name should no be null");
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
     Preconditions.checkNotNull(rewriterOptions, "Options map should no be null");
+    Preconditions.checkNotNull(branch, "Branch should no be null");
 
     this.tableName = tableName;
     this.taskName = taskName;
@@ -86,6 +90,7 @@ public class DataFileRewritePlanner
     this.maxRewriteBytes = maxRewriteBytes;
     this.rewriterOptions = rewriterOptions;
     this.filter = filter;
+    this.branch = branch;
   }
 
   @Override
@@ -108,7 +113,8 @@ public class DataFileRewritePlanner
     try {
       SerializableTable table =
           (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
-      if (table.currentSnapshot() == null) {
+      Snapshot snapshot = table.snapshot(branch);
+      if (snapshot == null) {
         LOG.info(
             DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an 
empty table",
             tableName,
@@ -118,7 +124,8 @@ public class DataFileRewritePlanner
         return;
       }
 
-      BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, 
filter);
+      BinPackRewriteFilePlanner planner =
+          new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), 
false);
       planner.init(rewriterOptions);
 
       FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, 
RewriteFileGroup>
@@ -164,7 +171,7 @@ public class DataFileRewritePlanner
             taskIndex,
             ctx.timestamp(),
             group);
-        out.collect(new PlannedGroup(table, groupsPerCommit, group));
+        out.collect(new PlannedGroup(table, groupsPerCommit, group, branch));
       }
     } catch (Exception e) {
       LOG.warn(
@@ -189,11 +196,14 @@ public class DataFileRewritePlanner
     private final SerializableTable table;
     private final int groupsPerCommit;
     private final RewriteFileGroup group;
+    private final String branch;
 
-    private PlannedGroup(SerializableTable table, int groupsPerCommit, 
RewriteFileGroup group) {
+    private PlannedGroup(
+        SerializableTable table, int groupsPerCommit, RewriteFileGroup group, 
String branch) {
       this.table = table;
       this.groupsPerCommit = groupsPerCommit;
       this.group = group;
+      this.branch = branch;
     }
 
     SerializableTable table() {
@@ -207,5 +217,9 @@ public class DataFileRewritePlanner
     RewriteFileGroup group() {
       return group;
     }
+
+    String branch() {
+      return branch;
+    }
   }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index 57b0e53d86..6fbfacf9f6 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -124,7 +124,7 @@ public class DataFileRewriteRunner
         value.group().setOutputFiles(dataFiles);
         out.collect(
             new ExecutedGroup(
-                value.table().currentSnapshot().snapshotId(),
+                value.table().snapshot(value.branch()).snapshotId(),
                 value.groupsPerCommit(),
                 value.group()));
         if (LOG.isDebugEnabled()) {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 9f90d8fd35..eaaf4ea6e4 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -769,7 +769,10 @@ public class IcebergSink
       if (flinkWriteConf.compactMode()) {
         RewriteDataFilesConfig rewriteDataFilesConfig =
             flinkMaintenanceConfig.createRewriteDataFilesConfig();
-        
maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+        maintenanceTasks.add(
+            RewriteDataFiles.builder()
+                .branch(flinkWriteConf.branch())
+                .config(rewriteDataFilesConfig));
       }
 
       if (flinkWriteConf.expireSnapshotsMode()) {
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 707038c925..bb53b52656 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -529,6 +529,41 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase 
{
             createRecord(4, "d")));
   }
 
+  @Test
+  void testBranch() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    // Create branch based on above inserts
+    String branchName = "test-branch";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Insert another file on main only (main has 3 files, branch stays at 2)
+    insert(table, 3, "c");
+
+    
appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true).branch(branchName));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    table.refresh();
+
+    // Branch should be compacted from 2 files to 1
+    assertThat(
+            table.snapshot(branchName).dataManifests(table.io()).stream()
+                .flatMap(
+                    m ->
+                        StreamSupport.stream(
+                            ManifestFiles.read(m, table.io(), 
table.specs()).spliterator(), false))
+                .count())
+        .isEqualTo(1);
+    SimpleDataUtil.assertTableRecords(
+        table, ImmutableList.of(createRecord(1, "a"), createRecord(2, "b")), 
branchName);
+
+    // Main should be untouched with 3 files
+    assertFileNum(table, 3, 0);
+  }
+
   private void appendRewriteDataFiles() {
     appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
   }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 95992ccd97..9b19a50b09 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.TableLoader;
@@ -55,7 +56,8 @@ class RewriteUtil {
                     11,
                     10_000_000L,
                     rewriterOptions,
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
index 9e8f2ec921..cdd40eb2a2 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -225,7 +226,8 @@ class TestDataFileRewriteCommitter extends OperatorTestBase 
{
             OperatorTestBase.DUMMY_TABLE_NAME,
             OperatorTestBase.DUMMY_TABLE_NAME,
             0,
-            tableLoader()));
+            tableLoader(),
+            SnapshotRef.MAIN_BRANCH));
   }
 
   private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo(
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index cb1a41bb43..16d524f05c 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
@@ -106,7 +107,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
                     11,
                     1L,
                     ImmutableMap.of(MIN_INPUT_FILES, "2"),
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       // Cause an exception
@@ -172,7 +174,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
                     11,
                     maxRewriteBytes,
                     ImmutableMap.of(MIN_INPUT_FILES, "2"),
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
@@ -202,6 +205,44 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
     assertThat(planWithMaxFileGroupCount).hasSize(3);
   }
 
+  @Test
+  void testBranch() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    String branchName = "test-branch";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Insert more data on main only
+    insert(table, 3, "c");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, 
DataFileRewritePlanner.PlannedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewritePlanner(
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    0,
+                    tableLoader(),
+                    11,
+                    10_000_000L,
+                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    Expressions.alwaysTrue(),
+                    branchName))) {
+      testHarness.open();
+
+      trigger(testHarness);
+
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      List<DataFileRewritePlanner.PlannedGroup> planned = 
testHarness.extractOutputValues();
+      assertThat(planned).hasSize(1);
+      // Branch has 2 files, main has 3
+      assertThat(planned.get(0).group().fileScanTasks()).hasSize(2);
+      assertThat(planned.get(0).branch()).isEqualTo(branchName);
+    }
+  }
+
   void assertRewriteFileGroup(
       DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, 
Set<DataFile> files) {
     assertThat(plannedGroup.table().currentSnapshot().snapshotId())
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 4e21c7a956..9202a1df92 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
@@ -308,7 +309,8 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
                         "2",
                         TARGET_FILE_SIZE_BYTES,
                         String.valueOf(targetFileSize)),
-                    Expressions.alwaysTrue()))) {
+                    Expressions.alwaysTrue(),
+                    SnapshotRef.MAIN_BRANCH))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);


Reply via email to