This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 4bb60aa0bc873c15e4446a0960d381d24fcbe8be
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue May 6 14:34:00 2025 +0200

    Flink: Backport Maintenance - RewriteDataFiles to Flink 1.19
    
    Backports #11497
---
 .../flink/maintenance/api/ExpireSnapshots.java     |   2 +-
 .../maintenance/api/MaintenanceTaskBuilder.java    |   4 +-
 .../flink/maintenance/api/RewriteDataFiles.java    | 234 ++++++++++++
 .../flink/maintenance/api/TableMaintenance.java    |   7 +-
 .../operator/DataFileRewriteCommitter.java         | 199 ++++++++++
 .../operator/DataFileRewritePlanner.java           | 206 ++++++++++
 .../operator/DataFileRewriteRunner.java            | 253 +++++++++++++
 .../maintenance/operator/DeleteFilesProcessor.java |  29 +-
 .../operator/ExpireSnapshotsProcessor.java         |   2 +-
 .../flink/maintenance/operator/LockRemover.java    |  28 +-
 .../flink/maintenance/operator/LogUtil.java        |  26 ++
 .../operator/TableMaintenanceMetrics.java          |  30 ++
 .../maintenance/operator/TaskResultAggregator.java | 101 +++++
 .../flink/maintenance/operator/TriggerManager.java |  29 +-
 .../api/MaintenanceTaskInfraExtension.java         |   8 +-
 .../maintenance/api/MaintenanceTaskTestBase.java   |  39 +-
 .../flink/maintenance/api/TestExpireSnapshots.java |  43 +--
 .../flink/maintenance/api/TestMaintenanceE2E.java  |  13 +
 .../maintenance/api/TestRewriteDataFiles.java      | 417 +++++++++++++++++++++
 .../maintenance/operator/OperatorTestBase.java     | 113 ++++++
 .../flink/maintenance/operator/RewriteUtil.java    |  83 ++++
 .../operator/TestDataFileRewriteCommitter.java     | 278 ++++++++++++++
 .../operator/TestDataFileRewritePlanner.java       | 193 ++++++++++
 .../operator/TestDataFileRewriteRunner.java        | 355 ++++++++++++++++++
 .../operator/TestDeleteFilesProcessor.java         |   3 +-
 25 files changed, 2587 insertions(+), 108 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index 9cde5cb173..2bac5163ca 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -113,7 +113,7 @@ public class ExpireSnapshots {
               operatorName(DELETE_FILES_OPERATOR_NAME),
               TypeInformation.of(Void.class),
               new DeleteFilesProcessor(
-                  index(), taskName(), tableLoader().loadTable(), deleteBatchSize))
+                  tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
           .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
           .slotSharingGroup(slotSharingGroup())
           .setParallelism(parallelism());
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
index 3fc431d025..c884f13b3e 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
@@ -195,9 +195,9 @@ public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>
 
   DataStream<TaskResult> append(
       DataStream<Trigger> sourceStream,
-      int taskIndex,
-      String newTaskName,
       String newTableName,
+      String newTaskName,
+      int taskIndex,
       TableLoader newTableLoader,
       String defaultUidSuffix,
       String defaultSlotSharingGroup,
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
new file mode 100644
index 0000000000..b96635e6ab
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
+import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteRunner;
+import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Creates the data file rewriter data stream, which runs a single iteration of the task for every
+ * {@link Trigger} event.
+ */
+public class RewriteDataFiles {
+  static final String PLANNER_TASK_NAME = "RDF Planner";
+  static final String REWRITE_TASK_NAME = "Rewrite";
+  static final String COMMIT_TASK_NAME = "Rewrite commit";
+  static final String AGGREGATOR_TASK_NAME = "Rewrite aggregator";
+
+  private RewriteDataFiles() {}
+
+  /** Creates the builder for a stream which rewrites data files for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<RewriteDataFiles.Builder> {
+    private boolean partialProgressEnabled = false;
+    private int partialProgressMaxCommits = 10;
+    private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
+    private long maxRewriteBytes = Long.MAX_VALUE;
+
+    /**
+     * Allows committing compacted data files in batches. See {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_ENABLED} for more details.
+     *
+     * @param newPartialProgressEnabled to enable partial commits
+     */
+    public Builder partialProgressEnabled(boolean newPartialProgressEnabled) {
+      this.partialProgressEnabled = newPartialProgressEnabled;
+      return this;
+    }
+
+    /**
+     * Configures the size of batches if {@link #partialProgressEnabled}. See {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_MAX_COMMITS} for more details.
+     *
+     * @param newPartialProgressMaxCommits target number of commits per run
+     */
+    public Builder partialProgressMaxCommits(int newPartialProgressMaxCommits) {
+      this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+      return this;
+    }
+
+    /**
+     * Configures the maximum byte size of the rewrites for one scheduled compaction. This could be
+     * used to limit the resources used by the compaction.
+     *
+     * @param newMaxRewriteBytes to limit the size of the rewrites
+     */
+    public Builder maxRewriteBytes(long newMaxRewriteBytes) {
+      this.maxRewriteBytes = newMaxRewriteBytes;
+      return this;
+    }
+
+    /**
+     * Configures the target file size. See {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#TARGET_FILE_SIZE_BYTES} for more details.
+     *
+     * @param targetFileSizeBytes target file size
+     */
+    public Builder targetFileSizeBytes(long targetFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the min file size considered for rewriting. See {@link
+     * SizeBasedFileRewritePlanner#MIN_FILE_SIZE_BYTES} for more details.
+     *
+     * @param minFileSizeBytes min file size
+     */
+    public Builder minFileSizeBytes(long minFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, String.valueOf(minFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the max file size considered for rewriting. See {@link
+     * SizeBasedFileRewritePlanner#MAX_FILE_SIZE_BYTES} for more details.
+     *
+     * @param maxFileSizeBytes max file size
+     */
+    public Builder maxFileSizeBytes(long maxFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MAX_FILE_SIZE_BYTES, String.valueOf(maxFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the minimum number of input files above which a rewrite is always initiated. See
+     * {@link SizeBasedFileRewritePlanner#MIN_INPUT_FILES} for more details.
+     *
+     * @param minInputFiles min file number
+     */
+    public Builder minInputFiles(int minInputFiles) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MIN_INPUT_FILES, String.valueOf(minInputFiles));
+      return this;
+    }
+
+    /**
+     * Configures the minimum number of delete files for a data file above which a rewrite is
+     * always initiated. See {@link BinPackRewriteFilePlanner#DELETE_FILE_THRESHOLD} for more
+     * details.
+     *
+     * @param deleteFileThreshold min delete file number
+     */
+    public Builder deleteFileThreshold(int deleteFileThreshold) {
+      this.rewriteOptions.put(
+          BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD, String.valueOf(deleteFileThreshold));
+      return this;
+    }
+
+    /**
+     * Overrides other options and forces rewriting of all provided files.
+     *
+     * @param rewriteAll enables a full rewrite
+     */
+    public Builder rewriteAll(boolean rewriteAll) {
+      this.rewriteOptions.put(SizeBasedFileRewritePlanner.REWRITE_ALL, String.valueOf(rewriteAll));
+      return this;
+    }
+
+    /**
+     * Configures the group size for rewriting. See {@link
+     * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_SIZE_BYTES} for more details.
+     *
+     * @param maxFileGroupSizeBytes file group size for rewrite
+     */
+    public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MAX_FILE_GROUP_SIZE_BYTES,
+          String.valueOf(maxFileGroupSizeBytes));
+      return this;
+    }
+
+    /**
+     * The input is a {@link DataStream} with {@link Trigger} events and every event should be
+     * immediately followed by a {@link Watermark} with the same timestamp as the event.
+     *
+     * <p>The output is a {@link DataStream} with the {@link TaskResult} of the run followed by the
+     * {@link Watermark}.
+     */
+    @Override
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+      SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
+          trigger
+              .process(
+                  new DataFileRewritePlanner(
+                      tableName(),
+                      taskName(),
+                      index(),
+                      tableLoader(),
+                      partialProgressEnabled ? partialProgressMaxCommits : 1,
+                      maxRewriteBytes,
+                      rewriteOptions))
+              .name(operatorName(PLANNER_TASK_NAME))
+              .uid(PLANNER_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> rewritten =
+          planned
+              .rebalance()
+              .process(new DataFileRewriteRunner(tableName(), taskName(), index()))
+              .name(operatorName(REWRITE_TASK_NAME))
+              .uid(REWRITE_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .setParallelism(parallelism());
+
+      SingleOutputStreamOperator<Trigger> updated =
+          rewritten
+              .transform(
+                  operatorName(COMMIT_TASK_NAME),
+                  TypeInformation.of(Trigger.class),
+                  new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader()))
+              .uid(COMMIT_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      return trigger
+          .union(updated)
+          .connect(
+              planned
+                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+                  .union(
+                      rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
+                      updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
+          .transform(
+              operatorName(AGGREGATOR_TASK_NAME),
+              TypeInformation.of(TaskResult.class),
+              new TaskResultAggregator(tableName(), taskName(), index()))
+          .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+          .slotSharingGroup(slotSharingGroup())
+          .forceNonParallel();
+    }
+  }
+}
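
For context, a minimal sketch of wiring the new task into a job through TableMaintenance (hedged: the environment, table path, scheduling choice and option values are illustrative, and createLockFactory() is a hypothetical stand-in for a concrete TriggerLockFactory implementation):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
    import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
    import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;

    public class RewriteDataFilesMaintenanceSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");
        TriggerLockFactory lockFactory = createLockFactory(); // hypothetical helper, see below

        TableMaintenance.forTable(env, tableLoader, lockFactory)
            .add(
                RewriteDataFiles.builder()
                    .scheduleOnCommitCount(10)
                    .partialProgressEnabled(true)
                    .partialProgressMaxCommits(10)
                    .maxRewriteBytes(512L * 1024 * 1024)
                    .targetFileSizeBytes(128L * 1024 * 1024))
            .append();

        env.execute("RewriteDataFiles maintenance");
      }

      private static TriggerLockFactory createLockFactory() {
        // Placeholder only: supply a real TriggerLockFactory implementation in an actual job.
        throw new UnsupportedOperationException("provide a TriggerLockFactory");
      }
    }
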
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index 25d0325abb..12230891ac 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.maintenance.api;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
+import java.util.Locale;
 import java.util.UUID;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -257,9 +258,9 @@ public class TableMaintenance {
           DataStream<TaskResult> result =
               builder.append(
                   filtered,
-                  taskIndex,
-                  taskNames.get(taskIndex),
                   tableName,
+                  taskNames.get(taskIndex),
+                  taskIndex,
                   loader,
                   uidSuffix,
                   slotSharingGroup,
@@ -301,7 +302,7 @@ public class TableMaintenance {
 
     private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskIndex) {
       return String.format(
-          "%s [%s]", streamBuilder.getClass().getSimpleName(), String.valueOf(taskIndex));
+          Locale.ROOT, "%s [%d]", streamBuilder.getClass().getSimpleName(), taskIndex);
     }
   }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
new file mode 100644
index 0000000000..135d3d9b42
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. The input is a {@link
+ * DataFileRewriteRunner.ExecutedGroup}. Only a {@link Watermark} is emitted, which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteRunner.ExecutedGroup, Trigger> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient CommitService commitService;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should not be null");
+    Preconditions.checkNotNull(taskName, "Task name should not be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex);
+    this.errorCounter = taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+  }
+
+  @Override
+  public void processElement(StreamRecord<DataFileRewriteRunner.ExecutedGroup> streamRecord) {
+    DataFileRewriteRunner.ExecutedGroup executedGroup = streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        // Refresh the table to get the latest snapshot for the committer
+        table.refresh();
+
+        FlinkRewriteDataFilesCommitManager commitManager =
+            new FlinkRewriteDataFilesCommitManager(
+                table, executedGroup.snapshotId(), streamRecord.getTimestamp());
+        this.commitService = commitManager.service(executedGroup.groupsPerCommit());
+        commitService.start();
+      }
+
+      commitService.offer(executedGroup.group());
+    } catch (Exception e) {
+      LOG.warn(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Exception processing {}",
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          executedGroup,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService != null) {
+        commitService.close();
+      }
+
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Successfully completed data 
file compaction",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp());
+    } catch (Exception e) {
+      LOG.warn(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Exception closing commit 
service",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+
+    // Cleanup
+    this.commitService = null;
+
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (commitService != null) {
+      commitService.close();
+    }
+  }
+
+  private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager {
+    private final long timestamp;
+
+    FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) {
+      super(table, startingSnapshotId);
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
+      super.commitFileGroups(fileGroups);
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Committed {}",
+          tableName,
+          taskName,
+          taskIndex,
+          timestamp,
+          fileGroups);
+      updateMetrics(fileGroups);
+    }
+
+    private void updateMetrics(Set<RewriteFileGroup> fileGroups) {
+      for (RewriteFileGroup fileGroup : fileGroups) {
+        for (DataFile added : fileGroup.addedFiles()) {
+          addedDataFileNumCounter.inc();
+          addedDataFileSizeCounter.inc(added.fileSizeInBytes());
+        }
+
+        for (DataFile rewritten : fileGroup.rewrittenFiles()) {
+          removedDataFileNumCounter.inc();
+          removedDataFileSizeCounter.inc(rewritten.fileSizeInBytes());
+        }
+      }
+    }
+  }
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
new file mode 100644
index 0000000000..e36c9c6610
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
+import org.apache.iceberg.actions.FileRewritePlan;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans the rewrite groups using the {@link BinPackRewriteFilePlanner}. The input is the {@link
+ * Trigger}, the output is zero or more {@link PlannedGroup}s.
+ */
+@Internal
+public class DataFileRewritePlanner
+    extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> {
+  static final String MESSAGE_PREFIX = "[For table {} with {}[{}] at {}]: ";
+  private static final Logger LOG = LoggerFactory.getLogger(DataFileRewritePlanner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final int partialProgressMaxCommits;
+  private final long maxRewriteBytes;
+  private final Map<String, String> rewriterOptions;
+  private transient Counter errorCounter;
+
+  public DataFileRewritePlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      int newPartialProgressMaxCommits,
+      long maxRewriteBytes,
+      Map<String, String> rewriterOptions) {
+    Preconditions.checkNotNull(tableName, "Table name should not be null");
+    Preconditions.checkNotNull(taskName, "Task name should not be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(rewriterOptions, "Options map should not be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+    this.maxRewriteBytes = maxRewriteBytes;
+    this.rewriterOptions = rewriterOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+  }
+
+  @Override
+  public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> out)
+      throws Exception {
+    LOG.info(
+        DataFileRewritePlanner.MESSAGE_PREFIX + "Creating rewrite plan",
+        tableName,
+        taskName,
+        taskIndex,
+        ctx.timestamp());
+    try {
+      SerializableTable table =
+          (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
+      if (table.currentSnapshot() == null) {
+        LOG.info(
+            DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp());
+        return;
+      }
+
+      BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table);
+      planner.init(rewriterOptions);
+
+      FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
+          plan = planner.plan();
+
+      long rewriteBytes = 0;
+      List<RewriteFileGroup> groups = Lists.newArrayList();
+      for (CloseableIterator<RewriteFileGroup> groupIterator = plan.groups().iterator();
+          groupIterator.hasNext(); ) {
+        RewriteFileGroup group = groupIterator.next();
+        if (rewriteBytes + group.inputFilesSizeInBytes() > maxRewriteBytes) {
+          // Keep going, maybe some other group might fit in
+          LOG.info(
+              DataFileRewritePlanner.MESSAGE_PREFIX
+                  + "Skipping group as max rewrite size reached {}",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              group);
+        } else {
+          rewriteBytes += group.inputFilesSizeInBytes();
+          groups.add(group);
+        }
+      }
+
+      int groupsPerCommit =
+          IntMath.divide(groups.size(), partialProgressMaxCommits, RoundingMode.CEILING);
+
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Rewrite plan created {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          groups);
+
+      for (RewriteFileGroup group : groups) {
+        LOG.info(
+            DataFileRewritePlanner.MESSAGE_PREFIX + "Emitting {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            group);
+        out.collect(new PlannedGroup(table, groupsPerCommit, group));
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Failed to plan data file 
rewrite groups",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          e);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, e);
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    tableLoader.close();
+  }
+
+  public static class PlannedGroup {
+    private final SerializableTable table;
+    private final int groupsPerCommit;
+    private final RewriteFileGroup group;
+
+    private PlannedGroup(SerializableTable table, int groupsPerCommit, RewriteFileGroup group) {
+      this.table = table;
+      this.groupsPerCommit = groupsPerCommit;
+      this.group = group;
+    }
+
+    SerializableTable table() {
+      return table;
+    }
+
+    int groupsPerCommit() {
+      return groupsPerCommit;
+    }
+
+    RewriteFileGroup group() {
+      return group;
+    }
+  }
+}
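
A small standalone sketch of the partial-progress batching arithmetic used above (illustrative values; plain integer math stands in for the relocated Guava IntMath call):

    public class GroupsPerCommitSketch {
      public static void main(String[] args) {
        int plannedGroups = 7;             // groups kept after the maxRewriteBytes filter
        int partialProgressMaxCommits = 3; // passed as 1 when partial progress is disabled

        // Same result as IntMath.divide(plannedGroups, partialProgressMaxCommits, RoundingMode.CEILING)
        int groupsPerCommit =
            (plannedGroups + partialProgressMaxCommits - 1) / partialProgressMaxCommits;

        // Prints 3: the committer then commits after every 3 offered groups
        System.out.println(groupsPerCommit);
      }
    }
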
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
new file mode 100644
index 0000000000..c03b5cc1c8
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with the standard {@link
+ * FileScanTaskReader}, so the delete files are considered, and writes using the {@link
+ * TaskWriterFactory}. The output is an {@link ExecutedGroup}.
+ */
+@Internal
+public class DataFileRewriteRunner
+    extends ProcessFunction<PlannedGroup, DataFileRewriteRunner.ExecutedGroup> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteRunner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  private transient int subTaskId;
+  private transient int attemptId;
+  private transient Counter errorCounter;
+
+  public DataFileRewriteRunner(String tableName, String taskName, int taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should not be null");
+    Preconditions.checkNotNull(taskName, "Task name should not be null");
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+  }
+
+  @Override
+  public void open(Configuration parameters) {
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+    this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
+  }
+
+  @Override
+  public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out)
+      throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Rewriting files for group 
{} with files: {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles());
+    } else {
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX
+              + "Rewriting files for group {} with {} number of files",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles().size());
+    }
+
+    try (TaskWriter<RowData> writer = writerFor(value)) {
+      try (DataIterator<RowData> iterator = readerFor(value)) {
+        while (iterator.hasNext()) {
+          writer.write(iterator.next());
+        }
+
+        Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
+        value.group().setOutputFiles(dataFiles);
+        out.collect(
+            new ExecutedGroup(
+                value.table().currentSnapshot().snapshotId(),
+                value.groupsPerCommit(),
+                value.group()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten files {} from 
{} to {}",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().info(),
+              value.group().rewrittenFiles(),
+              value.group().addedFiles());
+        } else {
+          LOG.info(
+              DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten {} files to 
{} files",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().rewrittenFiles().size(),
+              value.group().addedFiles().size());
+        }
+      } catch (Exception ex) {
+        LOG.info(
+            DataFileRewritePlanner.MESSAGE_PREFIX + "Exception rewriting 
datafile group {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            value.group(),
+            ex);
+        ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+        errorCounter.inc();
+        abort(writer, ctx.timestamp());
+      }
+    } catch (Exception ex) {
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX
+              + "Exception creating compaction writer for group {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group(),
+          ex);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+      errorCounter.inc();
+    }
+  }
+
+  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+    String formatString =
+        PropertyUtil.propertyAsString(
+            value.table().properties(),
+            TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    RowDataTaskWriterFactory factory =
+        new RowDataTaskWriterFactory(
+            value.table(),
+            FlinkSchemaUtil.convert(value.table().schema()),
+            value.group().inputSplitSize(),
+            FileFormat.fromString(formatString),
+            value.table().properties(),
+            null,
+            false);
+    factory.initialize(subTaskId, attemptId);
+    return factory.create();
+  }
+
+  private DataIterator<RowData> readerFor(PlannedGroup value) {
+    RowDataFileScanTaskReader reader =
+        new RowDataFileScanTaskReader(
+            value.table().schema(),
+            value.table().schema(),
+            PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
+            false,
+            Collections.emptyList());
+    return new DataIterator<>(
+        reader,
+        new BaseCombinedScanTask(value.group().fileScanTasks()),
+        value.table().io(),
+        value.table().encryption());
+  }
+
+  private void abort(TaskWriter<RowData> writer, long timestamp) {
+    try {
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX
+              + "Aborting rewrite for (subTaskId {}, attemptId {})",
+          tableName,
+          taskName,
+          taskIndex,
+          timestamp,
+          subTaskId,
+          attemptId);
+      writer.abort();
+    } catch (Exception e) {
+      LOG.info(
+          DataFileRewritePlanner.MESSAGE_PREFIX + "Exception in abort",
+          tableName,
+          taskName,
+          taskIndex,
+          timestamp,
+          e);
+    }
+  }
+
+  public static class ExecutedGroup {
+    private final long snapshotId;
+    private final int groupsPerCommit;
+    private final RewriteFileGroup group;
+
+    @VisibleForTesting
+    ExecutedGroup(long snapshotId, int groupsPerCommit, RewriteFileGroup group) {
+      this.snapshotId = snapshotId;
+      this.groupsPerCommit = groupsPerCommit;
+      this.group = group;
+    }
+
+    long snapshotId() {
+      return snapshotId;
+    }
+
+    int groupsPerCommit() {
+      return groupsPerCommit;
+    }
+
+    RewriteFileGroup group() {
+      return group;
+    }
+  }
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java
index dc7846c4c4..9189f5f018 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.maintenance.operator;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -40,17 +41,17 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
     implements OneInputStreamOperator<String, Void> {
   private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class);
 
-  private final String taskIndex;
+  private final String tableName;
   private final String taskName;
+  private final int taskIndex;
   private final SupportsBulkOperations io;
-  private final String tableName;
   private final Set<String> filesToDelete = Sets.newHashSet();
   private final int batchSize;
 
   private transient Counter failedCounter;
   private transient Counter succeededCounter;
 
-  public DeleteFilesProcessor(int taskIndex, String taskName, Table table, int batchSize) {
+  public DeleteFilesProcessor(Table table, String taskName, int taskIndex, int batchSize) {
     Preconditions.checkNotNull(taskName, "Task name should no be null");
     Preconditions.checkNotNull(table, "Table should no be null");
 
@@ -60,31 +61,21 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
         "%s doesn't support bulk delete",
         fileIO.getClass().getSimpleName());
 
-    this.taskIndex = String.valueOf(taskIndex);
+    this.tableName = table.name();
     this.taskName = taskName;
+    this.taskIndex = taskIndex;
     this.io = (SupportsBulkOperations) fileIO;
-    this.tableName = table.name();
     this.batchSize = batchSize;
   }
 
   @Override
   public void open() throws Exception {
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex);
     this.failedCounter =
-        getRuntimeContext()
-            .getMetricGroup()
-            .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-            .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-            .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName)
-            .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex)
-            .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
+        taskMetricGroup.counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
     this.succeededCounter =
-        getRuntimeContext()
-            .getMetricGroup()
-            .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-            .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-            .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName)
-            .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex)
-            .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
+        taskMetricGroup.counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
   }
 
   @Override
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index a09d0244e9..098d32e3b6 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -74,7 +74,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
     this.table = tableLoader.loadTable();
     this.plannerPool =
         plannerPoolSize != null
-            ? ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize)
+            ? ThreadPools.newFixedThreadPool(table.name() + "-table--planner", plannerPoolSize)
             : ThreadPools.getWorkerPool();
   }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
index 49fa69b487..2066ca8e01 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -98,31 +99,16 @@ public class LockRemover extends AbstractStreamOperator<Void>
     this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
     this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
     for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) {
+      MetricGroup taskMetricGroup =
+          TableMaintenanceMetrics.groupFor(
+              getRuntimeContext(), tableName, maintenanceTaskNames.get(taskIndex), taskIndex);
       succeededTaskResultCounters.add(
-          getRuntimeContext()
-              .getMetricGroup()
-              .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-              .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-              .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
-              .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
-              .counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
+          taskMetricGroup.counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
       failedTaskResultCounters.add(
-          getRuntimeContext()
-              .getMetricGroup()
-              .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-              .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-              .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
-              .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
-              .counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
+          taskMetricGroup.counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
       AtomicLong duration = new AtomicLong(0);
       taskLastRunDurationMs.add(duration);
-      getRuntimeContext()
-          .getMetricGroup()
-          .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-          .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-          .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
-          .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
-          .gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get);
+      taskMetricGroup.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get);
     }
 
     lockFactory.open();
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LogUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LogUtil.java
new file mode 100644
index 0000000000..8bdcd7ba2b
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LogUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+class LogUtil {
+  static final String MESSAGE_PREFIX = "[For table {} with {}[{}] at {}]: ";
+  static final String MESSAGE_FORMAT_PREFIX = "[For table %s with {%s}[{%d}] at {%d}]: ";
+
+  private LogUtil() {}
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
index 6147c3a5fd..897760caaa 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java
@@ -18,12 +18,18 @@
  */
 package org.apache.iceberg.flink.maintenance.operator;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+
 public class TableMaintenanceMetrics {
   public static final String GROUP_KEY = "maintenance";
   public static final String TASK_NAME_KEY = "taskName";
   public static final String TASK_INDEX_KEY = "taskIndex";
   public static final String TABLE_NAME_KEY = "tableName";
 
+  // Operator error counter
+  public static final String ERROR_COUNTER = "error";
+
   // TriggerManager metrics
   public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered";
   public static final String CONCURRENT_RUN_THROTTLED = "concurrentRunThrottled";
@@ -39,6 +45,30 @@ public class TableMaintenanceMetrics {
   public static final String DELETE_FILE_FAILED_COUNTER = "deleteFailed";
   public static final String DELETE_FILE_SUCCEEDED_COUNTER = "deleteSucceeded";
 
+  // DataFileUpdater metrics
+  public static final String ADDED_DATA_FILE_NUM_METRIC = "addedDataFileNum";
+  public static final String ADDED_DATA_FILE_SIZE_METRIC = "addedDataFileSize";
+  public static final String REMOVED_DATA_FILE_NUM_METRIC = "removedDataFileNum";
+  public static final String REMOVED_DATA_FILE_SIZE_METRIC = "removedDataFileSize";
+
+  static MetricGroup groupFor(
+      RuntimeContext context, String tableName, String taskName, int taskIndex) {
+    return groupFor(groupFor(context, tableName), taskName, taskIndex);
+  }
+
+  static MetricGroup groupFor(RuntimeContext context, String tableName) {
+    return context
+        .getMetricGroup()
+        .addGroup(TableMaintenanceMetrics.GROUP_KEY)
+        .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName);
+  }
+
+  static MetricGroup groupFor(MetricGroup mainGroup, String taskName, int taskIndex) {
+    return mainGroup
+        .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName)
+        .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex));
+  }
+
   private TableMaintenanceMetrics() {
     // do not instantiate
   }
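
A hedged sketch of how an operator in this package would pick up the new per-task metric group and error counter via groupFor (the class, table name, and task name below are hypothetical; the real usages in this commit are DataFileRewritePlanner, DataFileRewriteRunner, and DataFileRewriteCommitter):

    package org.apache.iceberg.flink.maintenance.operator;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical operator, shown only to illustrate TableMaintenanceMetrics.groupFor().
    public class ExampleMaintenanceFunction extends ProcessFunction<String, String> {
      private transient Counter errorCounter;

      @Override
      public void open(Configuration parameters) {
        // Registers under maintenance -> tableName=<table> -> taskName=<task> -> taskIndex=<index>
        MetricGroup taskGroup =
            TableMaintenanceMetrics.groupFor(getRuntimeContext(), "db.tbl", "ExampleTask", 0);
        this.errorCounter = taskGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
      }

      @Override
      public void processElement(String value, Context ctx, Collector<String> out) {
        try {
          out.collect(value);
        } catch (RuntimeException e) {
          errorCounter.inc(); // the new "error" counter added in this commit
        }
      }
    }
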
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
new file mode 100644
index 0000000000..cceb043a26
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Aggregates results of the operators for a given maintenance task.
+ *
+ * <ul>
+ *   <li>Input 1 is used:
+ *       <ul>
+ *         <li>To provide the {@link TaskResult#startEpoch()} - should be chained to the task input
+ *         <li>To mark that the task is finished - should be chained at the end of the task, so an
+ *             incoming watermark will signal that the task is finished
+ *       </ul>
+ *   <li>Input 2 expects an {@link Exception} which caused the failure - should be chained to the
+ *       {@link #ERROR_STREAM} of the operators
+ * </ul>
+ *
+ * The operator emits a {@link TaskResult} with the overall result on {@link Watermark}.
+ */
+@Internal
+public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
+    implements TwoInputStreamOperator<Trigger, Exception, TaskResult> {
+  public static final OutputTag<Exception> ERROR_STREAM =
+      new OutputTag<>("error-stream", TypeInformation.of(Exception.class));
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskResultAggregator.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final List<Exception> exceptions;
+  private transient Long startTime;
+
+  public TaskResultAggregator(String tableName, String taskName, int taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should not be null");
+    Preconditions.checkNotNull(taskName, "Task name should not be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.exceptions = Lists.newArrayList();
+    this.startTime = 0L;
+  }
+
+  @Override
+  public void processElement1(StreamRecord<Trigger> streamRecord) {
+    startTime = streamRecord.getValue().timestamp();
+  }
+
+  @Override
+  public void processElement2(StreamRecord<Exception> streamRecord) {
+    Preconditions.checkNotNull(streamRecord.getValue(), "Exception could not 
be `null`.");
+    exceptions.add(streamRecord.getValue());
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) {
+    TaskResult response = new TaskResult(taskIndex, startTime, 
exceptions.isEmpty(), exceptions);
+    output.collect(new StreamRecord<>(response));
+    LOG.info(
+        "Aggregated result for table {}, task {}[{}] is {}",
+        tableName,
+        taskName,
+        taskIndex,
+        response);
+    exceptions.clear();
+    startTime = 0L;
+  }
+}
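
A minimal wiring sketch for the aggregator above, assuming a taskInput Trigger stream
and an errorStream collected from the operators' ERROR_STREAM side outputs (the stream
names, table name and task name below are placeholders, not code from this commit):

    // Connect the task's Trigger stream with the collected error side outputs and
    // aggregate them into a single TaskResult that is emitted on each watermark.
    SingleOutputStreamOperator<TaskResult> results =
        taskInput
            .connect(errorStream)
            .transform(
                "Aggregator",
                TypeInformation.of(TaskResult.class),
                new TaskResultAggregator("db.table", "RewriteDataFiles", 0));
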
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
index d6d1ebe397..bd8424d726 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.TimerService;
@@ -121,33 +122,17 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
 
   @Override
   public void open(Configuration parameters) throws Exception {
+    MetricGroup mainGroup = 
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName);
     this.rateLimiterTriggeredCounter =
-        getRuntimeContext()
-            .getMetricGroup()
-            .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-            .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+        mainGroup.counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
     this.concurrentRunThrottledCounter =
-        getRuntimeContext()
-            .getMetricGroup()
-            .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-            .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED);
-    this.nothingToTriggerCounter =
-        getRuntimeContext()
-            .getMetricGroup()
-            .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-            .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+        mainGroup.counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED);
+    this.nothingToTriggerCounter = 
mainGroup.counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
     this.triggerCounters = 
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
     for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); 
++taskIndex) {
       triggerCounters.add(
-          getRuntimeContext()
-              .getMetricGroup()
-              .addGroup(TableMaintenanceMetrics.GROUP_KEY)
-              .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
-              .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, 
maintenanceTaskNames.get(taskIndex))
-              .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, 
String.valueOf(taskIndex))
+          TableMaintenanceMetrics.groupFor(
+                  mainGroup, maintenanceTaskNames.get(taskIndex), taskIndex)
               .counter(TableMaintenanceMetrics.TRIGGERED));
     }
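
The groupFor(...) helpers used above are defined in TableMaintenanceMetrics (not shown
in this hunk). A sketch of what they are assumed to wrap, based on the metric-group
chain removed above:

    static MetricGroup groupFor(RuntimeContext context, String tableName) {
      return context
          .getMetricGroup()
          .addGroup(TableMaintenanceMetrics.GROUP_KEY)
          .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName);
    }

    static MetricGroup groupFor(MetricGroup mainGroup, String taskName, int taskIndex) {
      return mainGroup
          .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName)
          .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex));
    }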
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java
index 1b69be1fa3..10efb9120c 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java
@@ -48,16 +48,16 @@ class MaintenanceTaskInfraExtension implements 
BeforeEachCallback {
 
   @Override
   public void beforeEach(ExtensionContext context) {
-    env = StreamExecutionEnvironment.getExecutionEnvironment();
-    source = new ManualSource<>(env, TypeInformation.of(Trigger.class));
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+    this.source = new ManualSource<>(env, TypeInformation.of(Trigger.class));
     // Adds the watermark to mimic the behaviour expected for the input of the 
maintenance tasks
-    triggerStream =
+    this.triggerStream =
         source
             .dataStream()
             .assignTimestampsAndWatermarks(new 
TableMaintenance.PunctuatedWatermarkStrategy())
             .name(IGNORED_OPERATOR_NAME)
             .forceNonParallel();
-    sink = new CollectingSink<>();
+    this.sink = new CollectingSink<>();
   }
 
   StreamExecutionEnvironment env() {
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java
index 36041d9c38..6d3a10c4b1 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java
@@ -37,16 +37,42 @@ class MaintenanceTaskTestBase extends OperatorTestBase {
   @RegisterExtension MaintenanceTaskInfraExtension infra = new 
MaintenanceTaskInfraExtension();
 
   void runAndWaitForSuccess(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink)
+      throws Exception {
+    runAndWaitForResult(env, triggerSource, collectingSink, false, () -> true);
+  }
+
+  void runAndWaitForSuccess(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink,
+      Supplier<Boolean> waitForCondition)
+      throws Exception {
+    runAndWaitForResult(env, triggerSource, collectingSink, false, 
waitForCondition);
+  }
+
+  void runAndWaitForFailure(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink)
+      throws Exception {
+    runAndWaitForResult(env, triggerSource, collectingSink, true, () -> true);
+  }
+
+  void runAndWaitForResult(
       StreamExecutionEnvironment env,
       ManualSource<Trigger> triggerSource,
       CollectingSink<TaskResult> collectingSink,
+      boolean generateFailure,
       Supplier<Boolean> waitForCondition)
       throws Exception {
     JobClient jobClient = null;
     try {
       jobClient = env.executeAsync();
 
-      // Do a single task run
+      // Do a single successful task run
       long time = System.currentTimeMillis();
       triggerSource.sendRecord(Trigger.create(time, TESTING_TASK_ID), time);
 
@@ -56,6 +82,17 @@ class MaintenanceTaskTestBase extends OperatorTestBase {
       assertThat(result.success()).isTrue();
       assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID);
 
+      if (generateFailure) {
+        dropTable();
+        time = System.currentTimeMillis();
+        triggerSource.sendRecord(Trigger.create(time, TESTING_TASK_ID), time);
+        result = collectingSink.poll(POLL_DURATION);
+
+        assertThat(result.startEpoch()).isEqualTo(time);
+        assertThat(result.success()).isFalse();
+        assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID);
+      }
+
       Awaitility.await().until(waitForCondition::get);
     } finally {
       closeJobClient(jobClient);
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
index f80129f966..b8aa259e2f 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
@@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.time.Duration;
 import java.util.List;
 import java.util.Set;
-import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -69,9 +68,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
         .uidSuffix(UID_SUFFIX)
         .append(
             infra.triggerStream(),
-            0,
-            DUMMY_TASK_NAME,
             DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
             tableLoader(),
             "OTHER",
             StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
@@ -101,36 +100,16 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase 
{
     ExpireSnapshots.builder()
         .append(
             infra.triggerStream(),
-            0,
-            DUMMY_TASK_NAME,
             DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
             tableLoader(),
             UID_SUFFIX,
             StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
             1)
         .sinkTo(infra.sink());
 
-    JobClient jobClient = null;
-    try {
-      jobClient = infra.env().executeAsync();
-
-      // Do a single task run
-      long time = System.currentTimeMillis();
-      infra.source().sendRecord(Trigger.create(time, 1), time);
-
-      // First successful run (ensure that the operators are loaded/opened 
etc.)
-      assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue();
-
-      // Drop the table, so it will cause an exception
-      dropTable();
-
-      // Failed run
-      infra.source().sendRecord(Trigger.create(time + 1, 1), time + 1);
-
-      assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isFalse();
-    } finally {
-      closeJobClient(jobClient);
-    }
+    runAndWaitForFailure(infra.env(), infra.source(), infra.sink());
 
     // Check the metrics. There are no expired snapshots or data files because 
ExpireSnapshots has
     // no max age or number of snapshots set, so no files are removed.
@@ -162,9 +141,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
         .uidSuffix(UID_SUFFIX)
         .append(
             infra.triggerStream(),
-            0,
-            DUMMY_TASK_NAME,
             DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
             tableLoader(),
             UID_SUFFIX,
             StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
@@ -180,9 +159,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
     ExpireSnapshots.builder()
         .append(
             infra.triggerStream(),
-            0,
-            DUMMY_TASK_NAME,
             DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
             tableLoader(),
             UID_SUFFIX,
             StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
@@ -204,9 +183,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
         .parallelism(1)
         .append(
             infra.triggerStream(),
-            0,
-            DUMMY_TASK_NAME,
             DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
             tableLoader(),
             UID_SUFFIX,
             StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
index 467ad2d8ce..0a860fec47 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
@@ -52,6 +52,19 @@ class TestMaintenanceE2E extends OperatorTestBase {
                 .retainLast(5)
                 .deleteBatchSize(5)
                 .parallelism(8))
+        .add(
+            RewriteDataFiles.builder()
+                .scheduleOnDataFileCount(10)
+                .partialProgressEnabled(true)
+                .partialProgressMaxCommits(10)
+                .maxRewriteBytes(1000L)
+                .targetFileSizeBytes(1000L)
+                .minFileSizeBytes(1000L)
+                .maxFileSizeBytes(1000L)
+                .minInputFiles(10)
+                .deleteFileThreshold(10)
+                .rewriteAll(false)
+                .maxFileGroupSizeBytes(1000L))
         .append();
 
     JobClient jobClient = null;
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
new file mode 100644
index 0000000000..618647259b
--- /dev/null
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.createRecord;
+import static 
org.apache.iceberg.flink.maintenance.api.RewriteDataFiles.COMMIT_TASK_NAME;
+import static 
org.apache.iceberg.flink.maintenance.api.RewriteDataFiles.PLANNER_TASK_NAME;
+import static 
org.apache.iceberg.flink.maintenance.api.RewriteDataFiles.REWRITE_TASK_NAME;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.ERROR_COUNTER;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import 
org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+class TestRewriteDataFiles extends MaintenanceTaskTestBase {
+  @Test
+  void testRewriteUnpartitioned() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    insert(table, 3, "c");
+    insert(table, 4, "d");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            createRecord(1, "a"),
+            createRecord(2, "b"),
+            createRecord(3, "c"),
+            createRecord(4, "d")));
+  }
+
+  @Test
+  void testRewritePartitioned() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles();
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 2, 0);
+
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            createRecord(1, "p1"),
+            createRecord(2, "p1"),
+            createRecord(3, "p2"),
+            createRecord(4, "p2")));
+  }
+
+  @Test
+  void testPlannerFailure() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    assertFileNum(table, 2, 0);
+
+    appendRewriteDataFiles();
+
+    runAndWaitForFailure(infra.env(), infra.source(), infra.sink());
+
+    // Check the metrics. The first run should be successful, but the second one should fail.
+    // This should be reflected in the counters.
+    MetricsReporterFactoryForTests.assertCounters(
+        new ImmutableMap.Builder<List<String>, Long>()
+            .put(
+                ImmutableList.of(
+                    PLANNER_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                1L)
+            .put(
+                ImmutableList.of(
+                    REWRITE_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ADDED_DATA_FILE_NUM_METRIC),
+                1L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ADDED_DATA_FILE_SIZE_METRIC),
+                -1L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    REMOVED_DATA_FILE_NUM_METRIC),
+                2L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    REMOVED_DATA_FILE_SIZE_METRIC),
+                -1L)
+            .build());
+  }
+
+  @Test
+  void testUidAndSlotSharingGroup() {
+    createTable();
+
+    RewriteDataFiles.builder()
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .uidSuffix(UID_SUFFIX)
+        .append(
+            infra.triggerStream(),
+            DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
+            tableLoader(),
+            "OTHER",
+            "OTHER",
+            1)
+        .sinkTo(infra.sink());
+
+    checkUidsAreSet(infra.env(), UID_SUFFIX);
+    checkSlotSharingGroupsAreSet(infra.env(), SLOT_SHARING_GROUP);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupUnset() {
+    createTable();
+
+    RewriteDataFiles.builder()
+        .append(
+            infra.triggerStream(),
+            DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
+            tableLoader(),
+            UID_SUFFIX,
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+
+    checkUidsAreSet(infra.env(), null);
+    checkSlotSharingGroupsAreSet(infra.env(), null);
+  }
+
+  @Test
+  void testMetrics() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    assertFileNum(table, 2, 0);
+
+    appendRewriteDataFiles();
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    // Check the metrics
+    MetricsReporterFactoryForTests.assertCounters(
+        new ImmutableMap.Builder<List<String>, Long>()
+            .put(
+                ImmutableList.of(
+                    PLANNER_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    REWRITE_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ADDED_DATA_FILE_NUM_METRIC),
+                1L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ADDED_DATA_FILE_SIZE_METRIC),
+                -1L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    REMOVED_DATA_FILE_NUM_METRIC),
+                2L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    REMOVED_DATA_FILE_SIZE_METRIC),
+                -1L)
+            .build());
+  }
+
+  @Test
+  void testV2Table() throws Exception {
+    Table table = createTableWithDelete();
+    update(table, 1, null, "a", "b");
+    update(table, 1, "b", "c");
+
+    assertFileNum(table, 2, 3);
+    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, 
"c")));
+
+    appendRewriteDataFiles();
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    // After #11131 we don't remove the delete files
+    assertFileNum(table, 1, 3);
+
+    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, 
"c")));
+
+    // Check the metrics
+    MetricsReporterFactoryForTests.assertCounters(
+        new ImmutableMap.Builder<List<String>, Long>()
+            .put(
+                ImmutableList.of(
+                    PLANNER_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    REWRITE_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ERROR_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ADDED_DATA_FILE_NUM_METRIC),
+                1L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    ADDED_DATA_FILE_SIZE_METRIC),
+                -1L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    REMOVED_DATA_FILE_NUM_METRIC),
+                2L)
+            .put(
+                ImmutableList.of(
+                    COMMIT_TASK_NAME + "[0]",
+                    DUMMY_TABLE_NAME,
+                    DUMMY_TASK_NAME,
+                    "0",
+                    REMOVED_DATA_FILE_SIZE_METRIC),
+                -1L)
+            .build());
+  }
+
+  private void appendRewriteDataFiles() {
+    appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
+  }
+
+  private void appendRewriteDataFiles(RewriteDataFiles.Builder builder) {
+    builder
+        .append(
+            infra.triggerStream(),
+            DUMMY_TABLE_NAME,
+            DUMMY_TASK_NAME,
+            0,
+            tableLoader(),
+            UID_SUFFIX,
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+  }
+
+  private static void assertFileNum(
+      Table table, int expectedDataFileNum, int expectedDeleteFileNum) {
+    table.refresh();
+    assertThat(
+            table.currentSnapshot().dataManifests(table.io()).stream()
+                .flatMap(
+                    m ->
+                        StreamSupport.stream(
+                            ManifestFiles.read(m, table.io(), 
table.specs()).spliterator(), false))
+                .count())
+        .isEqualTo(expectedDataFileNum);
+    assertThat(
+            table.currentSnapshot().deleteManifests(table.io()).stream()
+                .flatMap(
+                    m ->
+                        StreamSupport.stream(
+                            ManifestFiles.readDeleteManifest(m, table.io(), 
table.specs())
+                                .spliterator(),
+                            false))
+                .count())
+        .isEqualTo(expectedDeleteFileNum);
+  }
+}
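
Outside of the test harness above, the same RewriteDataFiles builder is scheduled through
TableMaintenance (see TestMaintenanceE2E). A minimal sketch, where env, tableLoader,
lockFactory and the threshold values are placeholders:

    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .add(
            RewriteDataFiles.builder()
                .scheduleOnDataFileCount(100)            // plan a rewrite after 100 new data files
                .targetFileSizeBytes(512 * 1024 * 1024L) // compact towards 512 MB data files
                .partialProgressEnabled(true))
        .append();
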
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index a577e40b2c..4a4bceffb1 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -33,16 +33,26 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -126,12 +136,85 @@ public class OperatorTestBase {
             ImmutableMap.of("format-version", "2", "write.upsert.enabled", 
"true"));
   }
 
+  protected static Table createPartitionedTable() {
+    return CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TestFixtures.TABLE_IDENTIFIER,
+            SimpleDataUtil.SCHEMA,
+            
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
+            null,
+            ImmutableMap.of("flink.max-continuous-empty-commits", "100000"));
+  }
+
   protected void insert(Table table, Integer id, String data) throws 
IOException {
     new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
         .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, 
data)));
     table.refresh();
   }
 
+  /**
+   * For the same identifier column id this method simulates the following row operations:
+   *
+   * <ul>
+   *   <li>add an equality delete on oldData
+   *   <li>insert newData
+   * </ul>
+   *
+   * @param table to modify
+   * @param id the identifier column id
+   * @param oldData the old data to be deleted
+   * @param newData the new data to be inserted
+   */
+  protected void update(Table table, Integer id, String oldData, String 
newData)
+      throws IOException {
+    DataFile dataFile =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+            .writeFile(Lists.newArrayList(SimpleDataUtil.createRecord(id, 
newData)));
+    DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit();
+  }
+
+  /**
+   * For the same identifier column id this method simulates the following row operations:
+   *
+   * <ul>
+   *   <li>add an equality delete on oldData
+   *   <li>insert tempData
+   *   <li>add a position delete on tempData
+   *   <li>insert newData
+   * </ul>
+   *
+   * @param table to modify
+   * @param id the identifier column id
+   * @param oldData the old data to be deleted
+   * @param tempData the temp data to be inserted and deleted with a position 
delete
+   * @param newData the new data to be inserted
+   */
+  protected void update(Table table, Integer id, String oldData, String 
tempData, String newData)
+      throws IOException {
+    DataFile dataFile =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+            .writeFile(
+                Lists.newArrayList(
+                    SimpleDataUtil.createRecord(id, tempData),
+                    SimpleDataUtil.createRecord(id, newData)));
+    DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, 
tempData);
+
+    
table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+  }
+
+  protected void insertPartitioned(Table table, Integer id, String data) 
throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(
+            TestHelpers.Row.of(data), 
Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
+    table.refresh();
+  }
+
+  protected void insertFullPartitioned(Table table, Integer id, String data) 
throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(
+            TestHelpers.Row.of(data, id),
+            Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
+    table.refresh();
+  }
+
   protected void dropTable() {
     
CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
   }
@@ -214,6 +297,36 @@ public class OperatorTestBase {
     return config;
   }
 
+  private DeleteFile writeEqualityDelete(Table table, Integer id, String 
oldData)
+      throws IOException {
+    File file = File.createTempFile("junit", null, warehouseDir.toFile());
+    assertThat(file.delete()).isTrue();
+    return FileHelpers.writeDeleteFile(
+        table,
+        Files.localOutput(file),
+        new PartitionData(PartitionSpec.unpartitioned().partitionType()),
+        Lists.newArrayList(SimpleDataUtil.createRecord(id, oldData)),
+        SCHEMA_WITH_PRIMARY_KEY);
+  }
+
+  private DeleteFile writePosDelete(
+      Table table, CharSequence path, Integer pos, Integer id, String oldData) 
throws IOException {
+    File file = File.createTempFile("junit", null, warehouseDir.toFile());
+    assertThat(file.delete()).isTrue();
+    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+    GenericRecord nested = GenericRecord.create(table.schema());
+    nested.set(0, id);
+    nested.set(1, oldData);
+    posDelete.set(path, pos, nested);
+    return FileHelpers.writePosDeleteFile(
+        table, Files.localOutput(file), null, Lists.newArrayList(posDelete));
+  }
+
+  static void trigger(OneInputStreamOperatorTestHarness<Trigger, ?> harness) 
throws Exception {
+    long time = System.currentTimeMillis();
+    harness.processElement(Trigger.create(time, 0), time);
+  }
+
   private static class MemoryLock implements TriggerLockFactory.Lock {
     volatile boolean locked = false;
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
new file mode 100644
index 0000000000..6e43009e08
--- /dev/null
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class RewriteUtil {
+  private RewriteUtil() {}
+
+  static List<DataFileRewritePlanner.PlannedGroup> 
planDataFileRewrite(TableLoader tableLoader)
+      throws Exception {
+    try (OneInputStreamOperatorTestHarness<Trigger, 
DataFileRewritePlanner.PlannedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewritePlanner(
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    0,
+                    tableLoader,
+                    11,
+                    10_000_000L,
+                    ImmutableMap.of(MIN_INPUT_FILES, "2")))) {
+      testHarness.open();
+
+      OperatorTestBase.trigger(testHarness);
+
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      return testHarness.extractOutputValues();
+    }
+  }
+
+  static List<DataFileRewriteRunner.ExecutedGroup> executeRewrite(
+      List<DataFileRewritePlanner.PlannedGroup> elements) throws Exception {
+    try (OneInputStreamOperatorTestHarness<
+            DataFileRewritePlanner.PlannedGroup, 
DataFileRewriteRunner.ExecutedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewriteRunner(
+                    OperatorTestBase.DUMMY_TABLE_NAME, 
OperatorTestBase.DUMMY_TABLE_NAME, 0))) {
+      testHarness.open();
+
+      for (DataFileRewritePlanner.PlannedGroup element : elements) {
+        testHarness.processElement(element, System.currentTimeMillis());
+      }
+
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      return testHarness.extractOutputValues();
+    }
+  }
+
+  static Set<DataFile> newDataFiles(Table table) {
+    table.refresh();
+    return Sets.newHashSet(table.currentSnapshot().addedDataFiles(table.io()));
+  }
+}
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
new file mode 100644
index 0000000000..9e8f2ec921
--- /dev/null
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite;
+import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
+import static org.apache.iceberg.metrics.CommitMetricsResult.TOTAL_DATA_FILES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+
+class TestDataFileRewriteCommitter extends OperatorTestBase {
+  @Test
+  void testUnpartitioned() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "p1");
+    insert(table, 2, "p2");
+    insert(table, 3, "p3");
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = 
planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(1);
+    List<DataFileRewriteRunner.ExecutedGroup> rewritten = 
executeRewrite(planned);
+    assertThat(rewritten).hasSize(1);
+
+    try 
(OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger>
+        testHarness = harness()) {
+      testHarness.open();
+
+      testHarness.processElement(rewritten.get(0), EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      testHarness.processWatermark(EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+    }
+
+    assertDataFiles(
+        table, rewritten.get(0).group().addedFiles(), 
rewritten.get(0).group().rewrittenFiles(), 1);
+  }
+
+  @Test
+  void testPartitioned() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = 
planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(2);
+    List<DataFileRewriteRunner.ExecutedGroup> rewritten = 
executeRewrite(planned);
+    assertThat(rewritten).hasSize(2);
+    assertThat(rewritten.get(0).groupsPerCommit()).isEqualTo(1);
+    assertThat(rewritten.get(1).groupsPerCommit()).isEqualTo(1);
+    ensureDifferentGroups(rewritten);
+
+    try 
(OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger>
+        testHarness = harness()) {
+      testHarness.open();
+
+      testHarness.processElement(rewritten.get(0), EVENT_TIME);
+      assertDataFiles(
+          table,
+          rewritten.get(0).group().addedFiles(),
+          rewritten.get(0).group().rewrittenFiles(),
+          3);
+
+      testHarness.processElement(rewritten.get(1), EVENT_TIME);
+      assertDataFiles(
+          table,
+          rewritten.get(1).group().addedFiles(),
+          rewritten.get(1).group().rewrittenFiles(),
+          2);
+
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      testHarness.processWatermark(EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+    }
+  }
+
+  @Test
+  void testNewTable() throws Exception {
+    Table table = createTable();
+    List<DataFileRewriteRunner.ExecutedGroup> rewritten;
+
+    try 
(OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger>
+        testHarness = harness()) {
+      testHarness.open();
+
+      insert(table, 1, "p1");
+      insert(table, 2, "p2");
+      insert(table, 3, "p3");
+
+      List<DataFileRewritePlanner.PlannedGroup> planned = 
planDataFileRewrite(tableLoader());
+      assertThat(planned).hasSize(1);
+      rewritten = executeRewrite(planned);
+      assertThat(rewritten).hasSize(1);
+
+      testHarness.processElement(rewritten.get(0), EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      testHarness.processWatermark(EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+    }
+
+    assertDataFiles(
+        table, rewritten.get(0).group().addedFiles(), 
rewritten.get(0).group().rewrittenFiles(), 1);
+  }
+
+  @Test
+  void testBatchSize() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+    insertPartitioned(table, 5, "p3");
+    insertPartitioned(table, 6, "p3");
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = 
planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(3);
+    List<DataFileRewriteRunner.ExecutedGroup> rewritten = 
executeRewrite(planned);
+    assertThat(rewritten).hasSize(3);
+    ensureDifferentGroups(rewritten);
+
+    try 
(OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger>
+        testHarness = harness()) {
+      testHarness.open();
+
+      testHarness.processElement(setBatchSizeToTwo(rewritten.get(0)), 
EVENT_TIME);
+      assertNoChange(table);
+      testHarness.processElement(setBatchSizeToTwo(rewritten.get(1)), 
EVENT_TIME);
+
+      Set<DataFile> added = 
Sets.newHashSet(rewritten.get(0).group().addedFiles());
+      added.addAll(rewritten.get(1).group().addedFiles());
+      Set<DataFile> removed = 
Sets.newHashSet(rewritten.get(0).group().rewrittenFiles());
+      removed.addAll(rewritten.get(1).group().rewrittenFiles());
+      assertDataFiles(table, added, removed, 4);
+
+      testHarness.processElement(setBatchSizeToTwo(rewritten.get(2)), 
EVENT_TIME);
+      assertNoChange(table);
+
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      testHarness.processWatermark(EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+    }
+
+    // This should be committed on close
+    assertDataFiles(
+        table, rewritten.get(2).group().addedFiles(), 
rewritten.get(2).group().rewrittenFiles(), 3);
+  }
+
+  @Test
+  void testError() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+    insertPartitioned(table, 5, "p3");
+    insertPartitioned(table, 6, "p3");
+    insertPartitioned(table, 7, "p4");
+    insertPartitioned(table, 8, "p4");
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = 
planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(4);
+    List<DataFileRewriteRunner.ExecutedGroup> rewritten = 
executeRewrite(planned);
+    assertThat(rewritten).hasSize(4);
+
+    try 
(OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger>
+        testHarness = harness()) {
+      testHarness.open();
+
+      testHarness.processElement(setBatchSizeToTwo(rewritten.get(0)), 
EVENT_TIME);
+      assertNoChange(table);
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+
+      DataFileRewriteRunner.ExecutedGroup group = 
spy(setBatchSizeToTwo(rewritten.get(1)));
+      when(group.group()).thenThrow(new RuntimeException("Testing error"));
+      testHarness.processElement(group, EVENT_TIME);
+
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1);
+      assertThat(
+              testHarness
+                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+                  .poll()
+                  .getValue()
+                  .getMessage())
+          .contains("Testing error");
+    }
+  }
+
+  private 
OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> 
harness()
+      throws Exception {
+    return new OneInputStreamOperatorTestHarness<>(
+        new DataFileRewriteCommitter(
+            OperatorTestBase.DUMMY_TABLE_NAME,
+            OperatorTestBase.DUMMY_TABLE_NAME,
+            0,
+            tableLoader()));
+  }
+
+  private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo(
+      DataFileRewriteRunner.ExecutedGroup from) {
+    return new DataFileRewriteRunner.ExecutedGroup(from.snapshotId(), 2, 
from.group());
+  }
+
+  // Ensure that the groups are different, so the tests do not pass accidentally
+  private static void 
ensureDifferentGroups(List<DataFileRewriteRunner.ExecutedGroup> rewritten) {
+    List<String> resultFiles =
+        rewritten.stream()
+            .flatMap(task -> 
task.group().addedFiles().stream().map(ContentFile::location))
+            .collect(Collectors.toList());
+    assertThat(resultFiles).hasSize(Set.copyOf(resultFiles).size());
+  }
+
+  /**
+   * Asserts that the number of data files in the table is as expected. Additionally, checks that
+   * the last commit contains the expected added and removed files.
+   *
+   * @param table the table to check
+   * @param expectedAdded the expected added data files
+   * @param expectedRemoved the expected removed data files
+   * @param expectedCurrent the expected current data files count
+   */
+  private static void assertDataFiles(
+      Table table,
+      Set<DataFile> expectedAdded,
+      Set<DataFile> expectedRemoved,
+      long expectedCurrent) {
+    table.refresh();
+
+    assertThat(table.currentSnapshot().summary().get(TOTAL_DATA_FILES))
+        .isEqualTo(String.valueOf(expectedCurrent));
+    Set<DataFile> actualAdded = 
Sets.newHashSet(table.currentSnapshot().addedDataFiles(table.io()));
+    Set<DataFile> actualRemoved =
+        Sets.newHashSet(table.currentSnapshot().removedDataFiles(table.io()));
+    
assertThat(actualAdded.stream().map(DataFile::location).collect(Collectors.toSet()))
+        
.isEqualTo(expectedAdded.stream().map(DataFile::location).collect(Collectors.toSet()));
+    
assertThat(actualRemoved.stream().map(DataFile::location).collect(Collectors.toSet()))
+        
.isEqualTo(expectedRemoved.stream().map(DataFile::location).collect(Collectors.toSet()));
+  }
+
+  private static void assertNoChange(Table table) {
+    long original = table.currentSnapshot().snapshotId();
+    table.refresh();
+
+    assertThat(table.currentSnapshot().snapshotId()).isEqualTo(original);
+  }
+}
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
new file mode 100644
index 0000000000..1d7e29813a
--- /dev/null
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
+import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
+import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+
+class TestDataFileRewritePlanner extends OperatorTestBase {
+  @Test
+  void testUnpartitioned() throws Exception {
+    Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3);
+    Table table = createTable();
+    insert(table, 1, "a");
+    expected.addAll(newDataFiles(table));
+    insert(table, 2, "b");
+    expected.addAll(newDataFiles(table));
+    insert(table, 3, "c");
+    expected.addAll(newDataFiles(table));
+
+    List<DataFileRewritePlanner.PlannedGroup> actual = 
planDataFileRewrite(tableLoader());
+
+    assertThat(actual).hasSize(1);
+    assertRewriteFileGroup(actual.get(0), table, expected);
+  }
+
+  @Test
+  void testPartitioned() throws Exception {
+    Set<DataFile> expectedP1 = Sets.newHashSetWithExpectedSize(2);
+    Set<DataFile> expectedP2 = Sets.newHashSetWithExpectedSize(2);
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    expectedP1.addAll(newDataFiles(table));
+    insertPartitioned(table, 2, "p1");
+    expectedP1.addAll(newDataFiles(table));
+
+    insertPartitioned(table, 3, "p2");
+    expectedP2.addAll(newDataFiles(table));
+    insertPartitioned(table, 4, "p2");
+    expectedP2.addAll(newDataFiles(table));
+
+    // This should not participate in compaction, as there are no more files in the partition
+    insertPartitioned(table, 5, "p3");
+
+    List<DataFileRewritePlanner.PlannedGroup> actual = 
planDataFileRewrite(tableLoader());
+
+    assertThat(actual).hasSize(2);
+    if (actual.get(0).group().info().partition().get(0, 
String.class).equals("p1")) {
+      assertRewriteFileGroup(actual.get(0), table, expectedP1);
+      assertRewriteFileGroup(actual.get(1), table, expectedP2);
+    } else {
+      assertRewriteFileGroup(actual.get(0), table, expectedP2);
+      assertRewriteFileGroup(actual.get(1), table, expectedP1);
+    }
+  }
+
+  @Test
+  void testError() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, 
DataFileRewritePlanner.PlannedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewritePlanner(
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    0,
+                    tableLoader(),
+                    11,
+                    1L,
+                    ImmutableMap.of(MIN_INPUT_FILES, "2")))) {
+      testHarness.open();
+
+      // Cause an exception
+      dropTable();
+
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      trigger(testHarness);
+      
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1);
+      assertThat(
+              testHarness
+                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+                  .poll()
+                  .getValue()
+                  .getMessage())
+          .contains("Table does not exist: ");
+    }
+  }
+
+  @Test
+  void testV2Table() throws Exception {
+    Table table = createTableWithDelete();
+    update(table, 1, null, "a", "b");
+    update(table, 1, "b", "c");
+
+    List<DataFileRewritePlanner.PlannedGroup> actual = 
planDataFileRewrite(tableLoader());
+
+    assertThat(actual).hasSize(1);
+    List<FileScanTask> tasks = actual.get(0).group().fileScanTasks();
+    assertThat(tasks).hasSize(2);
+    // Find the task with the deletes
+    FileScanTask withDelete = tasks.get(0).deletes().isEmpty() ? tasks.get(1) 
: tasks.get(0);
+    assertThat(withDelete.deletes()).hasSize(2);
+    
assertThat(withDelete.deletes().stream().map(ContentFile::content).collect(Collectors.toList()))
+        .containsExactlyInAnyOrder(FileContent.POSITION_DELETES, 
FileContent.EQUALITY_DELETES);
+  }
+
+  @Test
+  void testMaxRewriteBytes() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+
+    // First run with high maxRewriteBytes
+    List<DataFileRewritePlanner.PlannedGroup> planWithNoMaxRewriteBytes =
+        planDataFileRewrite(tableLoader());
+    assertThat(planWithNoMaxRewriteBytes).hasSize(2);
+
+    // Second run with low maxRewriteBytes; the second group should be dropped from the plan
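+    // Allow one data file from each group plus one byte, which is only enough for a single group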
+    long maxRewriteBytes =
+        planWithNoMaxRewriteBytes.get(0).group().fileScanTasks().get(0).sizeBytes()
+            + planWithNoMaxRewriteBytes.get(1).group().fileScanTasks().get(0).sizeBytes()
+            + 1;
+    try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewritePlanner(
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    0,
+                    tableLoader(),
+                    11,
+                    maxRewriteBytes,
+                    ImmutableMap.of(MIN_INPUT_FILES, "2")))) {
+      testHarness.open();
+
+      OperatorTestBase.trigger(testHarness);
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      // Only a single group is planned
+      assertThat(testHarness.extractOutputValues()).hasSize(1);
+    }
+  }
+
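+  /**
+   * Verifies that the planned group was created from the current snapshot and that its file scan
+   * tasks cover exactly the expected data files.
+   */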
+  void assertRewriteFileGroup(
+      DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
+    assertThat(plannedGroup.table().currentSnapshot().snapshotId())
+        .isEqualTo(table.currentSnapshot().snapshotId());
+    assertThat(plannedGroup.groupsPerCommit()).isEqualTo(1);
+    assertThat(
+            plannedGroup.group().fileScanTasks().stream()
+                .map(s -> s.file().location())
+                .collect(Collectors.toSet()))
+        .containsExactlyInAnyOrderElementsOf(
+            files.stream().map(ContentFile::location).collect(Collectors.toList()));
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
new file mode 100644
index 0000000000..6dc8fb3c02
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.actions.RewriteDataFiles.TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
+import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite;
+import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class TestDataFileRewriteRunner extends OperatorTestBase {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testExecute(boolean partitioned) throws Exception {
+    Table table;
+    PartitionData partition;
+    if (partitioned) {
+      table = createPartitionedTable();
+      partition = new PartitionData(table.spec().partitionType());
+      partition.set(0, "p1");
+      insertPartitioned(table, 1, "p1");
+      insertPartitioned(table, 2, "p1");
+      insertPartitioned(table, 3, "p1");
+    } else {
+      table = createTable();
+      partition = new PartitionData(PartitionSpec.unpartitioned().partitionType());
+      insert(table, 1, "p1");
+      insert(table, 2, "p1");
+      insert(table, 3, "p1");
+    }
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(1);
+    List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+    assertThat(actual).hasSize(1);
+
+    assertRewriteFileGroup(
+        actual.get(0),
+        table,
+        records(
+            table.schema(),
+            ImmutableSet.of(
+                ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))),
+        1,
+        ImmutableSet.of(partition));
+  }
+
+  @Test
+  void testPartitionSpecChange() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    PartitionData oldPartition = new PartitionData(table.spec().partitionType());
+    oldPartition.set(0, "p1");
+
+    try (OneInputStreamOperatorTestHarness<
+            DataFileRewritePlanner.PlannedGroup, DataFileRewriteRunner.ExecutedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewriteRunner(
+                    OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) {
+      testHarness.open();
+
+      List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+      assertThat(planned).hasSize(1);
+
+      testHarness.processElement(planned.get(0), System.currentTimeMillis());
+      List<DataFileRewriteRunner.ExecutedGroup> actual = testHarness.extractOutputValues();
+      assertThat(actual).hasSize(1);
+      assertRewriteFileGroup(
+          actual.get(0),
+          table,
+          records(
+              table.schema(),
+              ImmutableSet.of(ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"))),
+          1,
+          ImmutableSet.of(oldPartition));
+
+      insertPartitioned(table, 3, "p1");
+
+      planned = planDataFileRewrite(tableLoader());
+      assertThat(planned).hasSize(1);
+
+      testHarness.processElement(planned.get(0), System.currentTimeMillis());
+      actual = testHarness.extractOutputValues();
+      assertThat(actual).hasSize(2);
+      assertRewriteFileGroup(
+          actual.get(1),
+          table,
+          records(
+              table.schema(),
+              ImmutableSet.of(
+                  ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))),
+          1,
+          ImmutableSet.of(oldPartition));
+
+      // Alter the table partition spec
+      table.updateSpec().addField("id").commit();
+      // Insert some new data
+      insertFullPartitioned(table, 4, "p1");
+      insertFullPartitioned(table, 4, "p1");
+      PartitionData newPartition = new PartitionData(table.spec().partitionType());
+      newPartition.set(0, "p1");
+      newPartition.set(1, 4);
+      table.refresh();
+
+      planned = planDataFileRewrite(tableLoader());
+      assertThat(planned).hasSize(2);
+      DataFileRewritePlanner.PlannedGroup oldCompact = planned.get(0);
+      DataFileRewritePlanner.PlannedGroup newCompact = planned.get(1);
+      if (oldCompact.group().inputFileNum() == 2) {
+        newCompact = planned.get(0);
+        oldCompact = planned.get(1);
+      }
+
+      testHarness.processElement(newCompact, System.currentTimeMillis());
+      actual = testHarness.extractOutputValues();
+      assertThat(actual).hasSize(3);
+      assertRewriteFileGroup(
+          actual.get(2),
+          table,
+          records(
+              table.schema(),
+              ImmutableList.of(ImmutableList.of(4, "p1"), ImmutableList.of(4, "p1"))),
+          1,
+          ImmutableSet.of(newPartition));
+
+      testHarness.processElement(oldCompact, System.currentTimeMillis());
+      actual = testHarness.extractOutputValues();
+      assertThat(actual).hasSize(4);
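+      // Files written under the old spec are rewritten into the new spec, one file per id value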
+      PartitionData[] transformedPartitions = {
+        newPartition.copy(), newPartition.copy(), newPartition.copy()
+      };
+      transformedPartitions[0].set(1, 1);
+      transformedPartitions[1].set(1, 2);
+      transformedPartitions[2].set(1, 3);
+      assertRewriteFileGroup(
+          actual.get(3),
+          table,
+          records(
+              table.schema(),
+              ImmutableSet.of(
+                  ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))),
+          3,
+          Sets.newHashSet(transformedPartitions));
+    }
+  }
+
+  @Test
+  void testError() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    try (OneInputStreamOperatorTestHarness<
+            DataFileRewritePlanner.PlannedGroup, DataFileRewriteRunner.ExecutedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewriteRunner(
+                    OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) {
+      testHarness.open();
+
+      List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+      assertThat(planned).hasSize(1);
+      // Cause an exception
+      dropTable();
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      testHarness.processElement(planned.get(0), System.currentTimeMillis());
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1);
+      assertThat(
+              testHarness
+                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+                  .poll()
+                  .getValue()
+                  .getMessage())
+          .contains("File does not exist: ");
+    }
+  }
+
+  @Test
+  void testV2Table() throws Exception {
+    Table table = createTableWithDelete();
+    update(table, 1, null, "a", "b");
+    update(table, 1, "b", "c");
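+    // The rewrite applies the delete files, so only the latest (1, "c") row should remain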
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(1);
+
+    List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+    assertThat(actual).hasSize(1);
+
+    assertRewriteFileGroup(
+        actual.get(0),
+        table,
+        records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))),
+        1,
+        ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
+  }
+
+  @Test
+  void testSplitSize() throws Exception {
+    Table table = createTable();
+
+    File dataDir = new File(new Path(table.location(), "data").toUri().getPath());
+    dataDir.mkdir();
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, dataDir.toPath());
+    List<Record> expected = Lists.newArrayListWithExpectedSize(4000);
+    for (int i = 0; i < 4; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1000, 10 + i);
+      dataAppender.appendToTable(batch);
+      expected.addAll(batch);
+    }
+
+    // First run with high target file size
+    List<DataFileRewritePlanner.PlannedGroup> planWithNoTargetFileSize =
+        planDataFileRewrite(tableLoader());
+    assertThat(planWithNoTargetFileSize).hasSize(1);
+
+    // Second run with low target file size
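+    // Use the combined size of two input files as the target, so the group is rewritten as two files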
+    long targetFileSize =
+        planWithNoTargetFileSize.get(0).group().fileScanTasks().get(0).sizeBytes()
+            + planWithNoTargetFileSize.get(0).group().fileScanTasks().get(1).sizeBytes();
+    List<DataFileRewritePlanner.PlannedGroup> planned;
+    try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
+        testHarness =
+            ProcessFunctionTestHarnesses.forProcessFunction(
+                new DataFileRewritePlanner(
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    OperatorTestBase.DUMMY_TABLE_NAME,
+                    0,
+                    tableLoader(),
+                    11,
+                    10_000_000,
+                    ImmutableMap.of(
+                        MIN_INPUT_FILES,
+                        "2",
+                        TARGET_FILE_SIZE_BYTES,
+                        String.valueOf(targetFileSize))))) {
+      testHarness.open();
+
+      OperatorTestBase.trigger(testHarness);
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      planned = testHarness.extractOutputValues();
+      assertThat(planned).hasSize(1);
+    }
+
+    List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+    assertThat(actual).hasSize(1);
+
+    assertRewriteFileGroup(
+        actual.get(0),
+        table,
+        expected,
+        2,
+        ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
+  }
+
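+  /**
+   * Verifies that the executed group was produced against the current snapshot, that it added the
+   * expected number of Parquet data files, and that the rewritten records and partitions match.
+   */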
+  void assertRewriteFileGroup(
+      DataFileRewriteRunner.ExecutedGroup actual,
+      Table table,
+      Collection<Record> expectedRecords,
+      int expectedFileNum,
+      Set<StructLike> expectedPartitions)
+      throws IOException {
+    assertThat(actual.snapshotId()).isEqualTo(table.currentSnapshot().snapshotId());
+    assertThat(actual.groupsPerCommit()).isEqualTo(1);
+    assertThat(actual.group().addedFiles()).hasSize(expectedFileNum);
+    Collection<Record> writtenRecords = Lists.newArrayListWithExpectedSize(expectedRecords.size());
+    Set<StructLike> writtenPartitions = Sets.newHashSetWithExpectedSize(expectedPartitions.size());
+    for (DataFile newDataFile : actual.group().addedFiles()) {
+      assertThat(newDataFile.format()).isEqualTo(FileFormat.PARQUET);
+      assertThat(newDataFile.content()).isEqualTo(FileContent.DATA);
+      assertThat(newDataFile.keyMetadata()).isNull();
+      writtenPartitions.add(newDataFile.partition());
+
+      try (CloseableIterable<Record> reader =
+          Parquet.read(table.io().newInputFile(newDataFile.location()))
+              .project(table.schema())
+              .createReaderFunc(
+                  fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
+              .build()) {
+        List<Record> newRecords = Lists.newArrayList(reader);
+        assertThat(newRecords).hasSize((int) newDataFile.recordCount());
+        writtenRecords.addAll(newRecords);
+      }
+    }
+
+    assertThat(writtenRecords).containsExactlyInAnyOrderElementsOf(expectedRecords);
+    assertThat(writtenPartitions).isEqualTo(expectedPartitions);
+  }
+
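+  /** Builds generic records for the table schema from the given (id, data) value pairs. */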
+  private List<Record> records(Schema schema, Collection<List<Object>> data) {
+    GenericRecord record = GenericRecord.create(schema);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    data.forEach(
+        recordData ->
+            builder.add(
+                record.copy(ImmutableMap.of("id", recordData.get(0), "data", recordData.get(1)))));
+
+    return builder.build();
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java
index d70c4aafd5..7511e1029b 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java
@@ -90,8 +90,7 @@ class TestDeleteFilesProcessor extends OperatorTestBase {
     tableLoader().open();
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
         new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(0, DUMMY_TASK_NAME, tableLoader.loadTable(), 10),
-            StringSerializer.INSTANCE)) {
+            new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), StringSerializer.INSTANCE)) {
       testHarness.open();
       testHarness.processElement(fileName, System.currentTimeMillis());
       testHarness.processWatermark(EVENT_TIME);
