This is an automated email from the ASF dual-hosted git repository.
liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4b878fe5d [Feature] Support creating an empty branch and creating a branch
based on snapshotId (#2938)
4b878fe5d is described below
commit 4b878fe5d6f6cf70b6c1a6acd1e158665bc30b95
Author: Xiaojian Sun <[email protected]>
AuthorDate: Wed Apr 24 16:02:28 2024 +0800
[Feature] Support creating an empty branch and creating a branch based on
snapshotId (#2938)
* Support creating an empty branch and creating a branch based on a snapshotId
---
.../java/org/apache/paimon/branch/TableBranch.java | 14 ++++
.../paimon/table/AbstractFileStoreTable.java | 10 +++
.../org/apache/paimon/table/ReadonlyTable.java | 16 ++++
.../main/java/org/apache/paimon/table/Table.java | 8 ++
.../org/apache/paimon/utils/BranchManager.java | 90 ++++++++++++++++++++--
.../flink/procedure/CreateBranchProcedure.java | 24 +++++-
.../services/org.apache.paimon.factories.Factory | 2 +-
.../paimon/flink/action/BranchActionITCase.java | 80 +++++++++++++++++++
8 files changed, 235 insertions(+), 9 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
index 4b24c866f..9b5b478fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
+++ b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
@@ -26,6 +26,20 @@ public class TableBranch {
private final long createTime;
+ public TableBranch(String branchName, Long createdFromSnapshot, long
createTime) {
+ this.branchName = branchName;
+ this.createdFromTag = null;
+ this.createdFromSnapshot = createdFromSnapshot;
+ this.createTime = createTime;
+ }
+
+ public TableBranch(String branchName, long createTime) {
+ this.branchName = branchName;
+ this.createdFromTag = null;
+ this.createdFromSnapshot = null;
+ this.createTime = createTime;
+ }
+
public TableBranch(
String branchName, String createdFromTag, Long
createdFromSnapshot, long createTime) {
this.branchName = branchName;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 1ca321d56..7d4a4a7c2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -524,6 +524,16 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
store().createTagCallbacks());
}
+ @Override
+ public void createBranch(String branchName) {
+ branchManager().createBranch(branchName);
+ }
+
+ @Override
+ public void createBranch(String branchName, long snapshotId) {
+ branchManager().createBranch(branchName, snapshotId);
+ }
+
@Override
public void createBranch(String branchName, String tagName) {
branchManager().createBranch(branchName, tagName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index be4976f10..42bea3f68 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -150,6 +150,22 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default void createBranch(String branchName) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support create empty
branch.",
+ this.getClass().getSimpleName()));
+ }
+
+ @Override
+ default void createBranch(String branchName, long snapshotId) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support createBranch with
snapshotId.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default void createBranch(String branchName, String tagName) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 3650b773c..876908394 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -95,6 +95,14 @@ public interface Table extends Serializable {
@Experimental
void rollbackTo(String tagName);
+ /** Create a empty branch. */
+ @Experimental
+ void createBranch(String branchName);
+
+ /** Create a branch from given snapshot. */
+ @Experimental
+ void createBranch(String branchName, long snapshotId);
+
/** Create a branch from given tag. */
@Experimental
void createBranch(String branchName, String tagName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 4656deb67..f3f06f892 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
@@ -33,6 +34,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.stream.Collectors;
@@ -82,6 +84,65 @@ public class BranchManager {
return new Path(getBranchPath(tablePath, branchName));
}
+ /** Create empty branch. */
+ public void createBranch(String branchName) {
+ checkArgument(
+ !branchName.equals(DEFAULT_MAIN_BRANCH),
+ String.format(
+ "Branch name '%s' is the default branch and cannot be
used.",
+ DEFAULT_MAIN_BRANCH));
+ checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is
blank.", branchName);
+ checkArgument(!branchExists(branchName), "Branch name '%s' already
exists.", branchName);
+ checkArgument(
+ !branchName.chars().allMatch(Character::isDigit),
+ "Branch name cannot be pure numeric string but is '%s'.",
+ branchName);
+ try {
+ TableSchema latestSchema = schemaManager.latest().get();
+ fileIO.copyFileUtf8(
+ schemaManager.toSchemaPath(latestSchema.id()),
+ schemaManager.branchSchemaPath(branchName,
latestSchema.id()));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Exception occurs when create branch '%s'
(directory in %s).",
+ branchName, getBranchPath(tablePath, branchName)),
+ e);
+ }
+ }
+
+ public void createBranch(String branchName, long snapshotId) {
+ checkArgument(
+ !branchName.equals(DEFAULT_MAIN_BRANCH),
+ String.format(
+ "Branch name '%s' is the default branch and cannot be
used.",
+ DEFAULT_MAIN_BRANCH));
+ checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is
blank.", branchName);
+ checkArgument(!branchExists(branchName), "Branch name '%s' already
exists.", branchName);
+ checkArgument(
+ !branchName.chars().allMatch(Character::isDigit),
+ "Branch name cannot be pure numeric string but is '%s'.",
+ branchName);
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+
+ try {
+ // Copy the corresponding snapshot and schema files into the
branch directory
+ fileIO.copyFileUtf8(
+ snapshotManager.snapshotPath(snapshotId),
+ snapshotManager.branchSnapshotPath(branchName,
snapshot.id()));
+ fileIO.copyFileUtf8(
+ schemaManager.toSchemaPath(snapshot.schemaId()),
+ schemaManager.branchSchemaPath(branchName,
snapshot.schemaId()));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Exception occurs when create branch '%s'
(directory in %s).",
+ branchName, getBranchPath(tablePath, branchName)),
+ e);
+ }
+ }
+
public void createBranch(String branchName, String tagName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
@@ -170,15 +231,34 @@ public class BranchManager {
new
PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime));
for (Pair<Path, Long> path : paths) {
String branchName =
path.getLeft().getName().substring(BRANCH_PREFIX.length());
+ Optional<TableSchema> tableSchema =
schemaManager.latest(branchName);
+ if (!tableSchema.isPresent()) {
+ // Support empty branch.
+ pq.add(new TableBranch(branchName, path.getValue()));
+ continue;
+ }
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath,
branchName)));
+
SortedMap<Snapshot, List<String>> snapshotTags =
branchTable.tagManager().tags();
- checkArgument(!snapshotTags.isEmpty());
- Snapshot snapshot = snapshotTags.firstKey();
- List<String> tags = snapshotTags.get(snapshot);
- checkArgument(tags.size() == 1);
- pq.add(new TableBranch(branchName, tags.get(0), snapshot.id(),
path.getValue()));
+ Long earliestSnapshotId =
branchTable.snapshotManager().earliestSnapshotId();
+ if (snapshotTags.isEmpty()) {
+ // Create based on snapshotId.
+ pq.add(new TableBranch(branchName, earliestSnapshotId,
path.getValue()));
+ } else {
+ Snapshot snapshot = snapshotTags.firstKey();
+ if (earliestSnapshotId == snapshot.id()) {
+ List<String> tags = snapshotTags.get(snapshot);
+ checkArgument(tags.size() == 1);
+ pq.add(
+ new TableBranch(
+ branchName, tags.get(0),
snapshot.id(), path.getValue()));
+ } else {
+ // Create based on snapshotId.
+ pq.add(new TableBranch(branchName, earliestSnapshotId,
path.getValue()));
+ }
+ }
}
List<TableBranch> branches = new ArrayList<>(pq.size());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index b870f088b..3d8ae49cc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;
/**
@@ -43,13 +44,30 @@ public class CreateBranchProcedure extends ProcedureBase {
public String[] call(
ProcedureContext procedureContext, String tableId, String
branchName, String tagName)
throws Catalog.TableNotExistException {
- return innerCall(tableId, branchName, tagName);
+ return innerCall(tableId, branchName, tagName, 0);
}
- private String[] innerCall(String tableId, String branchName, String
tagName)
+ public String[] call(ProcedureContext procedureContext, String tableId,
String branchName)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, null, 0);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, String
branchName, long snapshotId)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, null, snapshotId);
+ }
+
+ private String[] innerCall(String tableId, String branchName, String
tagName, long snapshotId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
- table.createBranch(branchName, tagName);
+ if (!StringUtils.isBlank(tagName)) {
+ table.createBranch(branchName, tagName);
+ } else if (snapshotId > 0) {
+ table.createBranch(branchName, snapshotId);
+ } else {
+ table.createBranch(branchName);
+ }
return new String[] {"Success"};
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 04f2b7933..fe463161b 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,4 +44,4 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
-org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 7a6361657..4f4f31496 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -80,4 +80,84 @@ class BranchActionITCase extends ActionITCaseBase {
"CALL sys.delete_branch('%s.%s', 'branch_name')",
database, tableName));
assertThat(branchManager.branchExists("branch_name")).isFalse();
}
+
+ @Test
+ void testCreateAndDeleteBranchWithSnapshotId() throws Exception {
+
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ BranchManager branchManager = table.branchManager();
+
+ callProcedure(
+ String.format(
+ "CALL sys.create_branch('%s.%s',
'branch_name_with_snapshotId', 2)",
+ database, tableName));
+
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
+ branchManager.branches();
+
+ callProcedure(
+ String.format(
+ "CALL sys.delete_branch('%s.%s',
'branch_name_with_snapshotId')",
+ database, tableName));
+
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
+ }
+
+ @Test
+ void testCreateAndDeleteEmptyBranch() throws Exception {
+
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ BranchManager branchManager = table.branchManager();
+ callProcedure(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'empty_branch_name')",
+ database, tableName));
+ assertThat(branchManager.branchExists("empty_branch_name")).isTrue();
+
+ callProcedure(
+ String.format(
+ "CALL sys.delete_branch('%s.%s', 'empty_branch_name')",
+ database, tableName));
+ assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
+ }
}