This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d1f2acb06 [core][branch] Add branches system table (#2759)
d1f2acb06 is described below
commit d1f2acb06d654028368ee93663d45900386a44a4
Author: Fang Yong <[email protected]>
AuthorDate: Mon Feb 19 14:08:26 2024 +0800
[core][branch] Add branches system table (#2759)
---
.../java/org/apache/paimon/branch/TableBranch.java | 52 +++++++++++
.../system/{TagsTable.java => BranchesTable.java} | 100 ++++++++-------------
.../paimon/table/system/SystemTableLoader.java | 3 +
.../org/apache/paimon/table/system/TagsTable.java | 8 +-
.../org/apache/paimon/utils/BranchManager.java | 45 +++++++---
.../java/org/apache/paimon/utils/TagManager.java | 23 -----
.../paimon/table/system/BranchesTableTest.java | 93 +++++++++++++++++++
.../apache/paimon/table/system/TagsTableTest.java | 2 +-
8 files changed, 221 insertions(+), 105 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
new file mode 100644
index 000000000..4b24c866f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.branch;
+
+/**
+ * Immutable description of a single table branch: the branch name, the tag and snapshot it was
+ * created from, and its creation time.
+ */
+public class TableBranch {
+
+    private final String branchName;
+    private final String createdFromTag;
+    private final Long createdFromSnapshot;
+    private final long createTime;
+
+    /**
+     * Creates a branch description.
+     *
+     * @param branchName name of the branch
+     * @param createdFromTag name of the tag the branch was created from
+     * @param createdFromSnapshot id of the snapshot the branch was created from
+     * @param createTime creation time of the branch; presumably epoch milliseconds (TODO confirm
+     *     against callers)
+     */
+    public TableBranch(
+            String branchName,
+            String createdFromTag,
+            Long createdFromSnapshot,
+            long createTime) {
+        this.branchName = branchName;
+        this.createdFromTag = createdFromTag;
+        this.createdFromSnapshot = createdFromSnapshot;
+        this.createTime = createTime;
+    }
+
+    /** Returns the name of the branch. */
+    public String getBranchName() {
+        return branchName;
+    }
+
+    /** Returns the name of the tag the branch was created from. */
+    public String getCreatedFromTag() {
+        return createdFromTag;
+    }
+
+    /** Returns the id of the snapshot the branch was created from. */
+    public Long getCreatedFromSnapshot() {
+        return createdFromSnapshot;
+    }
+
+    /** Returns the creation time passed to the constructor. */
+    public long getCreateTime() {
+        return createTime;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
similarity index 59%
copy from paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
copy to paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 28786dc4a..da568aaff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -18,8 +18,7 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
+import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -47,51 +45,46 @@ import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;
-import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.SortedMap;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
-/** A {@link Table} for showing tags of table. */
-public class TagsTable implements ReadonlyTable {
+/** A {@link Table} for showing branches of table. */
+public class BranchesTable implements ReadonlyTable {
private static final long serialVersionUID = 1L;
- public static final String TAGS = "tags";
+ public static final String BRANCHES = "branches";
public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
- new DataField(0, "tag_name", SerializationUtils.newStringType(false)),
- new DataField(1, "snapshot_id", new BigIntType(false)),
- new DataField(2, "schema_id", new BigIntType(false)),
- new DataField(3, "commit_time", new TimestampType(false, 3)),
- new DataField(4, "record_count", new BigIntType(true)),
- new DataField(5, "branches", SerializationUtils.newStringType(true))));
+ new DataField(
+ 0, "branch_name", SerializationUtils.newStringType(false)),
+ new DataField(
+ 1, "created_from_tag", SerializationUtils.newStringType(false)),
+ new DataField(2, "created_from_snapshot", new BigIntType(false)),
+ new DataField(3, "create_time", new TimestampType(false, 3))));
private final FileIO fileIO;
private final Path location;
- public TagsTable(FileIO fileIO, Path location) {
+ public BranchesTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
}
@Override
public String name() {
- return location.getName() + SYSTEM_TABLE_SPLITTER + TAGS;
+ return location.getName() + SYSTEM_TABLE_SPLITTER + BRANCHES;
}
@Override
@@ -101,25 +94,25 @@ public class TagsTable implements ReadonlyTable {
@Override
public List<String> primaryKeys() {
- return Collections.singletonList("tag_name");
+ // Only columns declared in TABLE_TYPE may be listed here. This table has no "tag_name"
+ // column (that name belongs to TagsTable); a branch row is identified by its name alone.
+ return Collections.singletonList("branch_name");
}
@Override
public InnerTableScan newScan() {
- return new TagsScan();
+ return new BranchesScan();
}
@Override
public InnerTableRead newRead() {
- return new TagsRead(fileIO);
+ return new BranchesRead(fileIO);
}
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new TagsTable(fileIO, location);
+ return new BranchesTable(fileIO, location);
}
- private class TagsScan extends ReadOnceTableScan {
+ private class BranchesScan extends ReadOnceTableScan {
@Override
public InnerTableScan withFilter(Predicate predicate) {
@@ -129,24 +122,25 @@ public class TagsTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () -> Collections.singletonList(new TagsSplit(fileIO,
location));
+ return () -> Collections.singletonList(new BranchesSplit(fileIO,
location));
}
}
- private static class TagsSplit implements Split {
+ private static class BranchesSplit implements Split {
private static final long serialVersionUID = 1L;
private final FileIO fileIO;
private final Path location;
- private TagsSplit(FileIO fileIO, Path location) {
+ private BranchesSplit(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
}
@Override
public long rowCount() {
- return new TagManager(fileIO, location).tagCount();
+ FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
+ return table.branchManager().branchCount();
}
@Override
@@ -157,7 +151,7 @@ public class TagsTable implements ReadonlyTable {
if (o == null || getClass() != o.getClass()) {
return false;
}
- TagsSplit that = (TagsSplit) o;
+ BranchesSplit that = (BranchesSplit) o;
return Objects.equals(location, that.location);
}
@@ -167,12 +161,12 @@ public class TagsTable implements ReadonlyTable {
}
}
- private static class TagsRead implements InnerTableRead {
+ private static class BranchesRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
- public TagsRead(FileIO fileIO) {
+ public BranchesRead(FileIO fileIO) {
this.fileIO = fileIO;
}
@@ -195,32 +189,13 @@ public class TagsTable implements ReadonlyTable {
@Override
public RecordReader<InternalRow> createReader(Split split) {
- if (!(split instanceof TagsSplit)) {
+ if (!(split instanceof BranchesSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- Path location = ((TagsSplit) split).location;
- Options options = new Options();
- options.set(CoreOptions.PATH, location.toUri().toString());
- FileStoreTable table = FileStoreTableFactory.create(fileIO,
options);
- SortedMap<Snapshot, List<String>> tags = table.tagManager().tags();
- Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
- for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
- for (String tagName : tag.getValue()) {
- nameToSnapshot.put(tagName, tag.getKey());
- }
- }
- Map<String, List<String>> tagBranches = new HashMap<>();
- table.branchManager()
- .branches()
- .forEach(
- (branch, tag) ->
- tagBranches
- .computeIfAbsent(tag, key -> new
ArrayList<>())
- .add(branch));
-
- Iterator<InternalRow> rows =
- Iterators.transform(
- nameToSnapshot.entrySet().iterator(), tag ->
toRow(tag, tagBranches));
+ Path location = ((BranchesSplit) split).location;
+ FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
+ List<TableBranch> branches = table.branchManager().branches();
+ Iterator<InternalRow> rows = Iterators.transform(branches.iterator(), this::toRow);
if (projection != null) {
rows =
Iterators.transform(
@@ -229,18 +204,13 @@ public class TagsTable implements ReadonlyTable {
return new IteratorRecordReader<>(rows);
}
- private InternalRow toRow(
- Map.Entry<String, Snapshot> tag, Map<String, List<String>> tagBranches) {
- Snapshot snapshot = tag.getValue();
- List<String> branches = tagBranches.get(tag.getKey());
+ private InternalRow toRow(TableBranch branch) {
return GenericRow.of(
- BinaryString.fromString(tag.getKey()),
- snapshot.id(),
- snapshot.schemaId(),
+ BinaryString.fromString(branch.getBranchName()),
+ BinaryString.fromString(branch.getCreatedFromTag()),
+ branch.getCreatedFromSnapshot(),
Timestamp.fromLocalDateTime(
- DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
- snapshot.totalRecordCount(),
- BinaryString.fromString(branches == null ? "[]" : branches.toString()));
+ DateTimeUtils.toLocalDateTime(branch.getCreateTime())));
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 71be3a582..32aa9fa35 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -36,6 +36,7 @@ import static
org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static
org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION;
import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
import static
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
@@ -73,6 +74,8 @@ public class SystemTableLoader {
return new FilesTable(dataTable);
case TAGS:
return new TagsTable(fileIO, location);
+ case BRANCHES:
+ return new BranchesTable(fileIO, location);
case CONSUMERS:
return new ConsumersTable(fileIO, location);
case READ_OPTIMIZED:
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 28786dc4a..8027da2f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -213,10 +213,12 @@ public class TagsTable implements ReadonlyTable {
table.branchManager()
.branches()
.forEach(
- (branch, tag) ->
+ branch ->
tagBranches
- .computeIfAbsent(tag, key -> new
ArrayList<>())
- .add(branch));
+ .computeIfAbsent(
+ branch.getCreatedFromTag(),
+ key -> new ArrayList<>())
+ .add(branch.getBranchName()));
Iterator<InternalRow> rows =
Iterators.transform(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 2bf167fe2..6564bd4e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -19,18 +19,20 @@
package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.SortedMap;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
@@ -146,23 +148,40 @@ public class BranchManager {
return fileExists(branchPath);
}
- /** Get branch->tag pair. */
- public Map<String, String> branches() {
- Map<String, String> branchTags = new HashMap<>();
+ /**
+ * Returns how many branches the table has, i.e. the number of file statuses that
+ * {@code listVersionedFileStatus} yields for the branch directory and {@code BRANCH_PREFIX}.
+ *
+ * @throws RuntimeException wrapping the {@link IOException} if the listing fails
+ */
+ public long branchCount() {
+ final long count;
+ try {
+ count = listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX).count();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return count;
+ }
+ /**
+ * Get all branches for the table.
+ *
+ * <p>For every entry listed under the branch directory, the branch's own table is opened and
+ * the first key of its tag manager's snapshot-to-tags map is taken as the snapshot the branch
+ * was created from; the single tag on that snapshot is reported as the creation tag. The
+ * listed entry's modification time is used as the branch creation time.
+ *
+ * @return one {@link TableBranch} per listed branch
+ * @throws IllegalArgumentException if a branch has no tag, or its first tagged snapshot does
+ *     not carry exactly one tag
+ * @throws RuntimeException wrapping the {@link IOException} if the listing fails
+ */
+ public List<TableBranch> branches() {
try {
- List<Path> paths =
+ List<Pair<Path, Long>> paths =
listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
- .map(FileStatus::getPath)
+ .map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
- for (Path path : paths) {
- String branchName = path.getName().substring(BRANCH_PREFIX.length());
- branchTags.put(branchName, tagManager.branchTags(branchName).get(0));
+ List<TableBranch> branches = new ArrayList<>();
+ for (Pair<Path, Long> path : paths) {
+ String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
+ FileStoreTable branchTable =
+ FileStoreTableFactory.create(
+ fileIO, new Path(getBranchPath(tablePath, branchName)));
+ SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
+ // Name the offending branch so a malformed branch directory can be diagnosed.
+ checkArgument(
+ !snapshotTags.isEmpty(),
+ "There should be at least one tag in branch '%s'.",
+ branchName);
+ Snapshot snapshot = snapshotTags.firstKey();
+ List<String> tags = snapshotTags.get(snapshot);
+ checkArgument(
+ tags.size() == 1,
+ "Expected exactly one tag on the first tagged snapshot of branch '%s', but found %s.",
+ branchName,
+ tags);
+ branches.add(
+ new TableBranch(branchName, tags.get(0), snapshot.id(), path.getValue()));
}
+
+ return branches;
} catch (IOException e) {
throw new RuntimeException(e);
}
-
- return branchTags;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 3f9599431..a29a3e151 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -69,34 +69,11 @@ public class TagManager {
return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
}
- /** Return the path of tag directory in branch. */
- public Path branchTagDirectory(String branchName) {
- return new Path(getBranchPath(tablePath, branchName) + "/tag");
- }
-
/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" +
TAG_PREFIX + tagName);
}
- public List<String> branchTags(String branchName) {
- try {
- List<Path> tagPaths =
- listVersionedFileStatus(
- fileIO,
- new Path(getBranchPath(tablePath,
branchName) + "/tag/"),
- TAG_PREFIX)
- .map(FileStatus::getPath)
- .collect(Collectors.toList());
- checkArgument(tagPaths.size() > 0, "There should be at least one
tag in the branch.");
- return tagPaths.stream()
- .map(p -> p.getName().substring(TAG_PREFIX.length()))
- .collect(Collectors.toList());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName, List<TagCallback>
callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java
new file mode 100644
index 000000000..f1fbbe017
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that {@link BranchesTable} exposes the branches created on a table. */
+class BranchesTableTest extends TableTestBase {
+
+    private static final String TABLE_NAME = "MyTable";
+
+    private FileStoreTable table;
+    private BranchesTable branchesTable;
+
+    /**
+     * Creates a primary-key table with automatic daily tag creation, makes two commits stamped
+     * with 2023-07-18 and 2023-07-19 timestamps, and loads the {@code $branches} system table.
+     */
+    @BeforeEach
+    void before() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("product_id", DataTypes.INT())
+                        .column("price", DataTypes.INT())
+                        .column("sales", DataTypes.INT())
+                        .primaryKey("product_id")
+                        .option("tag.automatic-creation", "watermark")
+                        .option("tag.creation-period", "daily")
+                        .option("tag.num-retained-max", "3")
+                        .build();
+        Identifier tableIdentifier = identifier(TABLE_NAME);
+        catalog.createTable(tableIdentifier, schema, true);
+        table = (FileStoreTable) catalog.getTable(tableIdentifier);
+
+        // NOTE(review): presumably these timestamps act as watermarks that trigger the daily
+        // tags "2023-07-17" and "2023-07-18" used by testBranches - confirm.
+        TableCommitImpl committer = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        committer.commit(new ManifestCommittable(0, toMillis("2023-07-18T12:00:01")));
+        committer.commit(new ManifestCommittable(1, toMillis("2023-07-19T12:00:01")));
+
+        branchesTable =
+                (BranchesTable) catalog.getTable(identifier(TABLE_NAME + "$branches"));
+    }
+
+    /** With no branch created, the system table yields no rows. */
+    @Test
+    void testEmptyBranches() throws Exception {
+        assertThat(read(branchesTable)).isEmpty();
+    }
+
+    /** Every created branch shows up exactly once, identified by its name in column 0. */
+    @Test
+    void testBranches() throws Exception {
+        table.createBranch("my_branch1", "2023-07-17");
+        table.createBranch("my_branch2", "2023-07-18");
+        table.createBranch("my_branch3", "2023-07-18");
+
+        List<InternalRow> rows = read(branchesTable);
+        assertThat(rows).hasSize(3);
+
+        // Column 0 is branch_name.
+        List<String> branchNames =
+                rows.stream().map(row -> row.getString(0).toString()).collect(Collectors.toList());
+        assertThat(branchNames)
+                .containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3");
+    }
+
+    /** Converts an ISO local date-time string to the millisecond value passed to commits. */
+    private static long toMillis(String localDateTime) {
+        return Timestamp.fromLocalDateTime(LocalDateTime.parse(localDateTime)).getMillisecond();
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 51cfcd702..1f2b88047 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -108,7 +108,7 @@ class TagsTableTest extends TableTestBase {
if (tag.equals("2023-07-17")) {
return
Collections.singletonList("2023-07-17-branch1");
} else if (tag.equals("2023-07-18")) {
- return Arrays.asList("2023-07-18-branch1",
"2023-07-18-branch2");
+ return Arrays.asList("2023-07-18-branch2",
"2023-07-18-branch1");
} else {
return new ArrayList<>();
}