This is an automated email from the ASF dual-hosted git repository.
liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 705c9590f [Core]Support merge branch (#2862)
705c9590f is described below
commit 705c9590f576278d20cb00da62319ade70d3794d
Author: TaoZex <[email protected]>
AuthorDate: Thu Jun 13 22:54:16 2024 +0800
[Core]Support merge branch (#2862)
* [Core]Support merge branch
---
.../src/main/java/org/apache/paimon/fs/FileIO.java | 20 ++++
.../paimon/privilege/PrivilegedFileStoreTable.java | 6 +
.../java/org/apache/paimon/schema/TableSchema.java | 12 ++
.../paimon/table/AbstractFileStoreTable.java | 5 +
.../org/apache/paimon/table/ReadonlyTable.java | 8 ++
.../main/java/org/apache/paimon/table/Table.java | 4 +
.../org/apache/paimon/utils/BranchManager.java | 75 ++++++++++++
.../org/apache/paimon/utils/SnapshotManager.java | 6 +
.../paimon/table/FileStoreTableTestBase.java | 131 ++++++++++++++++++++-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 18 +--
.../flink/procedure/MergeBranchProcedure.java | 54 +++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../paimon/flink/action/BranchActionITCase.java | 113 ++++++++++++++++--
13 files changed, 437 insertions(+), 16 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 55aa9b825..f31b00af1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -175,6 +175,12 @@ public interface FileIO extends Serializable {
}
}
+    /**
+     * Deletes each of the given files, in iteration order, by delegating every single path to
+     * {@link #deleteQuietly(Path)}.
+     *
+     * @param files paths of the files to delete
+     */
+    default void deleteFilesQuietly(List<Path> files) {
+        files.forEach(file -> deleteQuietly(file));
+    }
+
default void deleteDirectoryQuietly(Path directory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete " + directory.toString());
@@ -272,6 +278,20 @@ public interface FileIO extends Serializable {
return writeFileUtf8(targetPath, content);
}
+    /**
+     * Copies every file located directly under {@code sourceDirectory} into {@code
+     * targetDirectory}, keeping each file's name.
+     *
+     * <p>Each file is copied with {@link #copyFileUtf8(Path, Path)}, so the content is handled as
+     * UTF-8 text; this is meant for text metadata files (e.g. snapshot, schema and tag files).
+     * Sub-directories are skipped, i.e. the copy is not recursive. The per-file result returned
+     * by {@link #copyFileUtf8(Path, Path)} is not inspected.
+     *
+     * @param sourceDirectory directory whose files are copied
+     * @param targetDirectory directory the files are copied into
+     * @throws IOException if listing {@code sourceDirectory} or copying one of its files fails
+     */
+    default void copyFilesUtf8(Path sourceDirectory, Path targetDirectory) throws IOException {
+        for (FileStatus fileStatus : listStatus(sourceDirectory)) {
+            if (fileStatus.isDir()) {
+                // copyFileUtf8 copies the content of a single file; a directory cannot be
+                // copied this way.
+                continue;
+            }
+            Path file = fileStatus.getPath();
+            // Resolve the child path structurally instead of concatenating strings, so that a
+            // trailing separator or URI scheme on targetDirectory is handled by Path itself.
+            copyFileUtf8(file, new Path(targetDirectory, file.getName()));
+        }
+    }
+
/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws
IOException {
int retryNumber = 0;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index fe0c65728..548ae69ee 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,12 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
wrapped.deleteBranch(branchName);
}
+    /**
+     * Asserts that the current user holds the insert privilege on this table and, if so, forwards
+     * the merge to the wrapped table.
+     */
+    @Override
+    public void mergeBranch(String branchName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.mergeBranch(branchName);
+    }
+
@Override
public void replaceBranch(String fromBranch) {
privilegeChecker.assertCanInsert(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index 2d86c96c1..4f94c6470 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -18,6 +18,8 @@
package org.apache.paimon.schema;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
@@ -26,6 +28,7 @@ import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
@@ -286,6 +289,15 @@ public class TableSchema implements Serializable {
return JsonSerdeUtil.fromJson(json, TableSchema.class);
}
+    /**
+     * Loads a schema by reading the file at {@code path} with {@code fileIO} and parsing its
+     * content with {@link #fromJson}.
+     *
+     * @param fileIO file system used to read the schema file
+     * @param path location of the schema file
+     * @return the parsed schema
+     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
+     */
+    public static TableSchema fromPath(FileIO fileIO, Path path) {
+        try {
+            return fromJson(fileIO.readFileUtf8(path));
+        } catch (IOException e) {
+            throw new RuntimeException("Fails to read schema from path " + path, e);
+        }
+    }
+
@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index bf8857e72..cddc1d7bf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -524,6 +524,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
branchManager().deleteBranch(branchName);
}
+    /** Merges the given branch into the main branch by delegating to this table's branch manager. */
+    @Override
+    public void mergeBranch(String branchName) {
+        BranchManager manager = branchManager();
+        manager.mergeBranch(branchName);
+    }
+
@Override
public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index dc50a83d9..70023d555 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -182,6 +182,14 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+    /**
+     * Always throws {@link UnsupportedOperationException}: read-only tables cannot merge branches.
+     */
+    @Override
+    default void mergeBranch(String branchName) {
+        String tableClassName = this.getClass().getSimpleName();
+        String message =
+                String.format("Readonly Table %s does not support mergeBranch.", tableClassName);
+        throw new UnsupportedOperationException(message);
+    }
+
@Override
default void replaceBranch(String fromBranch) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index d01ecc95c..227f2836d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -111,6 +111,10 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);
+ /**
+ * Merge the given branch into the main branch.
+ *
+ * <p>Implementations that cannot merge branches throw {@link
+ * UnsupportedOperationException}, as read-only tables do.
+ *
+ * @param branchName name of the branch to merge into the main branch
+ */
+ @Experimental
+ void mergeBranch(String branchName);
+
@Experimental
void replaceBranch(String fromBranch);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 9742d63ac..63c33e593 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -41,8 +42,10 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Manager for {@code Branch}. */
@@ -353,6 +356,78 @@ public class BranchManager {
}
}
+    /**
+     * Merges the given branch into the main branch.
+     *
+     * <p>The branch's earliest snapshot (and the schema it uses) marks where its history starts.
+     * The main branch's snapshots and tags with an id at or after that snapshot id, and its
+     * schemas with an id at or after that schema id, are deleted together with the main
+     * branch's LATEST hint. Afterwards, the files of the branch's snapshot, schema and tag
+     * directories are copied into the corresponding main-branch directories.
+     *
+     * @param branchName name of the branch to merge
+     * @throws IllegalArgumentException if the name is blank, names the main branch, does not
+     *     refer to an existing branch, or the branch has no snapshot
+     * @throws RuntimeException if an I/O error occurs while deleting or copying metadata files
+     */
+    public void mergeBranch(String branchName) {
+        // Check blankness first so that a null name fails the precondition instead of throwing
+        // a NullPointerException from equals().
+        checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
+        checkArgument(
+                !branchName.equals(DEFAULT_MAIN_BRANCH),
+                "Branch name '%s' do not use in merge branch.",
+                branchName);
+        checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
+
+        SnapshotManager branchSnapshotManager = snapshotManager.copyWithBranch(branchName);
+        Long earliestSnapshotIdOrNull = branchSnapshotManager.earliestSnapshotId();
+        // Without this check an empty branch would fail with an NPE when unboxing below.
+        checkArgument(
+                earliestSnapshotIdOrNull != null,
+                "Branch name '%s' has no snapshot to merge.",
+                branchName);
+        long earliestSnapshotId = earliestSnapshotIdOrNull;
+        long earliestSchemaId = branchSnapshotManager.snapshot(earliestSnapshotId).schemaId();
+
+        try {
+            // Main-branch metadata created at or after the branch's starting point is superseded
+            // by the branch's own history.
+            List<Path> deleteSnapshotPaths =
+                    listVersionedFileStatus(
+                                    fileIO, snapshotManager.snapshotDirectory(), "snapshot-")
+                            .map(FileStatus::getPath)
+                            .filter(
+                                    path ->
+                                            Snapshot.fromPath(fileIO, path).id()
+                                                    >= earliestSnapshotId)
+                            .collect(Collectors.toList());
+            List<Path> deleteSchemaPaths =
+                    listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-")
+                            .map(FileStatus::getPath)
+                            .filter(
+                                    path ->
+                                            TableSchema.fromPath(fileIO, path).id()
+                                                    >= earliestSchemaId)
+                            .collect(Collectors.toList());
+            // Tag files are read as snapshots; their snapshot id decides whether they go.
+            List<Path> deleteTagPaths =
+                    listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-")
+                            .map(FileStatus::getPath)
+                            .filter(
+                                    path ->
+                                            Snapshot.fromPath(fileIO, path).id()
+                                                    >= earliestSnapshotId)
+                            .collect(Collectors.toList());
+
+            List<Path> deletePaths =
+                    Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths)
+                            .flatMap(List::stream)
+                            .collect(Collectors.toList());
+
+            snapshotManager.deleteLatestHint();
+            fileIO.deleteFilesQuietly(deletePaths);
+
+            copyFilesIfExists(
+                    branchSnapshotManager.snapshotDirectory(),
+                    snapshotManager.snapshotDirectory());
+            copyFilesIfExists(
+                    schemaManager.copyWithBranch(branchName).schemaDirectory(),
+                    schemaManager.schemaDirectory());
+            copyFilesIfExists(
+                    tagManager.copyWithBranch(branchName).tagDirectory(),
+                    tagManager.tagDirectory());
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Exception occurs when merge branch '%s' (directory in %s).",
+                            branchName, getBranchPath(fileIO, tablePath, branchName)),
+                    e);
+        }
+    }
+
+    /**
+     * Copies the files of {@code sourceDirectory} into {@code targetDirectory} when the source
+     * directory exists. A branch does not necessarily have every metadata directory (e.g. no tag
+     * directory when it holds no tags — TODO confirm for all branch creation paths).
+     */
+    private void copyFilesIfExists(Path sourceDirectory, Path targetDirectory)
+            throws IOException {
+        if (fileIO.exists(sourceDirectory)) {
+            fileIO.copyFilesUtf8(sourceDirectory, targetDirectory);
+        }
+    }
+
/** Check if a branch exists. */
public boolean branchExists(String branchName) {
Path branchPath = branchPath(branchName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 93442fd7c..af1d450db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -634,6 +634,12 @@ public class SnapshotManager implements Serializable {
return listVersionedFiles(fileIO, dir,
prefix).reduce(reducer).orElse(null);
}
+    /**
+     * Removes the {@code LATEST} hint file from this manager's snapshot directory
+     * (non-recursive delete).
+     *
+     * @throws IOException if the underlying file system fails to delete the hint file
+     */
+    public void deleteLatestHint() throws IOException {
+        fileIO.delete(new Path(snapshotDirectory(), LATEST), false);
+    }
+
public void commitLatestHint(long snapshotId) throws IOException {
commitHint(snapshotId, LATEST, snapshotDirectory());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 35b7666c9..6ff0bd441 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1165,6 +1165,136 @@ public abstract class FileStoreTableTestBase {
"Branch name 'branch1' doesn't exist."));
}
+    @Test
+    public void testMergeBranch() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+
+        // Right after creation, the branch holds the same data as the main branch
+        assertThat(
+                        getResult(
+                                tableBranch.newRead(),
+                                toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Merging a non-existing branch, or the main branch itself, is rejected
+        assertThatThrownBy(() -> table.mergeBranch("test-branch"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'test-branch' doesn't exist."));
+        assertThatThrownBy(() -> table.mergeBranch("main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'main' do not use in merge branch."));
+
+        // Write data to the branch only
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // The branch sees the new row ...
+        assertThat(
+                        getResult(
+                                tableBranch.newRead(),
+                                toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // ... while the main branch is unchanged
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Merge the branch into the main branch
+        table.mergeBranch(BRANCH_NAME);
+
+        // After the merge, the main branch has the same data as the branch
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Snapshot 2 of the main branch is identical to the branch's snapshot 2
+        TraceableFileIO traceableFileIO = new TraceableFileIO();
+        SnapshotManager snapshotManager = new SnapshotManager(traceableFileIO, tablePath);
+        Snapshot branchSnapshot =
+                Snapshot.fromPath(
+                        traceableFileIO,
+                        snapshotManager.copyWithBranch(BRANCH_NAME).snapshotPath(2));
+        Snapshot snapshot = Snapshot.fromPath(traceableFileIO, snapshotManager.snapshotPath(2));
+        assertThat(branchSnapshot).isEqualTo(snapshot);
+
+        // Schema 0 of the main branch is identical to the branch's schema 0
+        SchemaManager schemaManager = new SchemaManager(traceableFileIO, tablePath);
+        TableSchema branchSchema =
+                SchemaManager.fromPath(
+                        traceableFileIO,
+                        schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0));
+        TableSchema schema0 = schemaManager.schema(0);
+        assertThat(branchSchema).isEqualTo(schema0);
+
+        // Write two more rows to the branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(3, 30, 300L));
+            write.write(rowData(4, 40, 400L));
+            commit.commit(2, write.prepareCommit(false, 3));
+        }
+
+        // The branch sees all four rows ...
+        assertThat(
+                        getResult(
+                                tableBranch.newRead(),
+                                toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                        "4|40|400|binary|varbinary|mapKey:mapVal|multiset");
+
+        // ... while the main branch still reflects the first merge only
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Merge the branch into the main branch again
+        table.mergeBranch(BRANCH_NAME);
+
+        // The main branch now has the same data as the branch
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                        "4|40|400|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
@Test
public void testUnsupportedTagName() throws Exception {
FileStoreTable table = createFileStoreTable();
@@ -1636,7 +1766,6 @@ public abstract class FileStoreTableTestBase {
table.createBranch(BRANCH_NAME, "tag1");
// verify that branch1 file exist
- TraceableFileIO fileIO = new TraceableFileIO();
BranchManager branchManager = table.branchManager();
assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 095511ae1..eccb68221 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -1696,15 +1696,17 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
options.set(BUCKET, 1);
options.set(BRANCH, branch);
configure.accept(options);
+ TableSchema latestSchema =
+ new SchemaManager(LocalFileIO.create(),
tablePath).latest().get();
TableSchema tableSchema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- rowType.getFields(),
- Collections.singletonList("pt"),
- Arrays.asList("pt", "a"),
- options.toMap(),
- ""));
+ new TableSchema(
+ latestSchema.id(),
+ latestSchema.fields(),
+ latestSchema.highestFieldId(),
+ latestSchema.partitionKeys(),
+ latestSchema.primaryKeys(),
+ options.toMap(),
+ latestSchema.comment());
return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
new file mode 100644
index 000000000..e7eb3eb33
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure that merges a branch of a table into the table's main branch. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.merge_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class MergeBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "merge_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Resolves {@code tableId} through the catalog and merges {@code branchName} into the main
+     * branch of that table.
+     *
+     * @param procedureContext Flink procedure context (unused)
+     * @param tableId table identifier string, parsed with {@link Identifier#fromString}
+     * @param branchName name of the branch to merge
+     * @return a one-element array holding {@code "Success"}
+     * @throws Catalog.TableNotExistException if the catalog reports that the table does not exist
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        Table table = catalog.getTable(identifier);
+        table.mergeBranch(branchName);
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 38cb36713..192be6c14 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -53,3 +53,4 @@
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
+org.apache.paimon.flink.procedure.MergeBranchProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 209b0d2e7..a41d46e38 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -227,13 +227,7 @@ class BranchActionITCase extends ActionITCaseBase {
writeData(rowData(i,
BinaryString.fromString(String.format("new.data_%s", i))));
}
- ReadBuilder readBuilder = table.newReadBuilder();
- TableScan.Plan plan = readBuilder.newScan().plan();
- List<String> result =
- getResult(
- readBuilder.newRead(),
- plan == null ? Collections.emptyList() : plan.splits(),
- rowType);
+ List<String> result = readTableData(table);
List<String> sortedActual = new ArrayList<>(result);
List<String> expected =
Arrays.asList(
@@ -255,4 +249,109 @@ class BranchActionITCase extends ActionITCaseBase {
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)",
database, tableName));
assertThat(tagManager.tagExists("tag3")).isTrue();
}
+
+    @Test
+    void testMergeBranch() throws Exception {
+        init(warehouse);
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // Create tag2 on snapshot 2
+        TagManager tagManager = new TagManager(table.fileIO(), table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        // Create merge_branch_name branch from tag2
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("merge_branch_name")).isTrue();
+
+        // Merge the branch: snapshot 3 of the main branch must be gone afterwards
+        callProcedure(
+                String.format(
+                        "CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
+                        database, tableName));
+        SnapshotManager snapshotManager = table.snapshotManager();
+        assertThat(snapshotManager.snapshotExists(3)).isFalse();
+
+        // Release the previous writer and committer before replacing them, otherwise they leak
+        write.close();
+        commit.close();
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // Add data, forward to merge branch
+        for (long i = 4; i < 14; i++) {
+            writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i))));
+        }
+
+        // Check main branch data
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, Hi]",
+                        "+I[2, Hello]",
+                        "+I[4, new.data_4]",
+                        "+I[5, new.data_5]",
+                        "+I[6, new.data_6]",
+                        "+I[7, new.data_7]",
+                        "+I[8, new.data_8]",
+                        "+I[9, new.data_9]",
+                        "+I[10, new.data_10]",
+                        "+I[11, new.data_11]",
+                        "+I[12, new.data_12]",
+                        "+I[13, new.data_13]");
+        Assert.assertEquals(expected, readTableData(table));
+
+        // Merge the branch again: the main branch goes back to the branch's content
+        callProcedure(
+                String.format(
+                        "CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
+                        database, tableName));
+        Assert.assertEquals(Arrays.asList("+I[1, Hi]", "+I[2, Hello]"), readTableData(table));
+    }
+
+    /**
+     * Plans a batch scan of {@code table} and returns every row as a string produced by
+     * {@code getResult}, interpreting rows as (k BIGINT, v STRING). A {@code null} plan is read
+     * as having no splits.
+     */
+    List<String> readTableData(FileStoreTable table) throws Exception {
+        ReadBuilder builder = table.newReadBuilder();
+        TableScan.Plan scanPlan = builder.newScan().plan();
+        RowType kvRowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        return getResult(
+                builder.newRead(),
+                scanPlan == null ? Collections.emptyList() : scanPlan.splits(),
+                kvRowType);
+    }
}