This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 386f0ea6ab Flink: Backport: Add branch support to RewriteDataFiles
maintenance task (#15672) (#15690)
386f0ea6ab is described below
commit 386f0ea6abe6cd988178ee922b7cceaaf0d2d46b
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Mar 19 23:49:42 2026 +0100
Flink: Backport: Add branch support to RewriteDataFiles maintenance task
(#15672) (#15690)
---
.../flink/maintenance/api/RewriteDataFiles.java | 20 +++++++++-
.../operator/DataFileRewriteCommitter.java | 19 +++++++--
.../operator/DataFileRewritePlanner.java | 24 +++++++++---
.../operator/DataFileRewriteRunner.java | 2 +-
.../org/apache/iceberg/flink/sink/IcebergSink.java | 5 ++-
.../maintenance/api/TestRewriteDataFiles.java | 35 +++++++++++++++++
.../flink/maintenance/operator/RewriteUtil.java | 4 +-
.../operator/TestDataFileRewriteCommitter.java | 4 +-
.../operator/TestDataFileRewritePlanner.java | 45 +++++++++++++++++++++-
.../operator/TestDataFileRewriteRunner.java | 4 +-
.../flink/maintenance/api/RewriteDataFiles.java | 20 +++++++++-
.../operator/DataFileRewriteCommitter.java | 19 +++++++--
.../operator/DataFileRewritePlanner.java | 24 +++++++++---
.../operator/DataFileRewriteRunner.java | 2 +-
.../org/apache/iceberg/flink/sink/IcebergSink.java | 5 ++-
.../maintenance/api/TestRewriteDataFiles.java | 35 +++++++++++++++++
.../flink/maintenance/operator/RewriteUtil.java | 4 +-
.../operator/TestDataFileRewriteCommitter.java | 4 +-
.../operator/TestDataFileRewritePlanner.java | 45 +++++++++++++++++++++-
.../operator/TestDataFileRewriteRunner.java | 4 +-
20 files changed, 288 insertions(+), 36 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index 3b64a79eee..9aeee75b14 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
import org.apache.iceberg.expressions.Expression;
@@ -59,6 +60,7 @@ public class RewriteDataFiles {
private final Map<String, String> rewriteOptions =
Maps.newHashMapWithExpectedSize(6);
private long maxRewriteBytes = Long.MAX_VALUE;
private Expression filter = Expressions.alwaysTrue();
+ private String branch = SnapshotRef.MAIN_BRANCH;
@Override
String maintenanceTaskName() {
@@ -218,6 +220,18 @@ public class RewriteDataFiles {
return this;
}
+ /**
+ * Sets the branch to compact. When set, the planner reads from the
branch's snapshot and
+ * commits are made to this branch.
+ *
+ * @param newBranch the branch name
+ * @return this for method chaining
+ */
+ public Builder branch(String newBranch) {
+ this.branch = newBranch;
+ return this;
+ }
+
/**
* Configures the properties for the rewriter.
*
@@ -262,7 +276,8 @@ public class RewriteDataFiles {
partialProgressEnabled ? partialProgressMaxCommits : 1,
maxRewriteBytes,
rewriteOptions,
- filter))
+ filter,
+ branch))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
@@ -282,7 +297,8 @@ public class RewriteDataFiles {
.transform(
operatorName(COMMIT_TASK_NAME),
TypeInformation.of(Trigger.class),
- new DataFileRewriteCommitter(tableName(), taskName(),
index(), tableLoader()))
+ new DataFileRewriteCommitter(
+ tableName(), taskName(), index(), tableLoader(), branch))
.uid(COMMIT_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
index 135d3d9b42..1125e5d9b6 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
@@ -29,12 +29,14 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +54,7 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
private final String taskName;
private final int taskIndex;
private final TableLoader tableLoader;
+ private final String branch;
private transient Table table;
private transient CommitService commitService;
@@ -62,15 +65,17 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
private transient Counter removedDataFileSizeCounter;
public DataFileRewriteCommitter(
- String tableName, String taskName, int taskIndex, TableLoader
tableLoader) {
+ String tableName, String taskName, int taskIndex, TableLoader
tableLoader, String branch) {
Preconditions.checkNotNull(tableName, "Table name should no be null");
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+ Preconditions.checkNotNull(branch, "Branch should not be null");
this.tableName = tableName;
this.taskName = taskName;
this.taskIndex = taskIndex;
this.tableLoader = tableLoader;
+ this.branch = branch;
}
@Override
@@ -103,7 +108,7 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
FlinkRewriteDataFilesCommitManager commitManager =
new FlinkRewriteDataFilesCommitManager(
- table, executedGroup.snapshotId(),
streamRecord.getTimestamp());
+ table, executedGroup.snapshotId(),
streamRecord.getTimestamp(), branch);
this.commitService =
commitManager.service(executedGroup.groupsPerCommit());
commitService.start();
}
@@ -164,8 +169,14 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
private class FlinkRewriteDataFilesCommitManager extends
RewriteDataFilesCommitManager {
private final long timestamp;
- FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId,
long timestamp) {
- super(table, startingSnapshotId);
+ FlinkRewriteDataFilesCommitManager(
+ Table table, long startingSnapshotId, long timestamp, String branch) {
+ super(
+ table,
+ startingSnapshotId,
+ RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT,
+ ImmutableMap.of(),
+ branch);
this.timestamp = timestamp;
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 6751caeb28..feb2dd26c8 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Collector;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -62,6 +63,7 @@ public class DataFileRewritePlanner
private final Map<String, String> rewriterOptions;
private transient Counter errorCounter;
private final Expression filter;
+ private final String branch;
public DataFileRewritePlanner(
String tableName,
@@ -71,12 +73,14 @@ public class DataFileRewritePlanner
int newPartialProgressMaxCommits,
long maxRewriteBytes,
Map<String, String> rewriterOptions,
- Expression filter) {
+ Expression filter,
+ String branch) {
Preconditions.checkNotNull(tableName, "Table name should no be null");
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(rewriterOptions, "Options map should no be
null");
+ Preconditions.checkNotNull(branch, "Branch should no be null");
this.tableName = tableName;
this.taskName = taskName;
@@ -86,6 +90,7 @@ public class DataFileRewritePlanner
this.maxRewriteBytes = maxRewriteBytes;
this.rewriterOptions = rewriterOptions;
this.filter = filter;
+ this.branch = branch;
}
@Override
@@ -108,7 +113,8 @@ public class DataFileRewritePlanner
try {
SerializableTable table =
(SerializableTable)
SerializableTable.copyOf(tableLoader.loadTable());
- if (table.currentSnapshot() == null) {
+ Snapshot snapshot = table.snapshot(branch);
+ if (snapshot == null) {
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an
empty table",
tableName,
@@ -118,7 +124,8 @@ public class DataFileRewritePlanner
return;
}
- BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table,
filter);
+ BinPackRewriteFilePlanner planner =
+ new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(),
false);
planner.init(rewriterOptions);
FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile,
RewriteFileGroup>
@@ -164,7 +171,7 @@ public class DataFileRewritePlanner
taskIndex,
ctx.timestamp(),
group);
- out.collect(new PlannedGroup(table, groupsPerCommit, group));
+ out.collect(new PlannedGroup(table, groupsPerCommit, group, branch));
}
} catch (Exception e) {
LOG.warn(
@@ -189,11 +196,14 @@ public class DataFileRewritePlanner
private final SerializableTable table;
private final int groupsPerCommit;
private final RewriteFileGroup group;
+ private final String branch;
- private PlannedGroup(SerializableTable table, int groupsPerCommit,
RewriteFileGroup group) {
+ private PlannedGroup(
+ SerializableTable table, int groupsPerCommit, RewriteFileGroup group,
String branch) {
this.table = table;
this.groupsPerCommit = groupsPerCommit;
this.group = group;
+ this.branch = branch;
}
SerializableTable table() {
@@ -207,5 +217,9 @@ public class DataFileRewritePlanner
RewriteFileGroup group() {
return group;
}
+
+ String branch() {
+ return branch;
+ }
}
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index 1e8db128e9..f7a53c7fca 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -124,7 +124,7 @@ public class DataFileRewriteRunner
value.group().setOutputFiles(dataFiles);
out.collect(
new ExecutedGroup(
- value.table().currentSnapshot().snapshotId(),
+ value.table().snapshot(value.branch()).snapshotId(),
value.groupsPerCommit(),
value.group()));
if (LOG.isDebugEnabled()) {
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 0d3e4a34d9..b1a58a6d3a 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -769,7 +769,10 @@ public class IcebergSink
if (flinkWriteConf.compactMode()) {
RewriteDataFilesConfig rewriteDataFilesConfig =
flinkMaintenanceConfig.createRewriteDataFilesConfig();
-
maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+ maintenanceTasks.add(
+ RewriteDataFiles.builder()
+ .branch(flinkWriteConf.branch())
+ .config(rewriteDataFilesConfig));
}
if (flinkWriteConf.expireSnapshotsMode()) {
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 707038c925..bb53b52656 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -529,6 +529,41 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase
{
createRecord(4, "d")));
}
+ @Test
+ void testBranch() throws Exception {
+ Table table = createTable();
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+
+ // Create branch based on above inserts
+ String branchName = "test-branch";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ // Insert another file on main only (main has 3 files, branch stays at 2)
+ insert(table, 3, "c");
+
+
appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true).branch(branchName));
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ table.refresh();
+
+ // Branch should be compacted from 2 files to 1
+ assertThat(
+ table.snapshot(branchName).dataManifests(table.io()).stream()
+ .flatMap(
+ m ->
+ StreamSupport.stream(
+ ManifestFiles.read(m, table.io(),
table.specs()).spliterator(), false))
+ .count())
+ .isEqualTo(1);
+ SimpleDataUtil.assertTableRecords(
+ table, ImmutableList.of(createRecord(1, "a"), createRecord(2, "b")),
branchName);
+
+ // Main should be untouched with 3 files
+ assertFileNum(table, 3, 0);
+ }
+
private void appendRewriteDataFiles() {
appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 95992ccd97..9b19a50b09 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.TableLoader;
@@ -55,7 +56,8 @@ class RewriteUtil {
11,
10_000_000L,
rewriterOptions,
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
OperatorTestBase.trigger(testHarness);
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
index 9e8f2ec921..cdd40eb2a2 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -225,7 +226,8 @@ class TestDataFileRewriteCommitter extends OperatorTestBase
{
OperatorTestBase.DUMMY_TABLE_NAME,
OperatorTestBase.DUMMY_TABLE_NAME,
0,
- tableLoader()));
+ tableLoader(),
+ SnapshotRef.MAIN_BRANCH));
}
private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo(
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index cb1a41bb43..16d524f05c 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.maintenance.api.Trigger;
@@ -106,7 +107,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
11,
1L,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
// Cause an exception
@@ -172,7 +174,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
11,
maxRewriteBytes,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
OperatorTestBase.trigger(testHarness);
@@ -202,6 +205,44 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
assertThat(planWithMaxFileGroupCount).hasSize(3);
}
+ @Test
+ void testBranch() throws Exception {
+ Table table = createTable();
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+
+ String branchName = "test-branch";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ // Insert more data on main only
+ insert(table, 3, "c");
+
+ try (OneInputStreamOperatorTestHarness<Trigger,
DataFileRewritePlanner.PlannedGroup>
+ testHarness =
+ ProcessFunctionTestHarnesses.forProcessFunction(
+ new DataFileRewritePlanner(
+ OperatorTestBase.DUMMY_TABLE_NAME,
+ OperatorTestBase.DUMMY_TABLE_NAME,
+ 0,
+ tableLoader(),
+ 11,
+ 10_000_000L,
+ ImmutableMap.of(MIN_INPUT_FILES, "2"),
+ Expressions.alwaysTrue(),
+ branchName))) {
+ testHarness.open();
+
+ trigger(testHarness);
+
+
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+ List<DataFileRewritePlanner.PlannedGroup> planned =
testHarness.extractOutputValues();
+ assertThat(planned).hasSize(1);
+ // Branch has 2 files, main has 3
+ assertThat(planned.get(0).group().fileScanTasks()).hasSize(2);
+ assertThat(planned.get(0).branch()).isEqualTo(branchName);
+ }
+ }
+
void assertRewriteFileGroup(
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table,
Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 4e21c7a956..9202a1df92 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
@@ -308,7 +309,8 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
"2",
TARGET_FILE_SIZE_BYTES,
String.valueOf(targetFileSize)),
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
OperatorTestBase.trigger(testHarness);
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index 3b64a79eee..9aeee75b14 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
import org.apache.iceberg.expressions.Expression;
@@ -59,6 +60,7 @@ public class RewriteDataFiles {
private final Map<String, String> rewriteOptions =
Maps.newHashMapWithExpectedSize(6);
private long maxRewriteBytes = Long.MAX_VALUE;
private Expression filter = Expressions.alwaysTrue();
+ private String branch = SnapshotRef.MAIN_BRANCH;
@Override
String maintenanceTaskName() {
@@ -218,6 +220,18 @@ public class RewriteDataFiles {
return this;
}
+ /**
+ * Sets the branch to compact. When set, the planner reads from the
branch's snapshot and
+ * commits are made to this branch.
+ *
+ * @param newBranch the branch name
+ * @return this for method chaining
+ */
+ public Builder branch(String newBranch) {
+ this.branch = newBranch;
+ return this;
+ }
+
/**
* Configures the properties for the rewriter.
*
@@ -262,7 +276,8 @@ public class RewriteDataFiles {
partialProgressEnabled ? partialProgressMaxCommits : 1,
maxRewriteBytes,
rewriteOptions,
- filter))
+ filter,
+ branch))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
@@ -282,7 +297,8 @@ public class RewriteDataFiles {
.transform(
operatorName(COMMIT_TASK_NAME),
TypeInformation.of(Trigger.class),
- new DataFileRewriteCommitter(tableName(), taskName(),
index(), tableLoader()))
+ new DataFileRewriteCommitter(
+ tableName(), taskName(), index(), tableLoader(), branch))
.uid(COMMIT_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
index 135d3d9b42..1125e5d9b6 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java
@@ -29,12 +29,14 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +54,7 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
private final String taskName;
private final int taskIndex;
private final TableLoader tableLoader;
+ private final String branch;
private transient Table table;
private transient CommitService commitService;
@@ -62,15 +65,17 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
private transient Counter removedDataFileSizeCounter;
public DataFileRewriteCommitter(
- String tableName, String taskName, int taskIndex, TableLoader
tableLoader) {
+ String tableName, String taskName, int taskIndex, TableLoader
tableLoader, String branch) {
Preconditions.checkNotNull(tableName, "Table name should no be null");
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+ Preconditions.checkNotNull(branch, "Branch should not be null");
this.tableName = tableName;
this.taskName = taskName;
this.taskIndex = taskIndex;
this.tableLoader = tableLoader;
+ this.branch = branch;
}
@Override
@@ -103,7 +108,7 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
FlinkRewriteDataFilesCommitManager commitManager =
new FlinkRewriteDataFilesCommitManager(
- table, executedGroup.snapshotId(),
streamRecord.getTimestamp());
+ table, executedGroup.snapshotId(),
streamRecord.getTimestamp(), branch);
this.commitService =
commitManager.service(executedGroup.groupsPerCommit());
commitService.start();
}
@@ -164,8 +169,14 @@ public class DataFileRewriteCommitter extends
AbstractStreamOperator<Trigger>
private class FlinkRewriteDataFilesCommitManager extends
RewriteDataFilesCommitManager {
private final long timestamp;
- FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId,
long timestamp) {
- super(table, startingSnapshotId);
+ FlinkRewriteDataFilesCommitManager(
+ Table table, long startingSnapshotId, long timestamp, String branch) {
+ super(
+ table,
+ startingSnapshotId,
+ RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT,
+ ImmutableMap.of(),
+ branch);
this.timestamp = timestamp;
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index c50060e16a..a9360374df 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Collector;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -62,6 +63,7 @@ public class DataFileRewritePlanner
private final Map<String, String> rewriterOptions;
private transient Counter errorCounter;
private final Expression filter;
+ private final String branch;
public DataFileRewritePlanner(
String tableName,
@@ -71,12 +73,14 @@ public class DataFileRewritePlanner
int newPartialProgressMaxCommits,
long maxRewriteBytes,
Map<String, String> rewriterOptions,
- Expression filter) {
+ Expression filter,
+ String branch) {
Preconditions.checkNotNull(tableName, "Table name should no be null");
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(rewriterOptions, "Options map should no be
null");
+ Preconditions.checkNotNull(branch, "Branch should no be null");
this.tableName = tableName;
this.taskName = taskName;
@@ -86,6 +90,7 @@ public class DataFileRewritePlanner
this.maxRewriteBytes = maxRewriteBytes;
this.rewriterOptions = rewriterOptions;
this.filter = filter;
+ this.branch = branch;
}
@Override
@@ -108,7 +113,8 @@ public class DataFileRewritePlanner
try {
SerializableTable table =
(SerializableTable)
SerializableTable.copyOf(tableLoader.loadTable());
- if (table.currentSnapshot() == null) {
+ Snapshot snapshot = table.snapshot(branch);
+ if (snapshot == null) {
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an
empty table",
tableName,
@@ -118,7 +124,8 @@ public class DataFileRewritePlanner
return;
}
- BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table,
filter);
+ BinPackRewriteFilePlanner planner =
+ new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(),
false);
planner.init(rewriterOptions);
FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile,
RewriteFileGroup>
@@ -164,7 +171,7 @@ public class DataFileRewritePlanner
taskIndex,
ctx.timestamp(),
group);
- out.collect(new PlannedGroup(table, groupsPerCommit, group));
+ out.collect(new PlannedGroup(table, groupsPerCommit, group, branch));
}
} catch (Exception e) {
LOG.warn(
@@ -189,11 +196,14 @@ public class DataFileRewritePlanner
private final SerializableTable table;
private final int groupsPerCommit;
private final RewriteFileGroup group;
+ private final String branch;
- private PlannedGroup(SerializableTable table, int groupsPerCommit,
RewriteFileGroup group) {
+ private PlannedGroup(
+ SerializableTable table, int groupsPerCommit, RewriteFileGroup group,
String branch) {
this.table = table;
this.groupsPerCommit = groupsPerCommit;
this.group = group;
+ this.branch = branch;
}
SerializableTable table() {
@@ -207,5 +217,9 @@ public class DataFileRewritePlanner
RewriteFileGroup group() {
return group;
}
+
+ String branch() {
+ return branch;
+ }
}
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index 57b0e53d86..6fbfacf9f6 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -124,7 +124,7 @@ public class DataFileRewriteRunner
value.group().setOutputFiles(dataFiles);
out.collect(
new ExecutedGroup(
- value.table().currentSnapshot().snapshotId(),
+ value.table().snapshot(value.branch()).snapshotId(),
value.groupsPerCommit(),
value.group()));
if (LOG.isDebugEnabled()) {
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 9f90d8fd35..eaaf4ea6e4 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -769,7 +769,10 @@ public class IcebergSink
if (flinkWriteConf.compactMode()) {
RewriteDataFilesConfig rewriteDataFilesConfig =
flinkMaintenanceConfig.createRewriteDataFilesConfig();
-
maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+ maintenanceTasks.add(
+ RewriteDataFiles.builder()
+ .branch(flinkWriteConf.branch())
+ .config(rewriteDataFilesConfig));
}
if (flinkWriteConf.expireSnapshotsMode()) {
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 707038c925..bb53b52656 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -529,6 +529,41 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
createRecord(4, "d")));
}
+ @Test
+ void testBranch() throws Exception {
+ Table table = createTable();
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+
+ // Create branch based on above inserts
+ String branchName = "test-branch";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ // Insert another file on main only (main has 3 files, branch stays at 2)
+ insert(table, 3, "c");
+
+    appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true).branch(branchName));
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ table.refresh();
+
+ // Branch should be compacted from 2 files to 1
+ assertThat(
+ table.snapshot(branchName).dataManifests(table.io()).stream()
+ .flatMap(
+ m ->
+ StreamSupport.stream(
+                        ManifestFiles.read(m, table.io(), table.specs()).spliterator(), false))
+ .count())
+ .isEqualTo(1);
+ SimpleDataUtil.assertTableRecords(
+        table, ImmutableList.of(createRecord(1, "a"), createRecord(2, "b")), branchName);
+
+ // Main should be untouched with 3 files
+ assertFileNum(table, 3, 0);
+ }
+
private void appendRewriteDataFiles() {
appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 95992ccd97..9b19a50b09 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.TableLoader;
@@ -55,7 +56,8 @@ class RewriteUtil {
11,
10_000_000L,
rewriterOptions,
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
OperatorTestBase.trigger(testHarness);
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
index 9e8f2ec921..cdd40eb2a2 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -225,7 +226,8 @@ class TestDataFileRewriteCommitter extends OperatorTestBase {
OperatorTestBase.DUMMY_TABLE_NAME,
OperatorTestBase.DUMMY_TABLE_NAME,
0,
- tableLoader()));
+ tableLoader(),
+ SnapshotRef.MAIN_BRANCH));
}
private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index cb1a41bb43..16d524f05c 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.maintenance.api.Trigger;
@@ -106,7 +107,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
11,
1L,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
// Cause an exception
@@ -172,7 +174,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
11,
maxRewriteBytes,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
OperatorTestBase.trigger(testHarness);
@@ -202,6 +205,44 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
assertThat(planWithMaxFileGroupCount).hasSize(3);
}
+ @Test
+ void testBranch() throws Exception {
+ Table table = createTable();
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+
+ String branchName = "test-branch";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ // Insert more data on main only
+ insert(table, 3, "c");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
+ testHarness =
+ ProcessFunctionTestHarnesses.forProcessFunction(
+ new DataFileRewritePlanner(
+ OperatorTestBase.DUMMY_TABLE_NAME,
+ OperatorTestBase.DUMMY_TABLE_NAME,
+ 0,
+ tableLoader(),
+ 11,
+ 10_000_000L,
+ ImmutableMap.of(MIN_INPUT_FILES, "2"),
+ Expressions.alwaysTrue(),
+ branchName))) {
+ testHarness.open();
+
+ trigger(testHarness);
+
+      assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
+      List<DataFileRewritePlanner.PlannedGroup> planned = testHarness.extractOutputValues();
+ assertThat(planned).hasSize(1);
+ // Branch has 2 files, main has 3
+ assertThat(planned.get(0).group().fileScanTasks()).hasSize(2);
+ assertThat(planned.get(0).branch()).isEqualTo(branchName);
+ }
+ }
+
void assertRewriteFileGroup(
     DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 4e21c7a956..9202a1df92 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
@@ -308,7 +309,8 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
"2",
TARGET_FILE_SIZE_BYTES,
String.valueOf(targetFileSize)),
- Expressions.alwaysTrue()))) {
+ Expressions.alwaysTrue(),
+ SnapshotRef.MAIN_BRANCH))) {
testHarness.open();
OperatorTestBase.trigger(testHarness);