This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3b751349f [core] Support branch function for system tables (#3815)
3b751349f is described below
commit 3b751349f1ca8930d9cb1b5eed9a70115054049c
Author: HeavenZH <[email protected]>
AuthorDate: Fri Aug 16 14:53:48 2024 +0800
[core] Support branch function for system tables (#3815)
This closes #3815.
---------
Co-authored-by: tsreaper <[email protected]>
---
.../table/system/AggregationFieldsTable.java | 16 ++-
.../apache/paimon/table/system/ConsumersTable.java | 16 ++-
.../apache/paimon/table/system/OptionsTable.java | 22 +--
.../apache/paimon/table/system/SchemasTable.java | 16 ++-
.../apache/paimon/table/system/SnapshotsTable.java | 19 ++-
.../org/apache/paimon/table/system/TagsTable.java | 16 ++-
.../table/system/AggregationFieldsTableTest.java | 52 ++++++-
.../paimon/table/system/AuditLogTableTest.java | 4 +-
.../table/system/CatalogOptionsTableTest.java | 4 +-
.../paimon/table/system/ConsumersTableTest.java | 4 +-
.../apache/paimon/table/system/FilesTableTest.java | 6 +-
.../paimon/table/system/ManifestsTableTest.java | 6 +-
.../paimon/table/system/OptionsTableTest.java | 50 ++++++-
.../paimon/table/system/SchemasTableTest.java | 4 +-
.../paimon/table/system/SnapshotsTableTest.java | 4 +-
.../apache/paimon/table/system/TagsTableTest.java | 4 +-
.../org/apache/paimon/flink/BranchSqlITCase.java | 157 +++++++++++++++++++++
17 files changed, 344 insertions(+), 56 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 48d02c421..f5f4d401f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -77,14 +78,19 @@ public class AggregationFieldsTable implements
ReadonlyTable {
private final FileIO fileIO;
private final Path location;
+ private final String branch;
public AggregationFieldsTable(FileStoreTable dataTable) {
- this(dataTable.fileIO(), dataTable.location());
+ this(
+ dataTable.fileIO(),
+ dataTable.location(),
+ CoreOptions.branch(dataTable.schema().options()));
}
- public AggregationFieldsTable(FileIO fileIO, Path location) {
+ public AggregationFieldsTable(FileIO fileIO, Path location, String
branchName) {
this.fileIO = fileIO;
this.location = location;
+ this.branch = branchName;
}
@Override
@@ -114,7 +120,7 @@ public class AggregationFieldsTable implements
ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new AggregationFieldsTable(fileIO, location);
+ return new AggregationFieldsTable(fileIO, location, branch);
}
private class SchemasScan extends ReadOnceTableScan {
@@ -160,7 +166,7 @@ public class AggregationFieldsTable implements
ReadonlyTable {
}
/** {@link TableRead} implementation for {@link AggregationFieldsTable}. */
- private static class SchemasRead implements InnerTableRead {
+ private class SchemasRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
@@ -191,7 +197,7 @@ public class AggregationFieldsTable implements
ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((AggregationSplit) split).location;
- TableSchema schemas = new SchemaManager(fileIO,
location).latest().get();
+ TableSchema schemas = new SchemaManager(fileIO, location,
branch).latest().get();
Iterator<InternalRow> rows = createInternalRowIterator(schemas);
if (projection != null) {
rows =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index e40f087b4..11db77303 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -71,14 +72,19 @@ public class ConsumersTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
+ private final String branch;
public ConsumersTable(FileStoreTable dataTable) {
- this(dataTable.fileIO(), dataTable.location());
+ this(
+ dataTable.fileIO(),
+ dataTable.location(),
+ CoreOptions.branch(dataTable.schema().options()));
}
- public ConsumersTable(FileIO fileIO, Path location) {
+ public ConsumersTable(FileIO fileIO, Path location, String branchName) {
this.fileIO = fileIO;
this.location = location;
+ this.branch = branchName;
}
@Override
@@ -108,7 +114,7 @@ public class ConsumersTable implements ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new ConsumersTable(fileIO, location);
+ return new ConsumersTable(fileIO, location, branch);
}
private class ConsumersScan extends ReadOnceTableScan {
@@ -154,7 +160,7 @@ public class ConsumersTable implements ReadonlyTable {
}
/** {@link TableRead} implementation for {@link ConsumersTable}. */
- private static class ConsumersRead implements InnerTableRead {
+ private class ConsumersRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
@@ -185,7 +191,7 @@ public class ConsumersTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((ConsumersTable.ConsumersSplit) split).location;
- Map<String, Long> consumers = new ConsumerManager(fileIO,
location).consumers();
+ Map<String, Long> consumers = new ConsumerManager(fileIO,
location, branch).consumers();
Iterator<InternalRow> rows =
Iterators.transform(consumers.entrySet().iterator(),
this::toRow);
if (projection != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index d6eda6959..0874a8104 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -57,7 +58,7 @@ import static
org.apache.paimon.utils.SerializationUtils.newStringType;
/** A {@link Table} for showing options of table. */
public class OptionsTable implements ReadonlyTable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
public static final String OPTIONS = "options";
@@ -69,14 +70,19 @@ public class OptionsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
+ private final String branch;
public OptionsTable(FileStoreTable dataTable) {
- this(dataTable.fileIO(), dataTable.location());
+ this(
+ dataTable.fileIO(),
+ dataTable.location(),
+ CoreOptions.branch(dataTable.schema().options()));
}
- public OptionsTable(FileIO fileIO, Path location) {
+ public OptionsTable(FileIO fileIO, Path location, String branchName) {
this.fileIO = fileIO;
this.location = location;
+ this.branch = branchName;
}
@Override
@@ -106,7 +112,7 @@ public class OptionsTable implements ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new OptionsTable(fileIO, location);
+ return new OptionsTable(fileIO, location, branch);
}
private class OptionsScan extends ReadOnceTableScan {
@@ -150,7 +156,7 @@ public class OptionsTable implements ReadonlyTable {
}
}
- private static class OptionsRead implements InnerTableRead {
+ private class OptionsRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
@@ -183,7 +189,7 @@ public class OptionsTable implements ReadonlyTable {
Path location = ((OptionsSplit) split).location;
Iterator<InternalRow> rows =
Iterators.transform(
- options(fileIO, location).entrySet().iterator(),
this::toRow);
+ options(fileIO, location,
branch).entrySet().iterator(), this::toRow);
if (projection != null) {
rows =
Iterators.transform(
@@ -199,8 +205,8 @@ public class OptionsTable implements ReadonlyTable {
}
}
- private static Map<String, String> options(FileIO fileIO, Path location) {
- return new SchemaManager(fileIO, location)
+ private static Map<String, String> options(FileIO fileIO, Path location,
String branchName) {
+ return new SchemaManager(fileIO, location, branchName)
.latest()
.orElseThrow(() -> new RuntimeException("Table not exists."))
.options();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index b5f2bf4d5..10313a1c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -83,14 +84,19 @@ public class SchemasTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
+ private final String branch;
public SchemasTable(FileStoreTable dataTable) {
- this(dataTable.fileIO(), dataTable.location());
+ this(
+ dataTable.fileIO(),
+ dataTable.location(),
+ CoreOptions.branch(dataTable.schema().options()));
}
- public SchemasTable(FileIO fileIO, Path location) {
+ public SchemasTable(FileIO fileIO, Path location, String branchName) {
this.fileIO = fileIO;
this.location = location;
+ this.branch = branchName;
}
@Override
@@ -120,7 +126,7 @@ public class SchemasTable implements ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new SchemasTable(fileIO, location);
+ return new SchemasTable(fileIO, location, branch);
}
private class SchemasScan extends ReadOnceTableScan {
@@ -165,7 +171,7 @@ public class SchemasTable implements ReadonlyTable {
}
/** {@link TableRead} implementation for {@link SchemasTable}. */
- private static class SchemasRead implements InnerTableRead {
+ private class SchemasRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
@@ -197,7 +203,7 @@ public class SchemasTable implements ReadonlyTable {
}
Path location = ((SchemasSplit) split).location;
Iterator<TableSchema> schemas =
- new SchemaManager(fileIO, location).listAll().iterator();
+ new SchemaManager(fileIO, location,
branch).listAll().iterator();
Iterator<InternalRow> rows = Iterators.transform(schemas,
this::toRow);
if (projection != null) {
rows =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 90268bd77..50def71f1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -75,7 +76,6 @@ public class SnapshotsTable implements ReadonlyTable {
private static final long serialVersionUID = 1L;
public static final String SNAPSHOTS = "snapshots";
-
public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
@@ -106,17 +106,24 @@ public class SnapshotsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
+ private final String branch;
private final FileStoreTable dataTable;
public SnapshotsTable(FileStoreTable dataTable) {
- this(dataTable.fileIO(), dataTable.location(), dataTable);
+ this(
+ dataTable.fileIO(),
+ dataTable.location(),
+ dataTable,
+ CoreOptions.branch(dataTable.schema().options()));
}
- public SnapshotsTable(FileIO fileIO, Path location, FileStoreTable
dataTable) {
+ public SnapshotsTable(
+ FileIO fileIO, Path location, FileStoreTable dataTable, String
branchName) {
this.fileIO = fileIO;
this.location = location;
this.dataTable = dataTable;
+ this.branch = branchName;
}
@Override
@@ -146,7 +153,7 @@ public class SnapshotsTable implements ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new SnapshotsTable(fileIO, location,
dataTable.copy(dynamicOptions));
+ return new SnapshotsTable(fileIO, location,
dataTable.copy(dynamicOptions), branch);
}
private class SnapshotsScan extends ReadOnceTableScan {
@@ -191,7 +198,7 @@ public class SnapshotsTable implements ReadonlyTable {
}
}
- private static class SnapshotsRead implements InnerTableRead {
+ private class SnapshotsRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
@@ -259,7 +266,7 @@ public class SnapshotsTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
SnapshotManager snapshotManager =
- new SnapshotManager(fileIO, ((SnapshotsSplit)
split).location);
+ new SnapshotManager(fileIO, ((SnapshotsSplit)
split).location, branch);
Iterator<Snapshot> snapshots =
snapshotManager.snapshotsWithinRange(
optionalFilterSnapshotIdMax,
optionalFilterSnapshotIdMin);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 9fc13c65d..d92876e4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -82,14 +83,19 @@ public class TagsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
+ private final String branch;
public TagsTable(FileStoreTable dataTable) {
- this(dataTable.fileIO(), dataTable.location());
+ this(
+ dataTable.fileIO(),
+ dataTable.location(),
+ CoreOptions.branch(dataTable.schema().options()));
}
- public TagsTable(FileIO fileIO, Path location) {
+ public TagsTable(FileIO fileIO, Path location, String branchName) {
this.fileIO = fileIO;
this.location = location;
+ this.branch = branchName;
}
@Override
@@ -119,7 +125,7 @@ public class TagsTable implements ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new TagsTable(fileIO, location);
+ return new TagsTable(fileIO, location, branch);
}
private class TagsScan extends ReadOnceTableScan {
@@ -164,7 +170,7 @@ public class TagsTable implements ReadonlyTable {
}
}
- private static class TagsRead implements InnerTableRead {
+ private class TagsRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;
@@ -196,7 +202,7 @@ public class TagsTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((TagsSplit) split).location;
- List<Pair<Tag, String>> tags = new TagManager(fileIO,
location).tagObjects();
+ List<Pair<Tag, String>> tags = new TagManager(fileIO, location,
branch).tagObjects();
Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
for (Pair<Tag, String> tag : tags) {
nameToSnapshot.put(tag.getValue(), tag.getKey());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
index e256d5ce2..0cb6ef093 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
@@ -27,9 +27,12 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Multimap;
@@ -40,6 +43,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_BRANCH_PREFIX;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static
org.apache.paimon.table.system.AggregationFieldsTable.extractFieldMultimap;
import static org.assertj.core.api.Assertions.assertThat;
@@ -76,12 +81,55 @@ public class AggregationFieldsTableTest extends
TableTestBase {
@Test
public void testAggregationFieldsRecord() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(aggregationFieldsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+ @Test
+ public void testBranchAggregationFieldsRecord() throws Exception {
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(identifier(tableName));
+ table.createBranch("b1");
+ // verify that branch file exist
+ BranchManager branchManager = table.branchManager();
+ assertThat(branchManager.branchExists("b1")).isTrue();
+
+ SchemaManager schemaManagerBranch = schemaManager.copyWithBranch("b1");
+ SchemaUtils.forceCommit(
+ schemaManagerBranch,
+ Schema.newBuilder()
+ .column("product_id", DataTypes.INT())
+ .column("price", DataTypes.INT())
+ .column("sales", DataTypes.INT())
+ .primaryKey("product_id")
+ .option("merge-engine", "aggregation")
+ .option("fields.price.aggregate-function", "sum")
+ .option("fields.sales.aggregate-function", "sum")
+ .option("fields.sales.ignore-retract", "false")
+ .build());
+ AggregationFieldsTable branchAggregationFieldsTable =
+ (AggregationFieldsTable)
+ catalog.getTable(
+ identifier(
+ tableName
+ + SYSTEM_TABLE_SPLITTER
+ + SYSTEM_BRANCH_PREFIX
+ + "b1"
+ + "$aggregation_fields"));
+ List<InternalRow> expectRow = getExpectedResult();
+ List<InternalRow> result = read(aggregationFieldsTable);
+ assertThat(result).containsExactlyElementsOf(expectRow);
+
+ expectRow = getExpectedResult(schemaManagerBranch);
+ result = read(branchAggregationFieldsTable);
+ assertThat(result).containsExactlyElementsOf(expectRow);
+ }
+
+ private List<InternalRow> getExpectedResult() {
+ return getExpectedResult(schemaManager);
+ }
+
+ private List<InternalRow> getExpectedResult(SchemaManager schemaManager) {
TableSchema schema = schemaManager.latest().get();
Multimap<String, String> function =
extractFieldMultimap(schema.options(), Map.Entry::getValue);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
index 77d2cafe7..ebefa2af6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
@@ -85,12 +85,12 @@ public class AuditLogTableTest extends TableTestBase {
@Test
public void testReadAuditLogFromLatest() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(auditLogTable);
assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+ private List<InternalRow> getExpectedResult() {
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
index 9ee1a63cb..b40277db3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
@@ -65,12 +65,12 @@ public class CatalogOptionsTableTest extends TableTestBase {
@Test
public void testCatalogOptionsTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(catalogOptionsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+ private List<InternalRow> getExpectedResult() {
List<InternalRow> expectedRow = new ArrayList<>();
for (Map.Entry<String, String> option :
catalogOptions.toMap().entrySet()) {
expectedRow.add(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
index 0183eb800..6412db9df 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
@@ -70,12 +70,12 @@ public class ConsumersTableTest extends TableTestBase {
@Test
public void testPartitionRecordCount() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(consumersTable);
assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() throws IOException {
+ private List<InternalRow> getExpectedResult() throws IOException {
Map<String, Long> consumers = manager.consumers();
return consumers.entrySet().stream()
.map(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index e26e3ba2a..0d7a8b497 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -136,7 +136,7 @@ public class FilesTableTest extends TableTestBase {
@Test
public void testReadFilesFromLatest() throws Exception {
- List<InternalRow> expectedRow = getExceptedResult(2L);
+ List<InternalRow> expectedRow = getExpectedResult(2L);
List<InternalRow> result = read(filesTable);
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
@@ -149,7 +149,7 @@ public class FilesTableTest extends TableTestBase {
@Test
public void testReadFilesFromSpecifiedSnapshot() throws Exception {
- List<InternalRow> expectedRow = getExceptedResult(1L);
+ List<InternalRow> expectedRow = getExpectedResult(1L);
filesTable =
(FilesTable)
filesTable.copy(
@@ -169,7 +169,7 @@ public class FilesTableTest extends TableTestBase {
.satisfies(anyCauseMatches(IllegalArgumentException.class));
}
- private List<InternalRow> getExceptedResult(long snapshotId) {
+ private List<InternalRow> getExpectedResult(long snapshotId) {
if (!snapshotManager.snapshotExists(snapshotId)) {
return Collections.emptyList();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index 970732e83..ffd4716e7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -96,14 +96,14 @@ public class ManifestsTableTest extends TableTestBase {
@Test
public void testReadManifestsFromLatest() throws Exception {
- List<InternalRow> expectedRow = getExceptedResult(2L);
+ List<InternalRow> expectedRow = getExpectedResult(2L);
List<InternalRow> result = read(manifestsTable);
assertThat(result).containsExactlyElementsOf(expectedRow);
}
@Test
public void testReadManifestsFromSpecifiedSnapshot() throws Exception {
- List<InternalRow> expectedRow = getExceptedResult(1L);
+ List<InternalRow> expectedRow = getExpectedResult(1L);
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
@@ -122,7 +122,7 @@ public class ManifestsTableTest extends TableTestBase {
assertThat(result).isEmpty();
}
- private List<InternalRow> getExceptedResult(long snapshotId) {
+ private List<InternalRow> getExpectedResult(long snapshotId) {
if (!snapshotManager.snapshotExists(snapshotId)) {
return Collections.emptyList();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
index 31b54e83e..ce7f3a9b7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
@@ -28,16 +28,22 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.BranchManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_BRANCH_PREFIX;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for {@link OptionsTableTest}. */
@@ -70,12 +76,52 @@ public class OptionsTableTest extends TableTestBase {
@Test
public void testOptionsTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(optionsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+ @Test
+ public void testBranchOptionsTable() throws Exception {
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(identifier(tableName));
+ table.createBranch("b1");
+ // verify that branch file exist
+ BranchManager branchManager = table.branchManager();
+ assertThat(branchManager.branchExists("b1")).isTrue();
+ SchemaManager schemaManagerBranch = schemaManager.copyWithBranch("b1");
+ Map<String, String> newOptions = new HashMap<>();
+ newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+ SchemaUtils.forceCommit(
+ schemaManagerBranch,
+ Schema.newBuilder()
+ .column("product_id", DataTypes.INT())
+ .column("price", DataTypes.INT())
+ .column("sales", DataTypes.INT())
+ .primaryKey("product_id")
+ .option(CoreOptions.MERGE_ENGINE.key(), "deduplicate")
+ .option(CoreOptions.TAG_AUTOMATIC_CREATION.key(),
"watermark")
+ .option(CoreOptions.TAG_CREATION_PERIOD.key(), "daily")
+ .option(CoreOptions.CONSUMER_ID.key(), "id0")
+ .build());
+ OptionsTable branchOptionsTable =
+ (OptionsTable)
+ catalog.getTable(
+ identifier(
+ tableName
+ + SYSTEM_TABLE_SPLITTER
+ + SYSTEM_BRANCH_PREFIX
+ + "b1"
+ + "$options"));
+ List<InternalRow> expectRow = getExpectedResult(schemaManagerBranch);
+ List<InternalRow> result = read(branchOptionsTable);
+ assertThat(result).containsExactlyElementsOf(expectRow);
+ }
+
+ private List<InternalRow> getExpectedResult() {
+ return getExpectedResult(schemaManager);
+ }
+
+ private List<InternalRow> getExpectedResult(SchemaManager schemaManager) {
Map<String, String> options =
schemaManager
.latest()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
index a10b2e1d7..c7c0ca5cf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
@@ -73,12 +73,12 @@ public class SchemasTableTest extends TableTestBase {
@Test
public void testSchemasTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(schemasTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+ private List<InternalRow> getExpectedResult() {
List<TableSchema> tableSchemas = schemaManager.listAll();
List<InternalRow> expectedRow = new ArrayList<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
index fa89d3fca..44381c124 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
@@ -90,12 +90,12 @@ public class SnapshotsTableTest extends TableTestBase {
@Test
public void testReadSnapshotsFromLatest() throws Exception {
- List<InternalRow> expectedRow = getExceptedResult(new long[] {1, 2});
+ List<InternalRow> expectedRow = getExpectedResult(new long[] {1, 2});
List<InternalRow> result = read(snapshotsTable);
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
- private List<InternalRow> getExceptedResult(long[] snapshotIds) {
+ private List<InternalRow> getExpectedResult(long[] snapshotIds) {
List<InternalRow> expectedRow = new ArrayList<>();
for (long snapshotId : snapshotIds) {
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 99c38df01..8f8029cde 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -83,12 +83,12 @@ class TagsTableTest extends TableTestBase {
@Test
void testTagsTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> expectRow = getExpectedResult();
List<InternalRow> result = read(tagsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}
- private List<InternalRow> getExceptedResult() {
+ private List<InternalRow> getExpectedResult() {
List<InternalRow> internalRows = new ArrayList<>();
for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {
Tag tag = snapshot.getKey();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index ba197abfa..d50f36ccd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -19,8 +19,10 @@
package org.apache.paimon.flink;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
@@ -346,6 +348,161 @@ public class BranchSqlITCase extends CatalogITCaseBase {
.hasMessageContaining("Branch main and pk does not have the
same row type");
}
+ @Test
+ public void testBranchOptionsTable() throws Exception {
+ sql(
+ "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k)
NOT ENFORCED ) "
+ + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");
+
+ sql("CALL sys.create_branch('default.t', 'test')");
+
+ sql("ALTER TABLE t SET ('snapshot.time-retained' = '5 h')");
+ sql("ALTER TABLE t$branch_test SET ('snapshot.time-retained' = '1
h')");
+
+ assertThat(collectResult("SELECT * FROM t$options"))
+ .containsExactlyInAnyOrder(
+ "+I[bucket, 2]",
+ "+I[snapshot.time-retained, 5 h]",
+ "+I[scan.infer-parallelism, false]");
+ assertThat(collectResult("SELECT * FROM t$branch_test$options"))
+ .containsExactlyInAnyOrder(
+ "+I[bucket, 2]",
+ "+I[snapshot.time-retained, 1 h]",
+ "+I[scan.infer-parallelism, false]");
+ }
+
+ @Test
+ public void testBranchSchemasTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2)");
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ assertThat(collectResult("SELECT schema_id FROM t$branch_b1$schemas
order by schema_id"))
+ .containsExactlyInAnyOrder("+I[0]");
+
+ sql("ALTER TABLE t$branch_b1 SET ('snapshot.time-retained' = '5 h')");
+ assertThat(collectResult("SELECT schema_id FROM t$branch_b1$schemas
order by schema_id"))
+ .containsExactlyInAnyOrder("+I[0]", "+I[1]");
+ }
+
+ @Test
+ public void testBranchAuditLogTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2)");
+ assertThat(collectResult("SELECT * FROM t$audit_log"))
+ .containsExactlyInAnyOrder("+I[+I, 1, 2]");
+
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+ assertThat(collectResult("SELECT * FROM t$branch_b1$audit_log"))
+ .containsExactlyInAnyOrder("+I[+I, 3, 4]");
+ }
+
+ @Test
+ public void testBranchReadOptimizedTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2)");
+ assertThat(collectResult("SELECT * FROM
t$ro")).containsExactlyInAnyOrder("+I[1, 2]");
+
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+ assertThat(collectResult("SELECT * FROM t$branch_b1$ro"))
+ .containsExactlyInAnyOrder("+I[3, 4]");
+ }
+
+ @Test
+ public void testBranchFilesTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2)");
+
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+ sql("INSERT INTO t$branch_b1 VALUES (5, 6)");
+
+ assertThat(collectResult("SELECT min_value_stats FROM t$files"))
+ .containsExactlyInAnyOrder("+I[{a=1, b=2}]");
+ assertThat(collectResult("SELECT min_value_stats FROM
t$branch_b1$files"))
+ .containsExactlyInAnyOrder("+I[{a=3, b=4}]", "+I[{a=5, b=6}]");
+ }
+
+ @Test
+ public void testBranchTagsTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2)");
+ paimonTable("t").createTag("tag1", 1);
+
+ sql("CALL sys.create_branch('default.t', 'b1','tag1')");
+ sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+ paimonTable("t$branch_b1").createTag("tag2", 2);
+
+ assertThat(collectResult("SELECT tag_name,snapshot_id,record_count
FROM t$tags"))
+ .containsExactlyInAnyOrder("+I[tag1, 1, 1]");
+ assertThat(collectResult("SELECT tag_name,snapshot_id,record_count
FROM t$branch_b1$tags"))
+ .containsExactlyInAnyOrder("+I[tag1, 1, 1]", "+I[tag2, 2, 2]");
+ }
+
+ @Test
+ public void testBranchConsumersTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2), (3,4)");
+
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM t$branch_b1 /*+
OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */"));
+ sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)");
+ assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(5,
6), Row.of(7, 8));
+ iterator.close();
+
+ assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty();
+ assertThat(collectResult("SELECT * FROM t$branch_b1$consumers"))
+ .containsExactlyInAnyOrder("+I[id1, 2]");
+ }
+
+ @Test
+ public void testBranchManifestsTable() {
+ sql("CREATE TABLE t (a INT, b INT)");
+ sql("INSERT INTO t VALUES (1, 2)");
+
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+ sql("INSERT INTO t$branch_b1 VALUES (5, 6)");
+
+ List<Row> res = sql("SELECT schema_id, file_name, file_size FROM
t$manifests");
+ assertThat(res).hasSize(1);
+
+ res = sql("SELECT schema_id, file_name, file_size FROM
t$branch_b1$manifests");
+ assertThat(res).hasSize(2);
+ res.forEach(
+ row -> {
+ assertThat((long) row.getField(0)).isEqualTo(0L);
+ assertThat(StringUtils.startsWith((String)
row.getField(1), "manifest"))
+ .isTrue();
+ assertThat((long) row.getField(2)).isGreaterThan(0L);
+ });
+ }
+
+ @Test
+ public void testBranchPartitionsTable() throws Exception {
+ sql("CREATE TABLE t (a INT, b INT,c STRING) PARTITIONED BY (a)");
+ assertThat(sql("SELECT * FROM t$partitions")).isEmpty();
+
+ sql("INSERT INTO t VALUES (1, 2, 'x')");
+ sql("INSERT INTO t VALUES (1, 4, 'S2'), (2, 2, 'S1'), (2, 2, 'S1')");
+ sql("INSERT INTO t VALUES (1, 4, 'S3'), (2, 2, 'S4')");
+
+ sql("CALL sys.create_branch('default.t', 'b1')");
+ sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S2'), (2, 2, 'S1'), (2, 2,
'S5')");
+ sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S3'), (2, 2, 'S4')");
+
+ assertThat(collectResult("SELECT `partition`, record_count, file_count
FROM t$partitions"))
+ .containsExactlyInAnyOrder("+I[[1], 3, 3]", "+I[[2], 3, 2]");
+ assertThat(
+ collectResult(
+ "SELECT `partition`, record_count, file_count
FROM t$branch_b1$partitions"))
+ .containsExactlyInAnyOrder("+I[[1], 2, 2]", "+I[[2], 3, 2]");
+ }
+
private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {