This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e8aa707db [core][branch] Add branches in tags table (#2754)
e8aa707db is described below
commit e8aa707dbc277af21cca5665ec6e2e2a1cec448f
Author: Fang Yong <[email protected]>
AuthorDate: Wed Feb 7 12:51:47 2024 +0800
[core][branch] Add branches in tags table (#2754)
* [core][branch] Add branches in tags table
---
docs/content/how-to/system-tables.md | 12 +++---
.../org/apache/paimon/table/system/TagsTable.java | 32 +++++++++++++---
.../org/apache/paimon/utils/BranchManager.java | 26 +++++++++++++
.../java/org/apache/paimon/utils/TagManager.java | 18 +++++++++
.../apache/paimon/table/system/TagsTableTest.java | 44 +++++++++++++++++++---
5 files changed, 115 insertions(+), 17 deletions(-)
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index 8005ab27b..5ee01ffcc 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -198,12 +198,12 @@ and some historical information of the snapshots. You can
also get all tag names
SELECT * FROM MyTable$tags;
/*
-+----------+-------------+-----------+-------------------------+--------------+
-| tag_name | snapshot_id | schema_id | commit_time | record_count |
-+----------+-------------+-----------+-------------------------+--------------+
-| tag1 | 1 | 0 | 2023-06-28 14:55:29.344 | 3 |
-| tag3 | 3 | 0 | 2023-06-28 14:58:24.691 | 7 |
-+----------+-------------+-----------+-------------------------+--------------+
++----------+-------------+-----------+-------------------------+--------------+--------------+
+| tag_name | snapshot_id | schema_id | commit_time             | record_count | branches     |
++----------+-------------+-----------+-------------------------+--------------+--------------+
+| tag1     |           1 |         0 | 2023-06-28 14:55:29.344 |            3 | []           |
+| tag3     |           3 |         0 | 2023-06-28 14:58:24.691 |            7 | [branch-1]   |
++----------+-------------+-----------+-------------------------+--------------+--------------+
2 rows in set
*/
```
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 98c3ec3cc..28786dc4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -26,8 +27,11 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
@@ -47,8 +51,10 @@ import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -72,7 +78,8 @@ public class TagsTable implements ReadonlyTable {
new DataField(1, "snapshot_id", new
BigIntType(false)),
new DataField(2, "schema_id", new
BigIntType(false)),
new DataField(3, "commit_time", new
TimestampType(false, 3)),
- new DataField(4, "record_count", new
BigIntType(true))));
+ new DataField(4, "record_count", new
BigIntType(true)),
+ new DataField(5, "branches",
SerializationUtils.newStringType(true))));
private final FileIO fileIO;
private final Path location;
@@ -192,16 +199,28 @@ public class TagsTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((TagsSplit) split).location;
- SortedMap<Snapshot, List<String>> tags = new TagManager(fileIO,
location).tags();
+ Options options = new Options();
+ options.set(CoreOptions.PATH, location.toUri().toString());
+ FileStoreTable table = FileStoreTableFactory.create(fileIO,
options);
+ SortedMap<Snapshot, List<String>> tags = table.tagManager().tags();
Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
for (String tagName : tag.getValue()) {
nameToSnapshot.put(tagName, tag.getKey());
}
}
+ Map<String, List<String>> tagBranches = new HashMap<>();
+ table.branchManager()
+ .branches()
+ .forEach(
+ (branch, tag) ->
+ tagBranches
+ .computeIfAbsent(tag, key -> new
ArrayList<>())
+ .add(branch));
Iterator<InternalRow> rows =
- Iterators.transform(nameToSnapshot.entrySet().iterator(),
this::toRow);
+ Iterators.transform(
+ nameToSnapshot.entrySet().iterator(), tag ->
toRow(tag, tagBranches));
if (projection != null) {
rows =
Iterators.transform(
@@ -210,15 +229,18 @@ public class TagsTable implements ReadonlyTable {
return new IteratorRecordReader<>(rows);
}
-    private InternalRow toRow(Map.Entry<String, Snapshot> tag) {
+    /**
+     * Converts one tag into a row of this table: tag name, snapshot id, schema id, commit time,
+     * record count and the names of the branches associated with the tag.
+     *
+     * @param tag tag name mapped to the snapshot it points to
+     * @param tagBranches tag name mapped to branch names; tags without branches may be absent
+     * @return the row for {@code tag}
+     */
+    private InternalRow toRow(
+            Map.Entry<String, Snapshot> tag, Map<String, List<String>> tagBranches) {
         Snapshot snapshot = tag.getValue();
+        // The caller builds tagBranches from BranchManager#branches(), which is a HashMap, so the
+        // per-tag order is unspecified. Sort a copy so the "branches" column is stable.
+        List<String> branches =
+                new ArrayList<>(tagBranches.getOrDefault(tag.getKey(), Collections.emptyList()));
+        Collections.sort(branches);
         return GenericRow.of(
                 BinaryString.fromString(tag.getKey()),
                 snapshot.id(),
                 snapshot.schemaId(),
                 Timestamp.fromLocalDateTime(
                         DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
-                snapshot.totalRecordCount());
+                snapshot.totalRecordCount(),
+                // An empty list renders as "[]", matching the previous explicit default.
+                BinaryString.fromString(branches.toString()));
     }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 5ed647ef4..9d14d5368 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -20,6 +20,7 @@ package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
@@ -27,7 +28,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Manager for {@code Branch}. */
@@ -133,4 +139,24 @@ public class BranchManager {
Path branchPath = branchPath(branchName);
return fileExists(branchPath);
}
+
+    /**
+     * Lists every branch of this table together with one tag stored in that branch.
+     *
+     * <p>The tag reported for a branch is the first entry of {@code
+     * tagManager.branchTags(branchName)}. NOTE(review): this presumably is the tag the branch was
+     * created from; confirm, because the listing order is not guaranteed once a branch holds more
+     * than one tag.
+     *
+     * @return a map from branch name to tag name
+     */
+    public Map<String, String> branches() {
+        List<String> branchNames;
+        try {
+            branchNames =
+                    listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
+                            .map(FileStatus::getPath)
+                            .map(path -> path.getName().substring(BRANCH_PREFIX.length()))
+                            .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<String, String> branchToTag = new HashMap<>();
+        for (String branchName : branchNames) {
+            branchToTag.put(branchName, tagManager.branchTags(branchName).get(0));
+        }
+        return branchToTag;
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a29a3e151..96167b91d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -74,6 +74,24 @@ public class TagManager {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" +
TAG_PREFIX + tagName);
}
+    /**
+     * Lists the names of all tags stored under the given branch.
+     *
+     * @param branchName name of the branch whose tag directory is scanned
+     * @return tag names with the tag file prefix stripped, in the order returned by {@code
+     *     listVersionedFileStatus} (not sorted here)
+     * @throws IllegalArgumentException if the branch contains no tag
+     * @throws RuntimeException wrapping any {@link IOException} raised while listing
+     */
+    public List<String> branchTags(String branchName) {
+        Path tagDirectory = new Path(getBranchPath(tablePath, branchName) + "/tag/");
+        List<String> tagNames;
+        try {
+            tagNames =
+                    listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX)
+                            .map(FileStatus::getPath)
+                            .map(path -> path.getName().substring(TAG_PREFIX.length()))
+                            .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        // Name the offending branch: callers iterate all branches, so a bare message would not
+        // say which one is broken.
+        checkArgument(
+                !tagNames.isEmpty(),
+                "There should be at least one tag in the branch '%s'.",
+                branchName);
+        return tagNames;
+    }
+
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName, List<TagCallback>
callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index c6df41841..51cfcd702 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataTypes;
@@ -38,20 +39,23 @@ import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for {@link TagsTable}. */
-public class TagsTableTest extends TableTestBase {
+class TagsTableTest extends TableTestBase {
private static final String tableName = "MyTable";
private TagsTable tagsTable;
private TagManager tagManager;
@BeforeEach
- public void before() throws Exception {
+ void before() throws Exception {
Identifier identifier = identifier(tableName);
Schema schema =
Schema.newBuilder()
@@ -82,13 +86,39 @@ public class TagsTableTest extends TableTestBase {
}
@Test
- public void testTagsTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ void testTagsTable() throws Exception {
+ List<InternalRow> expectRow =
+ getExceptedResult(
+ key -> {
+ return new ArrayList<>();
+ });
List<InternalRow> result = read(tagsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+    @Test
+    void testTagBranchesTable() throws Exception {
+        Table table = catalog.getTable(identifier(tableName));
+        table.createBranch("2023-07-17-branch1", "2023-07-17");
+        table.createBranch("2023-07-18-branch1", "2023-07-18");
+        table.createBranch("2023-07-18-branch2", "2023-07-18");
+
+        // Expected branch names per tag; any other tag has no branch.
+        Function<String, List<String>> expectedBranches =
+                tag -> {
+                    switch (tag) {
+                        case "2023-07-17":
+                            return Collections.singletonList("2023-07-17-branch1");
+                        case "2023-07-18":
+                            return Arrays.asList("2023-07-18-branch1", "2023-07-18-branch2");
+                        default:
+                            return new ArrayList<>();
+                    }
+                };
+        List<InternalRow> expectRow = getExceptedResult(expectedBranches);
+        List<InternalRow> result = read(tagsTable);
+        assertThat(result).containsExactlyElementsOf(expectRow);
+    }
+
+ private List<InternalRow> getExceptedResult(
+ Function<String, List<String>> tagBranchesFunction) {
List<InternalRow> internalRows = new ArrayList<>();
for (Map.Entry<Snapshot, List<String>> tag :
tagManager.tags().entrySet()) {
Snapshot snapshot = tag.getKey();
@@ -100,7 +130,9 @@ public class TagsTableTest extends TableTestBase {
snapshot.schemaId(),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
- snapshot.totalRecordCount()));
+ snapshot.totalRecordCount(),
+ BinaryString.fromString(
+
tagBranchesFunction.apply(tagName).toString())));
}
}
return internalRows;