This is an automated email from the ASF dual-hosted git repository.
liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 483a69cba Implement replace branch in BranchManager (#2911)
483a69cba is described below
commit 483a69cbaa93aa71661dca014ecac82443a402ed
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue May 28 15:35:23 2024 +0800
Implement replace branch in BranchManager (#2911)
---
.../paimon/privilege/PrivilegedFileStoreTable.java | 6 +
.../org/apache/paimon/schema/SchemaManager.java | 11 +-
.../paimon/table/AbstractFileStoreTable.java | 5 +
.../org/apache/paimon/table/ReadonlyTable.java | 8 +
.../main/java/org/apache/paimon/table/Table.java | 3 +
.../org/apache/paimon/utils/BranchManager.java | 163 +++++++++++++++++++--
.../org/apache/paimon/utils/SnapshotManager.java | 33 ++---
.../java/org/apache/paimon/utils/TagManager.java | 13 +-
.../flink/procedure/ReplaceBranchProcedure.java | 54 +++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../paimon/flink/action/BranchActionITCase.java | 94 +++++++++++-
11 files changed, 341 insertions(+), 50 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 1881c7350..e4b09df38 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,12 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
wrapped.deleteBranch(branchName);
}
+ @Override
+ public void replaceBranch(String fromBranch) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.replaceBranch(fromBranch);
+ }
+
@Override
public ExpireSnapshots newExpireSnapshots() {
privilegeChecker.assertCanInsert(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 228d30d5c..cb478d7ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -68,7 +68,6 @@ import static
org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
-import static org.apache.paimon.utils.BranchManager.isMainBranch;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -500,17 +499,13 @@ public class SchemaManager implements Serializable {
}
public Path schemaDirectory() {
- return isMainBranch(branch)
- ? new Path(tableRoot + "/schema")
- : new Path(getBranchPath(tableRoot, branch) + "/schema");
+ return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema");
}
@VisibleForTesting
public Path toSchemaPath(long schemaId) {
- return isMainBranch(branch)
- ? new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + schemaId)
- : new Path(
- getBranchPath(tableRoot, branch) + "/schema/" +
SCHEMA_PREFIX + schemaId);
+ return new Path(
+ getBranchPath(fileIO, tableRoot, branch) + "/schema/" +
SCHEMA_PREFIX + schemaId);
}
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 655e81431..82cc47ad5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -525,6 +525,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
branchManager().deleteBranch(branchName);
}
+ @Override
+ public void replaceBranch(String fromBranch) {
+ branchManager().replaceBranch(fromBranch);
+ }
+
@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 42bea3f68..dcb62dfcb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -182,6 +182,14 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default void replaceBranch(String fromBranch) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support replaceBranch.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 876908394..d01ecc95c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -111,6 +111,9 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);
+ @Experimental
+ void replaceBranch(String fromBranch);
+
/** Manually expire snapshots, parameters can be controlled independently
of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 41099dfac..9742d63ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -33,9 +33,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;
@@ -49,6 +52,7 @@ public class BranchManager {
public static final String BRANCH_PREFIX = "branch-";
public static final String DEFAULT_MAIN_BRANCH = "main";
+ public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";
private final FileIO fileIO;
private final Path tablePath;
@@ -69,6 +73,12 @@ public class BranchManager {
this.schemaManager = schemaManager;
}
+ /** Commit specify branch to main. */
+ public void commitMainBranch(String branchName) throws IOException {
+ Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
+ fileIO.overwriteFileUtf8(mainBranchFile, branchName);
+ }
+
/** Return the root Directory of branch. */
public Path branchDirectory() {
return new Path(tablePath + "/branch");
@@ -79,13 +89,45 @@ public class BranchManager {
}
/** Return the path string of a branch. */
- public static String getBranchPath(Path tablePath, String branchName) {
- return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
+ public static String getBranchPath(FileIO fileIO, Path tablePath, String
branch) {
+ if (isMainBranch(branch)) {
+ Path path = new Path(tablePath, MAIN_BRANCH_FILE);
+ try {
+ if (fileIO.exists(path)) {
+ String data = fileIO.readFileUtf8(path);
+ if (StringUtils.isBlank(data)) {
+ return tablePath.toString();
+ } else {
+ return tablePath.toString() + "/branch/" +
BRANCH_PREFIX + data;
+ }
+ } else {
+ return tablePath.toString();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch;
+ }
+
+ public String defaultMainBranch() {
+ Path path = new Path(tablePath, MAIN_BRANCH_FILE);
+ try {
+ if (fileIO.exists(path)) {
+ String data = fileIO.readFileUtf8(path);
+ if (!StringUtils.isBlank(data)) {
+ return data;
+ }
+ }
+ return DEFAULT_MAIN_BRANCH;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/** Return the path of a branch. */
public Path branchPath(String branchName) {
- return new Path(getBranchPath(tablePath, branchName));
+ return new Path(getBranchPath(fileIO, tablePath, branchName));
}
/** Create empty branch. */
@@ -111,7 +153,7 @@ public class BranchManager {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s'
(directory in %s).",
- branchName, getBranchPath(tablePath, branchName)),
+ branchName, getBranchPath(fileIO, tablePath,
branchName)),
e);
}
}
@@ -143,17 +185,17 @@ public class BranchManager {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s'
(directory in %s).",
- branchName, getBranchPath(tablePath, branchName)),
+ branchName, getBranchPath(fileIO, tablePath,
branchName)),
e);
}
}
public void createBranch(String branchName, String tagName) {
+ String mainBranch = defaultMainBranch();
checkArgument(
!isMainBranch(branchName),
String.format(
- "Branch name '%s' is the default branch and cannot be
used.",
- DEFAULT_MAIN_BRANCH));
+ "Branch name '%s' is the default branch and cannot be
used.", mainBranch));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is
blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already
exists.", branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not
exists.", tagName);
@@ -179,7 +221,7 @@ public class BranchManager {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s'
(directory in %s).",
- branchName, getBranchPath(tablePath, branchName)),
+ branchName, getBranchPath(fileIO, tablePath,
branchName)),
e);
}
}
@@ -193,11 +235,111 @@ public class BranchManager {
LOG.info(
String.format(
"Deleting the branch failed due to an exception in
deleting the directory %s. Please try again.",
- getBranchPath(tablePath, branchName)),
+ getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
+ /** Replace specify branch to main branch. */
+ public void replaceBranch(String branchName) {
+ String mainBranch = defaultMainBranch();
+ checkArgument(
+ !isMainBranch(branchName),
+ String.format(
+ "Branch name '%s' is the default main branch and
cannot be replaced repeatedly.",
+ mainBranch));
+ checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is
blank.", branchName);
+ checkArgument(branchExists(branchName), "Branch name '%s' not
exists.", branchName);
+ try {
+ // 0. Cache previous tag,snapshot,schema directory.
+ Path tagDirectory = tagManager.tagDirectory();
+ Path snapshotDirectory = snapshotManager.snapshotDirectory();
+ Path schemaDirectory = schemaManager.schemaDirectory();
+ // 1. Calculate and copy the snapshots, tags and schemas which
should be copied from the
+ // main to branch.
+ calculateCopyMainToBranch(branchName);
+ // 2. Update the Main Branch File to the target branch.
+ commitMainBranch(branchName);
+ // 3.Drop the previous main branch, including snapshots, tags and
schemas.
+ dropPreviousMainBranch(tagDirectory, snapshotDirectory,
schemaDirectory);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Calculate copy main branch to target branch. */
+ private void calculateCopyMainToBranch(String branchName) throws
IOException {
+ TableBranch fromBranch =
+ this.branches().stream()
+ .filter(branch ->
branch.getBranchName().equals(branchName))
+ .findFirst()
+ .orElse(null);
+ if (fromBranch == null) {
+ throw new RuntimeException(String.format("No branches found %s",
branchName));
+ }
+ Snapshot fromSnapshot =
snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
+ // Copy tags.
+ List<String> tags = tagManager.allTagNames();
+ TagManager branchTagManager = tagManager.copyWithBranch(branchName);
+ for (String tagName : tags) {
+ if (branchTagManager.tagExists(tagName)) {
+ // If it already exists, skip it directly.
+ continue;
+ }
+ Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+ if (snapshot.id() < fromSnapshot.id()) {
+ fileIO.copyFileUtf8(tagManager.tagPath(tagName),
branchTagManager.tagPath(tagName));
+ }
+ }
+ // Copy snapshots.
+ Iterator<Snapshot> snapshots = snapshotManager.snapshots();
+ SnapshotManager branchSnapshotManager =
snapshotManager.copyWithBranch(branchName);
+ while (snapshots.hasNext()) {
+ Snapshot snapshot = snapshots.next();
+ if (snapshot.id() >= fromSnapshot.id()) {
+ continue;
+ }
+ if (branchSnapshotManager.snapshotExists(snapshot.id())) {
+ // If it already exists, skip it directly.
+ continue;
+ }
+ fileIO.copyFileUtf8(
+ snapshotManager.snapshotPath(snapshot.id()),
+ branchSnapshotManager.snapshotPath(snapshot.id()));
+ }
+
+ // Copy schemas.
+ List<Long> schemaIds = schemaManager.listAllIds();
+ SchemaManager branchSchemaManager =
schemaManager.copyWithBranch(branchName);
+ Set<Long> existsSchemas = new
HashSet<>(branchSchemaManager.listAllIds());
+
+ for (Long schemaId : schemaIds) {
+ if (existsSchemas.contains(schemaId)) {
+ // If it already exists, skip it directly.
+ continue;
+ }
+ TableSchema tableSchema = schemaManager.schema(schemaId);
+ if (tableSchema.id() < fromSnapshot.schemaId()) {
+ fileIO.copyFileUtf8(
+ schemaManager.toSchemaPath(schemaId),
+ branchSchemaManager.toSchemaPath(schemaId));
+ }
+ }
+ }
+
+ /** Directly delete snapshot, tag , schema directory. */
+ private void dropPreviousMainBranch(
+ Path tagDirectory, Path snapshotDirectory, Path schemaDirectory)
throws IOException {
+ // Delete tags.
+ fileIO.delete(tagDirectory, true);
+
+ // Delete snapshots.
+ fileIO.delete(snapshotDirectory, true);
+
+ // Delete schemas.
+ fileIO.delete(schemaDirectory, true);
+ }
+
/** Check if path exists. */
public boolean fileExists(Path path) {
try {
@@ -246,8 +388,7 @@ public class BranchManager {
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
- fileIO, new Path(getBranchPath(tablePath,
branchName)));
-
+ fileIO, new Path(getBranchPath(fileIO,
tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags =
branchTable.tagManager().tags();
Long earliestSnapshotId =
branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 63679b86a..9813debe4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
-import static org.apache.paimon.utils.BranchManager.isMainBranch;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
/** Manager for {@link Snapshot}, providing utility methods related to paths
and snapshot hints. */
@@ -90,35 +89,27 @@ public class SnapshotManager implements Serializable {
}
public Path changelogDirectory() {
- return isMainBranch(branch)
- ? new Path(tablePath + "/changelog")
- : new Path(getBranchPath(tablePath, branch) + "/changelog");
+ return new Path(getBranchPath(fileIO, tablePath, branch) +
"/changelog");
}
public Path longLivedChangelogPath(long snapshotId) {
- return isMainBranch(branch)
- ? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX +
snapshotId)
- : new Path(
- getBranchPath(tablePath, branch)
- + "/changelog/"
- + CHANGELOG_PREFIX
- + snapshotId);
+ return new Path(
+ getBranchPath(fileIO, tablePath, branch)
+ + "/changelog/"
+ + CHANGELOG_PREFIX
+ + snapshotId);
}
public Path snapshotPath(long snapshotId) {
- return isMainBranch(branch)
- ? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX +
snapshotId)
- : new Path(
- getBranchPath(tablePath, branch)
- + "/snapshot/"
- + SNAPSHOT_PREFIX
- + snapshotId);
+ return new Path(
+ getBranchPath(fileIO, tablePath, branch)
+ + "/snapshot/"
+ + SNAPSHOT_PREFIX
+ + snapshotId);
}
public Path snapshotDirectory() {
- return isMainBranch(branch)
- ? new Path(tablePath + "/snapshot")
- : new Path(getBranchPath(tablePath, branch) + "/snapshot");
+ return new Path(getBranchPath(fileIO, tablePath, branch) +
"/snapshot");
}
public Snapshot snapshot(long snapshotId) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index c96bbdd56..6c59ef53c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
-import static org.apache.paimon.utils.BranchManager.isMainBranch;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -79,16 +78,12 @@ public class TagManager {
/** Return the root Directory of tags. */
public Path tagDirectory() {
- return isMainBranch(branch)
- ? new Path(tablePath + "/tag")
- : new Path(getBranchPath(tablePath, branch) + "/tag");
+ return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag");
}
- /** Return the path of a tag in branch. */
+ /** Return the path of a tag. */
public Path tagPath(String tagName) {
- return isMainBranch(branch)
- ? new Path(tablePath + "/tag/" + TAG_PREFIX + tagName)
- : new Path(getBranchPath(tablePath, branch) + "/tag/" +
TAG_PREFIX + tagName);
+ return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" +
TAG_PREFIX + tagName);
}
/** Create a tag from given snapshot and save it in the storage. */
@@ -237,7 +232,7 @@ public class TagManager {
taggedSnapshot,
tagDeletion.manifestSkippingSet(skippedSnapshots));
}
- /** Check if a branch tag exists. */
+ /** Check if a tag exists. */
public boolean tagExists(String tagName) {
Path path = tagPath(tagName);
try {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java
new file mode 100644
index 000000000..10ef4a67a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Replace branch procedure for given branch. Usage:
+ *
+ * <pre><code>
+ * CALL sys.replace_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class ReplaceBranchProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "replace_branch";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId,
String branchName)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName);
+ }
+
+ private String[] innerCall(String tableId, String branchName)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ table.replaceBranch(branchName);
+ return new String[] {"Success"};
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 848dd317d..33a43009d 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -51,3 +51,4 @@
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
+org.apache.paimon.flink.procedure.ReplaceBranchProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 007d1ac5c..209b0d2e7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -21,15 +21,22 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import org.junit.Assert;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.assertj.core.api.Assertions.assertThat;
@@ -115,7 +122,6 @@ class BranchActionITCase extends ActionITCaseBase {
"CALL sys.create_branch('%s.%s',
'branch_name_with_snapshotId', 2)",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
- branchManager.branches();
callProcedure(
String.format(
@@ -163,4 +169,90 @@ class BranchActionITCase extends ActionITCaseBase {
database, tableName));
assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
}
+
+ @Test
+ void testReplaceBranch() throws Exception {
+ init(warehouse);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ // Create tag2
+ TagManager tagManager = new TagManager(table.fileIO(),
table.location());
+ callProcedure(
+ String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)",
database, tableName));
+ assertThat(tagManager.tagExists("tag2")).isTrue();
+
+ // Create replace_branch_name branch
+ BranchManager branchManager = table.branchManager();
+ callProcedure(
+ String.format(
+ "CALL sys.create_branch('%s.%s',
'replace_branch_name', 'tag2')",
+ database, tableName));
+ assertThat(branchManager.branchExists("replace_branch_name")).isTrue();
+
+ // Replace branch
+ callProcedure(
+ String.format(
+ "CALL sys.replace_branch('%s.%s',
'replace_branch_name')",
+ database, tableName));
+
+ // Check snapshot
+ SnapshotManager snapshotManager = table.snapshotManager();
+ assertThat(snapshotManager.snapshotExists(3)).isFalse();
+
+ // Renew write
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // Add data, forward to replace branch
+ for (long i = 4; i < 14; i++) {
+ writeData(rowData(i,
BinaryString.fromString(String.format("new.data_%s", i))));
+ }
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ List<String> result =
+ getResult(
+ readBuilder.newRead(),
+ plan == null ? Collections.emptyList() : plan.splits(),
+ rowType);
+ List<String> sortedActual = new ArrayList<>(result);
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, Hi]",
+ "+I[2, Hello]",
+ "+I[4, new.data_4]",
+ "+I[5, new.data_5]",
+ "+I[6, new.data_6]",
+ "+I[7, new.data_7]",
+ "+I[8, new.data_8]",
+ "+I[9, new.data_9]",
+ "+I[10, new.data_10]",
+ "+I[11, new.data_11]",
+ "+I[12, new.data_12]",
+ "+I[13, new.data_13]");
+ Assert.assertEquals(expected, sortedActual);
+
+ callProcedure(
+ String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)",
database, tableName));
+ assertThat(tagManager.tagExists("tag3")).isTrue();
+ }
}