This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb39065f [improve] add system table to show all table configs in
`sys.all_table_options` (#1647)
1bb39065f is described below
commit 1bb39065fac35eb6ac7df824e4a50883052fb003
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jul 31 18:56:00 2023 +0800
[improve] add system table to show all table configs in
`sys.all_table_options` (#1647)
---
docs/content/how-to/system-tables.md | 52 ++++-
.../org/apache/paimon/catalog/AbstractCatalog.java | 44 +++-
.../java/org/apache/paimon/catalog/Catalog.java | 10 +
.../apache/paimon/catalog/FileSystemCatalog.java | 12 +
.../paimon/table/system/AllTableOptionsTable.java | 244 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 14 ++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 3 +
.../apache/paimon/flink/CatalogTableITCase.java | 42 ++++
.../java/org/apache/paimon/hive/HiveCatalog.java | 12 +
9 files changed, 418 insertions(+), 15 deletions(-)
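
The two central ideas of this change can be sketched outside Paimon: `getTable` now checks for the `sys` database before it checks for the `$` splitter, and `AllTableOptionsTable` flattens a nested database/table/option map into four-column rows. The sketch below is a minimal, dependency-free illustration under those assumptions. The class `SystemTableSketch`, the enum `Kind`, and the methods `classify` and `toRows` are hypothetical names and are not part of the Paimon API. Only the constant values `"sys"` and `"$"` are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; none of these names exist in Paimon. */
final class SystemTableSketch {

    // Values copied from Catalog.SYSTEM_DATABASE_NAME and Catalog.SYSTEM_TABLE_SPLITTER.
    static final String SYSTEM_DATABASE_NAME = "sys";
    static final String SYSTEM_TABLE_SPLITTER = "$";

    enum Kind { GLOBAL_SYSTEM_TABLE, TABLE_SYSTEM_TABLE, DATA_TABLE }

    /** Classifies an identifier in the same order as the patched getTable(). */
    static Kind classify(String database, String objectName) {
        if (SYSTEM_DATABASE_NAME.equals(database)) {
            return Kind.GLOBAL_SYSTEM_TABLE; // e.g. sys.all_table_options
        }
        if (objectName.contains(SYSTEM_TABLE_SPLITTER)) {
            return Kind.TABLE_SYSTEM_TABLE; // e.g. MyTable$snapshots
        }
        return Kind.DATA_TABLE;
    }

    /** Flattens database -> table -> (key -> value) into rows of four strings. */
    static List<String[]> toRows(Map<String, Map<String, Map<String, String>>> options) {
        List<String[]> rows = new ArrayList<>();
        for (Map.Entry<String, Map<String, Map<String, String>>> db : options.entrySet()) {
            for (Map.Entry<String, Map<String, String>> table : db.getValue().entrySet()) {
                for (Map.Entry<String, String> option : table.getValue().entrySet()) {
                    rows.add(new String[] {
                        db.getKey(), table.getKey(), option.getKey(), option.getValue()
                    });
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // TreeMap is used here only to make the iteration order deterministic.
        Map<String, Map<String, Map<String, String>>> options = new TreeMap<>();
        options.computeIfAbsent("my_db", d -> new TreeMap<>())
                .computeIfAbsent("Orders2", t -> new TreeMap<>())
                .put("bucket", "-1");
        options.get("my_db").get("Orders2").put("write-mode", "append-only");

        for (String[] row : toRows(options)) {
            System.out.println(String.join(" | ", row));
        }
        System.out.println(classify("sys", "all_table_options"));
        System.out.println(classify("my_db", "MyTable$snapshots"));
    }
}
```

Note that the committed code builds its maps with `HashMap`, so the row order returned by `sys.all_table_options` is presumably not guaranteed; the sketch uses `TreeMap` purely for reproducible output.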
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index e97ba55f3..f01bc4092 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -26,7 +26,9 @@ under the License.
# System Tables
-System tables contain metadata and information about each table, such as the
snapshots created and the options in use. Users can access system tables with
batch queries.
+## Table Specified System Table
+
+Table specified system tables contain metadata and information about each
table, such as the snapshots created and the options in use. Users can access
system tables with batch queries.
Currently, Flink, Spark and Trino supports querying system tables.
@@ -35,7 +37,7 @@ In some cases, the table name needs to be enclosed with back
quotes to avoid syn
SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
```
-## Snapshots Table
+### Snapshots Table
You can query the snapshot history information of the table through snapshots
table, including the record count occurred in the snapshot.
@@ -55,7 +57,7 @@ SELECT * FROM MyTable$snapshots;
By querying the snapshots table, you can know the commit and expiration
information about that table and time travel through the data.
-## Schemas Table
+### Schemas Table
You can query the historical schemas of the table through schemas table.
@@ -82,7 +84,7 @@ SELECT s.snapshot_id, t.schema_id, t.fields
ON s.schema_id=t.schema_id where s.snapshot_id=100;
```
-## Options Table
+### Options Table
You can query the table's option information which is specified from the DDL
through options table. The options not shown will be the default value. You can
take reference to [Configuration].
@@ -99,7 +101,7 @@ SELECT * FROM MyTable$options;
*/
```
-## Audit log Table
+### Audit log Table
If you need to audit the changelog of the table, you can use the `audit_log`
system table. Through `audit_log` table, you can get the `rowkind` column when
you get the incremental data of the table. You can use this column for
filtering and other operations to complete the audit.
@@ -128,7 +130,7 @@ SELECT * FROM MyTable$audit_log;
*/
```
-## Files Table
+### Files Table
You can query the files of the table with specific snapshot.
```sql
@@ -164,7 +166,7 @@ SELECT * FROM MyTable$files /*+
OPTIONS('scan.snapshot-id'='1') */;
*/
```
-## Tags Table
+### Tags Table
You can query the tag history information of the table through tags table,
including which snapshots are the tags based on
and some historical information of the snapshots. You can also get all tag
names and time travel to a specific tag data by name.
@@ -183,7 +185,7 @@ SELECT * FROM MyTable$tags;
*/
```
-## Consumers Table
+### Consumers Table
You can query all consumers which contains next snapshot.
@@ -201,7 +203,7 @@ SELECT * FROM MyTable$consumers;
*/
```
-## Manifests Table
+### Manifests Table
You can query all manifest files contained in the latest snapshot or the
specified snapshot of the current table.
@@ -246,3 +248,35 @@ SELECT * FROM MyTable$partitions;
+---------------+----------------+--------------------+
*/
```
+
+## Global System Table
+
+Global system tables contain statistical information about all the tables that exist in Paimon. For convenience, they are grouped under a reference system database called `sys`.
+You can list all the global system tables with SQL in Flink:
+```sql
+USE sys;
+SHOW TABLES;
+```
+
+### ALL Options Table
+This table is similar to the [Options Table]({{< ref "how-to/system-tables#options-table" >}}), but it shows the options of all tables in all databases.
+
+```sql
+SELECT * FROM sys.all_table_options;
+
+/*
++---------------+--------------------------------+--------------------------------+------------------+
+| database_name | table_name                     | key                            | value            |
++---------------+--------------------------------+--------------------------------+------------------+
+| my_db         | Orders_orc                     | bucket                         | -1               |
+| my_db         | Orders2                        | bucket                         | -1               |
+| my_db         | Orders2                        | write-mode                     | append-only      |
+| my_db         | Orders2                        | sink.parallelism               | 7                |
+| my_db         | StockAnalyze                   | write-mode                     | change-log       |
+| my_db2        | OrdersSum                      | bucket                         | 1                |
+| my_db2        | OrdersSum                      | write-mode                     | change-log       |
++---------------+--------------------------------+--------------------------------+------------------+
+7 rows in set
+*/
+```
+
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index dba06caf7..38d3eef20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -26,21 +26,24 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.StringUtils;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
-
protected static final String TABLE_DEFAULT_OPTION_PREFIX =
"table-default.";
+ protected static final List<String> GLOBAL_TABLES =
+ Arrays.asList(AllTableOptionsTable.ALL_TABLE_OPTIONS);
protected final FileIO fileIO;
-
protected final Map<String, String> tableDefaultOptions;
protected AbstractCatalog(FileIO fileIO) {
@@ -63,7 +66,14 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
- if (isSystemTable(identifier)) {
+ if (isSystemDatabase(identifier.getDatabaseName())) {
+ String tableName = identifier.getObjectName();
+ Table table = SystemTableLoader.loadGlobal(tableName, fileIO,
allTablePaths());
+ if (table == null) {
+ throw new TableNotExistException(identifier);
+ }
+ return table;
+ } else if (isSpecifiedSystemTable(identifier)) {
String[] splits = tableAndSystemName(identifier);
String tableName = splits[0];
String type = splits[1];
@@ -94,6 +104,24 @@ public abstract class AbstractCatalog implements Catalog {
return databasePath(warehouse(), database);
}
+ Map<String, Map<String, Path>> allTablePaths() {
+ try {
+ Map<String, Map<String, Path>> allPaths = new HashMap<>();
+ for (String database : listDatabases()) {
+ Map<String, Path> tableMap =
+ allPaths.computeIfAbsent(database, d -> new
HashMap<>());
+ for (String table : listTables(database)) {
+ tableMap.put(
+ table,
+ dataTableLocation(warehouse(),
Identifier.create(database, table)));
+ }
+ }
+ return allPaths;
+ } catch (DatabaseNotExistException e) {
+ throw new RuntimeException("Database is deleted while listing", e);
+ }
+ }
+
protected abstract String warehouse();
protected abstract TableSchema getDataTableSchema(Identifier identifier)
@@ -104,12 +132,12 @@ public abstract class AbstractCatalog implements Catalog {
return dataTableLocation(warehouse(), identifier);
}
- private static boolean isSystemTable(Identifier identifier) {
+ private static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER);
}
protected void checkNotSystemTable(Identifier identifier, String method) {
- if (isSystemTable(identifier)) {
+ if (isSystemDatabase(identifier.getDatabaseName()) ||
isSpecifiedSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for system table '%s', please use
data table.",
@@ -132,7 +160,7 @@ public abstract class AbstractCatalog implements Catalog {
}
public static Path dataTableLocation(String warehouse, Identifier
identifier) {
- if (isSystemTable(identifier)) {
+ if (isSpecifiedSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
"Table name[%s] cannot contain '%s' separator",
@@ -145,4 +173,8 @@ public abstract class AbstractCatalog implements Catalog {
public static Path databasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}
+
+ protected boolean isSystemDatabase(String database) {
+ return SYSTEM_DATABASE_NAME.equals(database);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 04518e809..7e8b03f13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -42,6 +42,7 @@ public interface Catalog extends AutoCloseable {
String DEFAULT_DATABASE = "default";
String SYSTEM_TABLE_SPLITTER = "$";
+ String SYSTEM_DATABASE_NAME = "sys";
/**
* Get lock factory from catalog. Lock is used to support multiple
concurrent writes on the
@@ -271,6 +272,15 @@ public interface Catalog extends AutoCloseable {
}
}
+ /** Exception for trying to operate on a system database. */
+ class ProcessSystemDatabaseException extends IllegalArgumentException {
+ private static final String MSG = "Can't do operation on system
database.";
+
+ public ProcessSystemDatabaseException() {
+ super(MSG);
+ }
+ }
+
/** Exception for trying to create a table that already exists. */
class TableAlreadyExistException extends Exception {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 16a99b1b7..e8468cc1b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -66,12 +66,18 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public boolean databaseExists(String databaseName) {
+ if (isSystemDatabase(databaseName)) {
+ return true;
+ }
return uncheck(() -> fileIO.exists(databasePath(databaseName)));
}
@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
+ if (isSystemDatabase(name)) {
+ throw new ProcessSystemDatabaseException();
+ }
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
@@ -84,6 +90,9 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
+ if (isSystemDatabase(name)) {
+ throw new ProcessSystemDatabaseException();
+ }
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
@@ -101,6 +110,9 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
+ if (isSystemDatabase(databaseName)) {
+ return GLOBAL_TABLES;
+ }
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(databaseName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
new file mode 100644
index 000000000..65a4afb0f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * This is a system table to display all the database-table properties.
+ *
+ * <pre>
+ * For example:
+ * If we select * from sys.all_table_options, we will get
+ * database_name table_name key value
+ * default test0 a b
+ * my_db test1 c d
+ * ... ... ... ...
+ * We can write sql to fetch the information we need.
+ * </pre>
+ */
+public class AllTableOptionsTable implements ReadonlyTable {
+
+ public static final String ALL_TABLE_OPTIONS = "all_table_options";
+
+ private final FileIO fileIO;
+ private final Map<String, Map<String, Path>> allTablePaths;
+
+ public AllTableOptionsTable(FileIO fileIO, Map<String, Map<String, Path>>
allTablePaths) {
+ // allTablePaths is the map of <database, <table_name, table path>>
+ this.fileIO = fileIO;
+ this.allTablePaths = allTablePaths;
+ }
+
+ @Override
+ public String name() {
+ return ALL_TABLE_OPTIONS;
+ }
+
+ @Override
+ public RowType rowType() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "database_name", new
VarCharType(VarCharType.MAX_LENGTH)));
+ fields.add(new DataField(1, "table_name", new
VarCharType(VarCharType.MAX_LENGTH)));
+ fields.add(new DataField(2, "key", new
VarCharType(VarCharType.MAX_LENGTH)));
+ fields.add(new DataField(3, "value", new
VarCharType(VarCharType.MAX_LENGTH)));
+ return new RowType(fields);
+ }
+
+ @Override
+ public List<String> primaryKeys() {
+ return Collections.singletonList("table_name");
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ return new AllTableOptionsScan();
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new AllTableOptionsRead(fileIO);
+ }
+
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new AllTableOptionsTable(fileIO, allTablePaths);
+ }
+
+ private class AllTableOptionsScan extends ReadOnceTableScan {
+
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ public Plan innerPlan() {
+ return () -> Collections.singletonList(new AllTableSplit(fileIO,
allTablePaths));
+ }
+ }
+
+ private static class AllTableSplit implements Split {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileIO fileIO;
+ private final Map<String, Map<String, Path>> allTablePaths;
+
+ private AllTableSplit(FileIO fileIO, Map<String, Map<String, Path>>
allTablePaths) {
+ this.fileIO = fileIO;
+ this.allTablePaths = allTablePaths;
+ }
+
+ @Override
+ public long rowCount() {
+ return options(fileIO, allTablePaths).values().stream()
+ .flatMap(t -> t.values().stream())
+ .reduce(0, (a, b) -> a + b.size(), Integer::sum);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AllTableSplit that = (AllTableSplit) o;
+ return Objects.equals(allTablePaths, that.allTablePaths);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(allTablePaths);
+ }
+ }
+
+ private static class AllTableOptionsRead implements InnerTableRead {
+
+ private final FileIO fileIO;
+ private int[][] projection;
+
+ public AllTableOptionsRead(FileIO fileIO) {
+ this.fileIO = fileIO;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ this.projection = projection;
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ if (!(split instanceof AllTableSplit)) {
+ throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
+ }
+ Map<String, Map<String, Path>> location = ((AllTableSplit)
split).allTablePaths;
+ Iterator<InternalRow> rows = toRow(options(fileIO, location));
+ if (projection != null) {
+ rows =
+ Iterators.transform(
+ rows, row ->
ProjectedRow.from(projection).replaceRow(row));
+ }
+ return new IteratorRecordReader<>(rows);
+ }
+
+ private Iterator<InternalRow> toRow(Map<String, Map<String,
Map<String, String>>> option) {
+ List<InternalRow> rows = new ArrayList<>();
+ for (Map.Entry<String, Map<String, Map<String, String>>> entry0 :
option.entrySet()) {
+ String database = entry0.getKey();
+ for (Map.Entry<String, Map<String, String>> entry1 :
entry0.getValue().entrySet()) {
+ String tableName = entry1.getKey();
+ for (Map.Entry<String, String> entry2 :
entry1.getValue().entrySet()) {
+ String key = entry2.getKey();
+ String value = entry2.getValue();
+ rows.add(
+ GenericRow.of(
+ BinaryString.fromString(database),
+ BinaryString.fromString(tableName),
+ BinaryString.fromString(key),
+ BinaryString.fromString(value)));
+ }
+ }
+ }
+ return rows.iterator();
+ }
+ }
+
+ private static Map<String, Map<String, Map<String, String>>> options(
+ FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
+ Map<String, Map<String, Map<String, String>>> allOptions = new
HashMap<>();
+ for (Map.Entry<String, Map<String, Path>> entry0 :
allTablePaths.entrySet()) {
+ Map<String, Map<String, String>> m0 =
+ allOptions.computeIfAbsent(entry0.getKey(), k -> new
HashMap<>());
+ for (Map.Entry<String, Path> entry1 :
entry0.getValue().entrySet()) {
+ Map<String, String> options =
+ new SchemaManager(fileIO, entry1.getValue())
+ .latest()
+ .orElseThrow(() -> new RuntimeException("Table
not exists."))
+ .options();
+ m0.put(entry1.getKey(), options);
+ }
+ }
+ return allOptions;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 511886cb2..49e99e696 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -25,6 +25,9 @@ import org.apache.paimon.table.Table;
import javax.annotation.Nullable;
+import java.util.Map;
+
+import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
@@ -64,4 +67,15 @@ public class SystemTableLoader {
return null;
}
}
+
+ @Nullable
+ public static Table loadGlobal(
+ String tableName, FileIO fileIO, Map<String, Map<String, Path>>
allTablePaths) {
+ switch (tableName.toLowerCase()) {
+ case ALL_TABLE_OPTIONS:
+ return new AllTableOptionsTable(fileIO, allTablePaths);
+ default:
+ return null;
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 8a2224da0..51499abae 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -496,6 +496,9 @@ public class FlinkCatalog extends AbstractCatalog {
}
private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2)
{
+ if (ct1 instanceof SystemCatalogTable) {
+ throw new UnsupportedOperationException("Can't alter system
table.");
+ }
org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
boolean pkEquality = false;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 7befc61af..5b40d2921 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -18,10 +18,12 @@
package org.apache.paimon.flink;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.types.IntType;
import org.apache.paimon.utils.BlockingIterator;
@@ -36,6 +38,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for catalog tables. */
@@ -86,6 +89,45 @@ public class CatalogTableITCase extends CatalogITCaseBase {
assertThat(result).containsExactly(Row.of("snapshot.time-retained", "5
h"));
}
+ @Test
+ public void testAllTableOptions() {
+ sql("CREATE TABLE T (a INT, b INT) with ('a.aa.aaa'='val1',
'b.bb.bbb'='val2')");
+ sql("ALTER TABLE T SET ('c.cc.ccc' = 'val3')");
+
+ List<Row> result = sql("SELECT * FROM sys.all_table_options");
+ assertThat(result)
+ .containsExactly(
+ Row.of("default", "T", "a.aa.aaa", "val1"),
+ Row.of("default", "T", "b.bb.bbb", "val2"),
+ Row.of("default", "T", "c.cc.ccc", "val3"));
+ }
+
+ @Test
+ public void testDropSystemDatabase() {
+ assertThatCode(() -> sql("DROP DATABASE sys"))
+ .hasRootCauseMessage("Can't do operation on system database.");
+ }
+
+ @Test
+ public void testCreateSystemDatabase() {
+ assertThatCode(() -> sql("CREATE DATABASE sys"))
+ .hasRootCauseMessage("Can't do operation on system database.");
+ }
+
+ @Test
+ public void testChangeTableInSystemDatabase() {
+ sql("USE sys");
+ assertThatCode(() -> sql("ALTER TABLE all_table_options SET
('bucket-num' = '5')"))
+ .hasRootCauseMessage("Can't alter system table.");
+ }
+
+ @Test
+ public void testSystemDatabase() {
+ sql("USE " + Catalog.SYSTEM_DATABASE_NAME);
+ assertThat(sql("SHOW TABLES"))
+
.containsExactly(Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS));
+ }
+
@Test
public void testCreateSystemTable() {
assertThatThrownBy(() -> sql("CREATE TABLE T$snapshots (a INT, b
INT)"))
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ea9e91139..ad8c48da0 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -171,6 +171,9 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public boolean databaseExists(String databaseName) {
+ if (isSystemDatabase(databaseName)) {
+ return true;
+ }
try {
client.getDatabase(databaseName);
return true;
@@ -185,6 +188,9 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
+ if (isSystemDatabase(name)) {
+ throw new ProcessSystemDatabaseException();
+ }
try {
client.createDatabase(convertToDatabase(name));
@@ -201,6 +207,9 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
+ if (isSystemDatabase(name)) {
+ throw new ProcessSystemDatabaseException();
+ }
try {
if (!cascade && client.getAllTables(name).size() > 0) {
throw new DatabaseNotEmptyException(name);
@@ -219,6 +228,9 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
+ if (isSystemDatabase(databaseName)) {
+ return GLOBAL_TABLES;
+ }
try {
return client.getAllTables(databaseName).stream()
.filter(