This is an automated email from the ASF dual-hosted git repository.

liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd60203d [Core] Support branch batch/streaming read and write (#2748)
9bd60203d is described below

commit 9bd60203d1e1053e6f96d0a9ec42361a5ff01628
Author: TaoZex <[email protected]>
AuthorDate: Wed Feb 7 14:49:22 2024 +0800

    [Core] Support branch batch/streaming read and write (#2748)
    
    [Core] Support branch batch/streaming read and write
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   7 ++
 .../org/apache/paimon/AppendOnlyFileStore.java     |  14 ++-
 .../src/main/java/org/apache/paimon/FileStore.java |   4 +
 .../java/org/apache/paimon/KeyValueFileStore.java  |  14 ++-
 .../paimon/operation/AbstractFileStoreScan.java    |   7 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   6 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  27 ++++-
 .../paimon/operation/KeyValueFileStoreScan.java    |   6 +-
 .../org/apache/paimon/schema/SchemaManager.java    |  15 ++-
 .../java/org/apache/paimon/schema/TableSchema.java |   4 +
 .../paimon/table/AbstractFileStoreTable.java       |  16 ++-
 .../java/org/apache/paimon/table/DataTable.java    |   2 +
 .../org/apache/paimon/table/FileStoreTable.java    |   2 +
 .../apache/paimon/table/system/AuditLogTable.java  |   5 +
 .../apache/paimon/table/system/BucketsTable.java   |   5 +
 .../paimon/table/system/FileMonitorTable.java      |   5 +
 .../paimon/table/system/ReadOptimizedTable.java    |   5 +
 .../org/apache/paimon/utils/BranchManager.java     |   6 ++
 .../org/apache/paimon/utils/SnapshotManager.java   |  88 +++++++++++----
 .../java/org/apache/paimon/utils/TagManager.java   |   5 +
 .../apache/paimon/operation/FileDeletionTest.java  |   2 +
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  72 +++++++++++++
 .../paimon/table/FileStoreTableTestBase.java       | 118 ++++++++++++++++++---
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  68 ++++++++++++
 .../paimon/table/SchemaEvolutionTableTestBase.java |   6 ++
 25 files changed, 451 insertions(+), 58 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 0b4ebba19..896b53794 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -53,6 +53,8 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+
 /**
  * Base {@link FileStore} implementation.
  *
@@ -169,6 +171,10 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Override
     public FileStoreCommitImpl newCommit(String commitUser) {
+        return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
+    }
+
+    public FileStoreCommitImpl newCommit(String commitUser, String branchName) 
{
         return new FileStoreCommitImpl(
                 fileIO,
                 schemaManager,
@@ -186,6 +192,7 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.manifestMergeMinCount(),
                 partitionType.getFieldCount() > 0 && 
options.dynamicPartitionOverwrite(),
                 newKeyComparator(),
+                branchName,
                 newStatsFileHandler());
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ec1e7cb58..8be8f8178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -38,6 +38,7 @@ import java.util.List;
 import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 
 /** {@link FileStore} for reading and writing {@link InternalRow}. */
 public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@@ -69,7 +70,11 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     @Override
     public AppendOnlyFileStoreScan newScan() {
-        return newScan(false);
+        return newScan(DEFAULT_MAIN_BRANCH);
+    }
+
+    public AppendOnlyFileStoreScan newScan(String branchName) {
+        return newScan(false, branchName);
     }
 
     @Override
@@ -99,12 +104,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 rowType,
                 pathFactory(),
                 snapshotManager(),
-                newScan(true).withManifestCacheFilter(manifestFilter),
+                newScan(true, 
DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
                 options,
                 tableName);
     }
 
-    private AppendOnlyFileStoreScan newScan(boolean forWrite) {
+    private AppendOnlyFileStoreScan newScan(boolean forWrite, String 
branchName) {
         ScanBucketFilter bucketFilter =
                 new ScanBucketFilter(bucketKeyType) {
                     @Override
@@ -138,7 +143,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 manifestListFactory(forWrite),
                 options.bucket(),
                 forWrite,
-                options.scanManifestParallelism());
+                options.scanManifestParallelism(),
+                branchName);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index b8346f986..cd38d2061 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -63,6 +63,8 @@ public interface FileStore<T> extends Serializable {
 
     FileStoreScan newScan();
 
+    FileStoreScan newScan(String branchName);
+
     ManifestList.Factory manifestListFactory();
 
     ManifestFile.Factory manifestFileFactory();
@@ -79,6 +81,8 @@ public interface FileStore<T> extends Serializable {
 
     FileStoreCommit newCommit(String commitUser);
 
+    FileStoreCommit newCommit(String commitUser, String branchName);
+
     SnapshotDeletion newSnapshotDeletion();
 
     TagManager newTagManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 710b01585..373bce35c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -52,6 +52,7 @@ import java.util.function.Supplier;
 import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link FileStore} for querying and updating {@link KeyValue}s. */
@@ -107,7 +108,11 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public KeyValueFileStoreScan newScan() {
-        return newScan(false);
+        return newScan(DEFAULT_MAIN_BRANCH);
+    }
+
+    public KeyValueFileStoreScan newScan(String branchName) {
+        return newScan(false, branchName);
     }
 
     @Override
@@ -159,7 +164,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 pathFactory(),
                 format2PathFactory(),
                 snapshotManager(),
-                newScan(true).withManifestCacheFilter(manifestFilter),
+                newScan(true, 
DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
                 indexFactory,
                 options,
                 keyValueFieldsExtractor,
@@ -182,7 +187,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
         return pathFactoryMap;
     }
 
-    private KeyValueFileStoreScan newScan(boolean forWrite) {
+    private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) 
{
         ScanBucketFilter bucketFilter =
                 new ScanBucketFilter(bucketKeyType) {
                     @Override
@@ -212,7 +217,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 manifestListFactory(forWrite),
                 options.bucket(),
                 forWrite,
-                options.scanManifestParallelism());
+                options.scanManifestParallelism(),
+                branchName);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 65857daed..adcab4191 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -84,6 +84,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final Integer scanManifestParallelism;
 
     private ScanMetrics scanMetrics = null;
+    private String branchName;
 
     public AbstractFileStoreScan(
             RowType partitionType,
@@ -94,7 +95,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            Integer scanManifestParallelism) {
+            Integer scanManifestParallelism,
+            String branchName) {
         this.partitionType = partitionType;
         this.bucketKeyFilter = bucketKeyFilter;
         this.snapshotManager = snapshotManager;
@@ -105,6 +107,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.tableSchemas = new ConcurrentHashMap<>();
         this.scanManifestParallelism = scanManifestParallelism;
+        this.branchName = branchName;
     }
 
     @Override
@@ -245,7 +248,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         if (manifests == null) {
             snapshot =
                     specifiedSnapshot == null
-                            ? snapshotManager.latestSnapshot()
+                            ? snapshotManager.latestSnapshot(branchName)
                             : specifiedSnapshot;
             if (snapshot == null) {
                 manifests = Collections.emptyList();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 1cd7db0d2..90aa988b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -49,7 +49,8 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            Integer scanManifestParallelism) {
+            Integer scanManifestParallelism,
+            String branchName) {
         super(
                 partitionType,
                 bucketFilter,
@@ -59,7 +60,8 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
-                scanManifestParallelism);
+                scanManifestParallelism,
+                branchName);
         this.fieldStatsConverters =
                 new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), 
schemaId);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 044eb7f6e..0264f1e45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -71,6 +71,8 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+
 /**
  * Default implementation of {@link FileStoreCommit}.
  *
@@ -112,6 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final int manifestMergeMinCount;
     private final boolean dynamicPartitionOverwrite;
     @Nullable private final Comparator<InternalRow> keyComparator;
+    private final String branchName;
 
     @Nullable private Lock lock;
     private boolean ignoreEmptyCommit;
@@ -137,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             int manifestMergeMinCount,
             boolean dynamicPartitionOverwrite,
             @Nullable Comparator<InternalRow> keyComparator,
+            String branchName,
             StatsFileHandler statsFileHandler) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -155,6 +159,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.manifestMergeMinCount = manifestMergeMinCount;
         this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
         this.keyComparator = keyComparator;
+        this.branchName = branchName;
+
         this.lock = null;
         this.ignoreEmptyCommit = true;
         this.commitMetrics = null;
@@ -233,7 +239,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 // we can skip conflict checking in tryCommit method.
                 // This optimization is mainly used to decrease the number of 
times we read from
                 // files.
-                latestSnapshot = snapshotManager.latestSnapshot();
+                latestSnapshot = snapshotManager.latestSnapshot(branchName);
                 if (latestSnapshot != null) {
                     // it is possible that some partitions only have compact 
changes,
                     // so we need to contain all changes
@@ -254,6 +260,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.APPEND,
                                 safeLatestSnapshotId,
+                                branchName,
                                 null);
                 generatedSnapshot += 1;
             }
@@ -283,6 +290,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.COMPACT,
                                 safeLatestSnapshotId,
+                                branchName,
                                 null);
                 generatedSnapshot += 1;
             }
@@ -428,6 +436,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.COMPACT,
                                 null,
+                                branchName,
                                 null);
                 generatedSnapshot += 1;
             }
@@ -523,6 +532,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 Collections.emptyMap(),
                 Snapshot.CommitKind.ANALYZE,
                 null,
+                branchName,
                 statsFileName);
     }
 
@@ -596,10 +606,11 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             @Nullable Long safeLatestSnapshotId,
+            String branchName,
             @Nullable String statsFileName) {
         int cnt = 0;
         while (true) {
-            Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+            Snapshot latestSnapshot = 
snapshotManager.latestSnapshot(branchName);
             cnt++;
             if (tryCommitOnce(
                     tableFiles,
@@ -611,6 +622,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     commitKind,
                     latestSnapshot,
                     safeLatestSnapshotId,
+                    branchName,
                     statsFileName)) {
                 break;
             }
@@ -672,6 +684,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     Snapshot.CommitKind.OVERWRITE,
                     latestSnapshot,
                     null,
+                    branchName,
                     null)) {
                 break;
             }
@@ -690,10 +703,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             Snapshot.CommitKind commitKind,
             @Nullable Snapshot latestSnapshot,
             @Nullable Long safeLatestSnapshotId,
+            String branchName,
             @Nullable String newStatsFileName) {
         long newSnapshotId =
                 latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshot.id() + 1;
-        Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
+        Path newSnapshotPath =
+                branchName.equals(DEFAULT_MAIN_BRANCH)
+                        ? snapshotManager.snapshotPath(newSnapshotId)
+                        : snapshotManager.branchSnapshotPath(branchName, 
newSnapshotId);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Ready to commit table files to snapshot #" + 
newSnapshotId);
@@ -775,7 +792,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 newIndexManifest = indexManifest;
             }
 
-            long latestSchemaId = schemaManager.latest().get().id();
+            long latestSchemaId = schemaManager.latest(branchName).get().id();
 
             // write new stats or inherit from the previous snapshot
             String statsFileName = null;
@@ -840,7 +857,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                         boolean committed =
                                 fileIO.writeFileUtf8(newSnapshotPath, 
newSnapshot.toJson());
                         if (committed) {
-                            snapshotManager.commitLatestHint(newSnapshotId);
+                            snapshotManager.commitLatestHint(newSnapshotId, 
branchName);
                         }
                         return committed;
                     };
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index d528e5e79..02086ceb3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -53,7 +53,8 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            Integer scanManifestParallelism) {
+            Integer scanManifestParallelism,
+            String branchName) {
         super(
                 partitionType,
                 bucketFilter,
@@ -63,7 +64,8 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
-                scanManifestParallelism);
+                scanManifestParallelism,
+                branchName);
         this.fieldKeyStatsConverters =
                 new FieldStatsConverters(
                         sid -> 
keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 07dda6196..a6d274688 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -64,6 +64,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
 import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.getBranchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -91,8 +92,16 @@ public class SchemaManager implements Serializable {
 
     /** @return latest schema. */
     public Optional<TableSchema> latest() {
+        return latest(DEFAULT_MAIN_BRANCH);
+    }
+
+    public Optional<TableSchema> latest(String branchName) {
+        Path directoryPath =
+                branchName.equals(DEFAULT_MAIN_BRANCH)
+                        ? schemaDirectory()
+                        : branchSchemaDirectory(branchName);
         try {
-            return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
+            return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
                     .reduce(Math::max)
                     .map(this::schema);
         } catch (IOException e) {
@@ -482,6 +491,10 @@ public class SchemaManager implements Serializable {
         return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
     }
 
+    public Path branchSchemaDirectory(String branchName) {
+        return new Path(getBranchPath(tableRoot, branchName) + "/schema");
+    }
+
     public Path branchSchemaPath(String branchName, long schemaId) {
         return new Path(
                 getBranchPath(tableRoot, branchName) + "/schema/" + 
SCHEMA_PREFIX + schemaId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index 6fe65aca1..398aa98d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -285,6 +285,10 @@ public class TableSchema implements Serializable {
                 timeMillis);
     }
 
+    public static TableSchema fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, TableSchema.class);
+    }
+
     @Override
     public String toString() {
         return JsonSerdeUtil.toJson(this);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9900d9ecf..d00ce913d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -68,6 +68,7 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 
 import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
@@ -134,8 +135,13 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public SnapshotReader newSnapshotReader() {
+        return newSnapshotReader(DEFAULT_MAIN_BRANCH);
+    }
+
+    @Override
+    public SnapshotReader newSnapshotReader(String branchName) {
         return new SnapshotReaderImpl(
-                store().newScan(),
+                store().newScan(branchName),
                 tableSchema,
                 coreOptions(),
                 snapshotManager(),
@@ -289,6 +295,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public TableCommitImpl newCommit(String commitUser) {
+        // Compatibility with previous design, the main branch is written by 
default
+        return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
+    }
+
+    public TableCommitImpl newCommit(String commitUser, String branchName) {
         CoreOptions options = coreOptions();
         Runnable snapshotExpire = null;
         if (!options.writeOnly()) {
@@ -304,8 +315,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                                     .olderThanMills(System.currentTimeMillis() 
- snapshotTimeRetain)
                                     .expire();
         }
+
         return new TableCommitImpl(
-                store().newCommit(commitUser),
+                store().newCommit(commitUser, branchName),
                 createCommitCallbacks(),
                 snapshotExpire,
                 options.writeOnly() ? null : 
store().newPartitionExpire(commitUser),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index b5bebe2a7..1d8921304 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -31,6 +31,8 @@ public interface DataTable extends InnerTable {
 
     SnapshotReader newSnapshotReader();
 
+    SnapshotReader newSnapshotReader(String branchName);
+
     CoreOptions coreOptions();
 
     SnapshotManager snapshotManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index b183d6ad6..ab1b2e961 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -97,6 +97,8 @@ public interface FileStoreTable extends DataTable {
     @Override
     TableCommitImpl newCommit(String commitUser);
 
+    TableCommitImpl newCommit(String commitUser, String branchName);
+
     LocalTableQuery newLocalTableQuery();
 
     default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 96c2621b6..59ceed137 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -131,6 +131,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         return new AuditLogDataReader(dataTable.newSnapshotReader());
     }
 
+    @Override
+    public SnapshotReader newSnapshotReader(String branchName) {
+        return new AuditLogDataReader(dataTable.newSnapshotReader(branchName));
+    }
+
     @Override
     public InnerTableScan newScan() {
         return new AuditLogBatchScan(dataTable.newScan());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 4c9b9a601..2ab88a346 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -148,6 +148,11 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
         return wrapped.newSnapshotReader();
     }
 
+    @Override
+    public SnapshotReader newSnapshotReader(String branchName) {
+        return wrapped.newSnapshotReader(branchName);
+    }
+
     @Override
     public InnerTableScan newScan() {
         return wrapped.newScan();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 7825b93e4..bedc19ac3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -135,6 +135,11 @@ public class FileMonitorTable implements DataTable, 
ReadonlyTable {
         return wrapped.newSnapshotReader();
     }
 
+    @Override
+    public SnapshotReader newSnapshotReader(String branchName) {
+        return wrapped.newSnapshotReader(branchName);
+    }
+
     @Override
     public InnerTableScan newScan() {
         return wrapped.newScan();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 8daff265f..0deac172b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -97,6 +97,11 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
         }
     }
 
+    @Override
+    public SnapshotReader newSnapshotReader(String branchName) {
+        return dataTable.newSnapshotReader(branchName);
+    }
+
     @Override
     public InnerTableScan newScan() {
         return new InnerTableScanImpl(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 9d14d5368..2bf167fe2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -42,6 +42,7 @@ public class BranchManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(BranchManager.class);
 
     public static final String BRANCH_PREFIX = "branch-";
+    public static final String DEFAULT_MAIN_BRANCH = "main";
 
     private final FileIO fileIO;
     private final Path tablePath;
@@ -78,6 +79,11 @@ public class BranchManager {
     }
 
     public void createBranch(String branchName, String tagName) {
+        checkArgument(
+                !branchName.equals(DEFAULT_MAIN_BRANCH),
+                String.format(
+                        "Branch name '%s' is the default branch and cannot be 
used.",
+                        DEFAULT_MAIN_BRANCH));
         checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is 
blank.", branchName);
         checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
         checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not 
exists.", tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c69653855..2f0a1d859 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -43,6 +43,7 @@ import java.util.function.BinaryOperator;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.getBranchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 
@@ -81,13 +82,34 @@ public class SnapshotManager implements Serializable {
         return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + 
snapshotId);
     }
 
+    public Path branchSnapshotDirectory(String branchName) {
+        return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
+    }
+
     public Path branchSnapshotPath(String branchName, long snapshotId) {
         return new Path(
                 getBranchPath(tablePath, branchName) + "/snapshot/" + 
SNAPSHOT_PREFIX + snapshotId);
     }
 
+    public Path snapshotPathByBranch(String branchName, long snapshotId) {
+        return branchName.equals(DEFAULT_MAIN_BRANCH)
+                ? snapshotPath(snapshotId)
+                : branchSnapshotPath(branchName, snapshotId);
+    }
+
+    public Path snapshotDirByBranch(String branchName) {
+        return branchName.equals(DEFAULT_MAIN_BRANCH)
+                ? snapshotDirectory()
+                : branchSnapshotDirectory(branchName);
+    }
+
     public Snapshot snapshot(long snapshotId) {
-        return Snapshot.fromPath(fileIO, snapshotPath(snapshotId));
+        return snapshot(DEFAULT_MAIN_BRANCH, snapshotId);
+    }
+
+    public Snapshot snapshot(String branchName, long snapshotId) {
+        Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
+        return Snapshot.fromPath(fileIO, snapshotPath);
     }
 
     public boolean snapshotExists(long snapshotId) {
@@ -102,13 +124,21 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Snapshot latestSnapshot() {
-        Long snapshotId = latestSnapshotId();
-        return snapshotId == null ? null : snapshot(snapshotId);
+        return latestSnapshot(DEFAULT_MAIN_BRANCH);
+    }
+
+    public @Nullable Snapshot latestSnapshot(String branchName) {
+        Long snapshotId = latestSnapshotId(branchName);
+        return snapshotId == null ? null : snapshot(branchName, snapshotId);
     }
 
     public @Nullable Long latestSnapshotId() {
+        return latestSnapshotId(DEFAULT_MAIN_BRANCH);
+    }
+
+    public @Nullable Long latestSnapshotId(String branchName) {
         try {
-            return findLatest();
+            return findLatest(branchName);
         } catch (IOException e) {
             throw new RuntimeException("Failed to find latest snapshot id", e);
         }
@@ -120,8 +150,12 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Long earliestSnapshotId() {
+        return earliestSnapshotId(DEFAULT_MAIN_BRANCH);
+    }
+
+    public @Nullable Long earliestSnapshotId(String branchName) {
         try {
-            return findEarliest();
+            return findEarliest(branchName);
         } catch (IOException e) {
             throw new RuntimeException("Failed to find earliest snapshot id", 
e);
         }
@@ -352,13 +386,13 @@ public class SnapshotManager implements Serializable {
         return null;
     }
 
-    private @Nullable Long findLatest() throws IOException {
-        Path snapshotDir = snapshotDirectory();
+    private @Nullable Long findLatest(String branchName) throws IOException {
+        Path snapshotDir = snapshotDirByBranch(branchName);
         if (!fileIO.exists(snapshotDir)) {
             return null;
         }
 
-        Long snapshotId = readHint(LATEST);
+        Long snapshotId = readHint(LATEST, branchName);
         if (snapshotId != null) {
             long nextSnapshot = snapshotId + 1;
             // it is the latest only there is no next one
@@ -367,26 +401,30 @@ public class SnapshotManager implements Serializable {
             }
         }
 
-        return findByListFiles(Math::max);
+        return findByListFiles(Math::max, branchName);
     }
 
-    private @Nullable Long findEarliest() throws IOException {
-        Path snapshotDir = snapshotDirectory();
+    private @Nullable Long findEarliest(String branchName) throws IOException {
+        Path snapshotDir = snapshotDirByBranch(branchName);
         if (!fileIO.exists(snapshotDir)) {
             return null;
         }
 
-        Long snapshotId = readHint(EARLIEST);
+        Long snapshotId = readHint(EARLIEST, branchName);
         // null and it is the earliest only it exists
         if (snapshotId != null && snapshotExists(snapshotId)) {
             return snapshotId;
         }
 
-        return findByListFiles(Math::min);
+        return findByListFiles(Math::min, branchName);
     }
 
     public Long readHint(String fileName) {
-        Path snapshotDir = snapshotDirectory();
+        return readHint(fileName, DEFAULT_MAIN_BRANCH);
+    }
+
+    public Long readHint(String fileName, String branchName) {
+        Path snapshotDir = snapshotDirByBranch(branchName);
         Path path = new Path(snapshotDir, fileName);
         int retryNumber = 0;
         while (retryNumber++ < READ_HINT_RETRY_NUM) {
@@ -404,23 +442,33 @@ public class SnapshotManager implements Serializable {
         return null;
     }
 
-    private Long findByListFiles(BinaryOperator<Long> reducer) throws 
IOException {
-        Path snapshotDir = snapshotDirectory();
+    private Long findByListFiles(BinaryOperator<Long> reducer, String 
branchName)
+            throws IOException {
+        Path snapshotDir = snapshotDirByBranch(branchName);
         return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX)
                 .reduce(reducer)
                 .orElse(null);
     }
 
     public void commitLatestHint(long snapshotId) throws IOException {
-        commitHint(snapshotId, LATEST);
+        commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH);
+    }
+
+    public void commitLatestHint(long snapshotId, String branchName) throws 
IOException {
+        commitHint(snapshotId, LATEST, branchName);
     }
 
     public void commitEarliestHint(long snapshotId) throws IOException {
-        commitHint(snapshotId, EARLIEST);
+        commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH);
+    }
+
+    public void commitEarliestHint(long snapshotId, String branchName) throws 
IOException {
+        commitHint(snapshotId, EARLIEST, branchName);
     }
 
-    private void commitHint(long snapshotId, String fileName) throws 
IOException {
-        Path snapshotDir = snapshotDirectory();
+    private void commitHint(long snapshotId, String fileName, String 
branchName)
+            throws IOException {
+        Path snapshotDir = snapshotDirByBranch(branchName);
         Path hintFile = new Path(snapshotDir, fileName);
         fileIO.delete(hintFile, false);
         fileIO.writeFileUtf8(hintFile, String.valueOf(snapshotId));
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 96167b91d..3f9599431 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -69,6 +69,11 @@ public class TagManager {
         return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
     }
 
+    /** Return the path of tag directory in branch. */
+    public Path branchTagDirectory(String branchName) {
+        return new Path(getBranchPath(tablePath, branchName) + "/tag");
+    }
+
     /** Return the path of a tag in branch. */
     public Path branchTagPath(String branchName, String tagName) {
         return new Path(getBranchPath(tablePath, branchName) + "/tag/" + 
TAG_PREFIX + tagName);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index c41741c08..49e55ee4e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -63,6 +63,7 @@ import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
 import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
 import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -723,6 +724,7 @@ public class FileDeletionTest {
                         Snapshot.CommitKind.APPEND,
                         store.snapshotManager().latestSnapshot(),
                         null,
+                        DEFAULT_MAIN_BRANCH,
                         null);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index d91e0a320..9e0c6d2ab 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -85,6 +85,30 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testBranchBatchReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+        List<Split> splits = 
toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits());
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                
"1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|12|102|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                
"2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                
"2|22|202|binary|varbinary|mapKey:mapVal|multiset",
+                                
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     @Test
     public void testBatchProjection() throws Exception {
         writeData();
@@ -241,6 +265,31 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         assertThat(partitions).containsExactly(1, 2, 3);
     }
 
+    @Test
+    public void testBranchStreamingReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+
+        List<Split> splits =
+                toSplits(
+                        table.newSnapshotReader(BRANCH_NAME)
+                                .withMode(ScanMode.DELTA)
+                                .read()
+                                .dataSplits());
+        TableRead read = table.newRead();
+
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                
"+1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                
"+1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
+                .isEqualTo(
+                        Collections.singletonList(
+                                
"+2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     @Test
     public void testStreamingSplitInUnawareBucketMode() throws Exception {
         // in unaware-bucket mode, we split files into splits all the time
@@ -451,6 +500,29 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.close();
     }
 
+    private void writeBranchData(FileStoreTable table) throws Exception {
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 12, 102L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 22, 202L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(1, 11, 101L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(1, 12, 102L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.close();
+        commit.close();
+    }
+
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Options> configure) 
throws Exception {
         Options conf = new Options();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index c07fc2029..b9b117182 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -117,6 +117,8 @@ import static 
org.junit.jupiter.params.provider.Arguments.arguments;
 /** Base test class for {@link FileStoreTable}. */
 public abstract class FileStoreTableTestBase {
 
+    protected static final String BRANCH_NAME = "branch1";
+
     protected static final RowType ROW_TYPE =
             RowType.of(
                     new DataType[] {
@@ -941,14 +943,7 @@ public abstract class FileStoreTableTestBase {
         table.createBranch("test-branch", "test-tag");
 
         // verify that branch file exist
-        TraceableFileIO fileIO = new TraceableFileIO();
-        BranchManager branchManager =
-                new BranchManager(
-                        fileIO,
-                        tablePath,
-                        new SnapshotManager(fileIO, tablePath),
-                        new TagManager(fileIO, tablePath),
-                        new SchemaManager(fileIO, tablePath));
+        BranchManager branchManager = table.branchManager();
         assertThat(branchManager.branchExists("test-branch")).isTrue();
 
         // verify test-tag in test-branch is equal to snapshot 2
@@ -987,6 +982,12 @@ public abstract class FileStoreTableTestBase {
         table.createTag("test-tag", 1);
         table.createBranch("branch0", "test-tag");
 
+        assertThatThrownBy(() -> table.createBranch("main", "tag1"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'main' is the default branch and 
cannot be used."));
+
         assertThatThrownBy(() -> table.createBranch("branch-1", "tag1"))
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
@@ -1026,14 +1027,7 @@ public abstract class FileStoreTableTestBase {
         table.deleteBranch("branch1");
 
         // verify that branch file not exist
-        TraceableFileIO fileIO = new TraceableFileIO();
-        BranchManager branchManager =
-                new BranchManager(
-                        fileIO,
-                        tablePath,
-                        new SnapshotManager(fileIO, tablePath),
-                        new TagManager(fileIO, tablePath),
-                        new SchemaManager(fileIO, tablePath));
+        BranchManager branchManager = table.branchManager();
         assertThat(branchManager.branchExists("branch1")).isFalse();
 
         assertThatThrownBy(() -> table.deleteBranch("branch1"))
@@ -1240,6 +1234,66 @@ public abstract class FileStoreTableTestBase {
         assertThat(schemaPath).isEqualTo(tablePath);
     }
 
+    @Test
+    public void testBranchWriteAndRead() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        generateBranch(table);
+
+        // Write data to branch1
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser, 
BRANCH_NAME)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Validate data in main branch
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Validate data in branch1
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Write two rows data to branch1 again
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser, 
BRANCH_NAME)) {
+            write.write(rowData(3, 30, 300L));
+            write.write(rowData(4, 40, 400L));
+            commit.commit(2, write.prepareCommit(false, 3));
+        }
+
+        // Validate data in main branch
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Verify data in branch1
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                        "4|40|400|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,
@@ -1420,4 +1474,36 @@ public abstract class FileStoreTableTestBase {
     protected List<Split> toSplits(List<DataSplit> dataSplits) {
         return new ArrayList<>(dataSplits);
     }
+
+    // create a branch which named branch1
+    protected void generateBranch(FileStoreTable table) throws Exception {
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+
+        // verify that branch1 file exist
+        TraceableFileIO fileIO = new TraceableFileIO();
+        BranchManager branchManager = table.branchManager();
+        assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();
+
+        // Verify branch1 and the main branch have the same data
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 92172ae4f..2412e4b7c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -261,6 +261,24 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testBranchBatchReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+        List<Split> splits = 
toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits());
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Collections.singletonList(
+                                
"1|10|1000|binary|varbinary|mapKey:mapVal|multiset"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                
"2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                                
"2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     @Test
     public void testBatchProjection() throws Exception {
         writeData();
@@ -314,6 +332,31 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testBranchStreamingReadWrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        writeBranchData(table);
+
+        List<Split> splits =
+                toSplits(
+                        table.newSnapshotReader(BRANCH_NAME)
+                                .withMode(ScanMode.DELTA)
+                                .read()
+                                .dataSplits());
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, 
STREAMING_ROW_TO_STRING))
+                .isEqualTo(
+                        Collections.singletonList(
+                                
"-1|11|1001|binary|varbinary|mapKey:mapVal|multiset"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, 
STREAMING_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                
"-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                
"+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                                
"+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     @Test
     public void testStreamingProjection() throws Exception {
         writeData();
@@ -611,6 +654,31 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.close();
     }
 
+    private void writeBranchData(FileStoreTable table) throws Exception {
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 1000L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 21, 2001L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(1, 11, 1001L));
+        write.write(rowData(2, 21, 20001L));
+        write.write(rowData(2, 22, 202L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.close();
+        commit.close();
+    }
+
     @Override
     @Test
     public void testReadFilter() throws Exception {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index f5874bed7..a55bda911 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -502,6 +502,12 @@ public abstract class SchemaEvolutionTableTestBase {
                                     .orElseThrow(IllegalStateException::new)));
         }
 
+        @Override
+        public Optional<TableSchema> latest(String branchName) {
+            // for compatibility test
+            return latest();
+        }
+
         @Override
         public List<TableSchema> listAll() {
             return new ArrayList<>(tableSchemas.values());

Reply via email to