This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e8aa707db [core][branch] Add branches in tags table (#2754)
e8aa707db is described below

commit e8aa707dbc277af21cca5665ec6e2e2a1cec448f
Author: Fang Yong <[email protected]>
AuthorDate: Wed Feb 7 12:51:47 2024 +0800

    [core][branch] Add branches in tags table (#2754)
    
    * [core][branch] Add branches in tags table
---
 docs/content/how-to/system-tables.md               | 12 +++---
 .../org/apache/paimon/table/system/TagsTable.java  | 32 +++++++++++++---
 .../org/apache/paimon/utils/BranchManager.java     | 26 +++++++++++++
 .../java/org/apache/paimon/utils/TagManager.java   | 18 +++++++++
 .../apache/paimon/table/system/TagsTableTest.java  | 44 +++++++++++++++++++---
 5 files changed, 115 insertions(+), 17 deletions(-)

diff --git a/docs/content/how-to/system-tables.md b/docs/content/how-to/system-tables.md
index 8005ab27b..5ee01ffcc 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -198,12 +198,12 @@ and some historical information of the snapshots. You can also get all tag names
 SELECT * FROM MyTable$tags;
 
 /*
-+----------+-------------+-----------+-------------------------+--------------+
-| tag_name | snapshot_id | schema_id |             commit_time | record_count |
-+----------+-------------+-----------+-------------------------+--------------+
-|     tag1 |           1 |         0 | 2023-06-28 14:55:29.344 |            3 |
-|     tag3 |           3 |         0 | 2023-06-28 14:58:24.691 |            7 |
-+----------+-------------+-----------+-------------------------+--------------+
++----------+-------------+-----------+-------------------------+--------------+--------------+
+| tag_name | snapshot_id | schema_id |             commit_time | record_count |   branches   |
++----------+-------------+-----------+-------------------------+--------------+--------------+
+|     tag1 |           1 |         0 | 2023-06-28 14:55:29.344 |            3 |      []      |
+|     tag3 |           3 |         0 | 2023-06-28 14:58:24.691 |            7 |  [branch-1]  |
++----------+-------------+-----------+-------------------------+--------------+--------------+
 2 rows in set
 */
 ```
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 98c3ec3cc..28786dc4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -26,8 +27,11 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
@@ -47,8 +51,10 @@ import org.apache.paimon.utils.TagManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -72,7 +78,8 @@ public class TagsTable implements ReadonlyTable {
                             new DataField(1, "snapshot_id", new BigIntType(false)),
                             new DataField(2, "schema_id", new BigIntType(false)),
                             new DataField(3, "commit_time", new TimestampType(false, 3)),
-                            new DataField(4, "record_count", new BigIntType(true))));
+                            new DataField(4, "record_count", new BigIntType(true)),
+                            new DataField(5, "branches", SerializationUtils.newStringType(true))));
 
     private final FileIO fileIO;
     private final Path location;
@@ -192,16 +199,28 @@ public class TagsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + split.getClass());
             }
             Path location = ((TagsSplit) split).location;
-            SortedMap<Snapshot, List<String>> tags = new TagManager(fileIO, location).tags();
+            Options options = new Options();
+            options.set(CoreOptions.PATH, location.toUri().toString());
+            FileStoreTable table = FileStoreTableFactory.create(fileIO, options);
+            SortedMap<Snapshot, List<String>> tags = table.tagManager().tags();
             Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
             for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
                 for (String tagName : tag.getValue()) {
                     nameToSnapshot.put(tagName, tag.getKey());
                 }
             }
+            Map<String, List<String>> tagBranches = new HashMap<>();
+            table.branchManager()
+                    .branches()
+                    .forEach(
+                            (branch, tag) ->
+                                    tagBranches
+                                            .computeIfAbsent(tag, key -> new ArrayList<>())
+                                            .add(branch));
 
             Iterator<InternalRow> rows =
-                    Iterators.transform(nameToSnapshot.entrySet().iterator(), this::toRow);
+                    Iterators.transform(
+                            nameToSnapshot.entrySet().iterator(), tag -> toRow(tag, tagBranches));
             if (projection != null) {
                 rows =
                         Iterators.transform(
@@ -210,15 +229,18 @@ public class TagsTable implements ReadonlyTable {
             return new IteratorRecordReader<>(rows);
         }
 
-        private InternalRow toRow(Map.Entry<String, Snapshot> tag) {
+        private InternalRow toRow(
+                Map.Entry<String, Snapshot> tag, Map<String, List<String>> tagBranches) {
             Snapshot snapshot = tag.getValue();
+            List<String> branches = tagBranches.get(tag.getKey());
             return GenericRow.of(
                     BinaryString.fromString(tag.getKey()),
                     snapshot.id(),
                     snapshot.schemaId(),
                     Timestamp.fromLocalDateTime(
                             DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
-                    snapshot.totalRecordCount());
+                    snapshot.totalRecordCount(),
+                    BinaryString.fromString(branches == null ? "[]" : branches.toString()));
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 5ed647ef4..9d14d5368 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -20,6 +20,7 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 
@@ -27,7 +28,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Manager for {@code Branch}. */
@@ -133,4 +139,24 @@ public class BranchManager {
         Path branchPath = branchPath(branchName);
         return fileExists(branchPath);
     }
+
+    /** Get branch->tag pair. */
+    public Map<String, String> branches() {
+        Map<String, String> branchTags = new HashMap<>();
+
+        try {
+            List<Path> paths =
+                    listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
+                            .map(FileStatus::getPath)
+                            .collect(Collectors.toList());
+            for (Path path : paths) {
+                String branchName = path.getName().substring(BRANCH_PREFIX.length());
+                branchTags.put(branchName, tagManager.branchTags(branchName).get(0));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return branchTags;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a29a3e151..96167b91d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -74,6 +74,24 @@ public class TagManager {
         return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
     }
 
+    public List<String> branchTags(String branchName) {
+        try {
+            List<Path> tagPaths =
+                    listVersionedFileStatus(
+                                    fileIO,
+                                    new Path(getBranchPath(tablePath, branchName) + "/tag/"),
+                                    TAG_PREFIX)
+                            .map(FileStatus::getPath)
+                            .collect(Collectors.toList());
+            checkArgument(tagPaths.size() > 0, "There should be at least one tag in the branch.");
+            return tagPaths.stream()
+                    .map(p -> p.getName().substring(TAG_PREFIX.length()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /** Create a tag from given snapshot and save it in the storage. */
     public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index c6df41841..51cfcd702 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.types.DataTypes;
@@ -38,20 +39,23 @@ import org.junit.jupiter.api.Test;
 
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link TagsTable}. */
-public class TagsTableTest extends TableTestBase {
+class TagsTableTest extends TableTestBase {
 
     private static final String tableName = "MyTable";
     private TagsTable tagsTable;
     private TagManager tagManager;
 
     @BeforeEach
-    public void before() throws Exception {
+    void before() throws Exception {
         Identifier identifier = identifier(tableName);
         Schema schema =
                 Schema.newBuilder()
@@ -82,13 +86,39 @@ public class TagsTableTest extends TableTestBase {
     }
 
     @Test
-    public void testTagsTable() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+    void testTagsTable() throws Exception {
+        List<InternalRow> expectRow =
+                getExceptedResult(
+                        key -> {
+                            return new ArrayList<>();
+                        });
         List<InternalRow> result = read(tagsTable);
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    @Test
+    void testTagBranchesTable() throws Exception {
+        Table table = catalog.getTable(identifier(tableName));
+        table.createBranch("2023-07-17-branch1", "2023-07-17");
+        table.createBranch("2023-07-18-branch1", "2023-07-18");
+        table.createBranch("2023-07-18-branch2", "2023-07-18");
+        List<InternalRow> expectRow =
+                getExceptedResult(
+                        tag -> {
+                            if (tag.equals("2023-07-17")) {
+                                return Collections.singletonList("2023-07-17-branch1");
+                            } else if (tag.equals("2023-07-18")) {
+                                return Arrays.asList("2023-07-18-branch1", "2023-07-18-branch2");
+                            } else {
+                                return new ArrayList<>();
+                            }
+                        });
+        List<InternalRow> result = read(tagsTable);
+        assertThat(result).containsExactlyElementsOf(expectRow);
+    }
+
+    private List<InternalRow> getExceptedResult(
+            Function<String, List<String>> tagBranchesFunction) {
         List<InternalRow> internalRows = new ArrayList<>();
         for (Map.Entry<Snapshot, List<String>> tag : tagManager.tags().entrySet()) {
             Snapshot snapshot = tag.getKey();
@@ -100,7 +130,9 @@ public class TagsTableTest extends TableTestBase {
                                 snapshot.schemaId(),
                                 Timestamp.fromLocalDateTime(
                                         DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
-                                snapshot.totalRecordCount()));
+                                snapshot.totalRecordCount(),
+                                BinaryString.fromString(
+                                        tagBranchesFunction.apply(tagName).toString())));
             }
         }
         return internalRows;

Reply via email to