This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b58f0c9fd [core] Support catalog options table (#1889)
b58f0c9fd is described below
commit b58f0c9fd7073ec4ba4626ed97443bcb764c9335
Author: GuojunLi <[email protected]>
AuthorDate: Mon Aug 28 15:06:43 2023 +0800
[core] Support catalog options table (#1889)
---
docs/content/how-to/system-tables.md | 16 ++
.../org/apache/paimon/catalog/AbstractCatalog.java | 11 +-
.../paimon/table/system/CatalogOptionsTable.java | 192 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 8 +-
.../table/system/CatalogOptionsTableTest.java | 83 +++++++++
.../apache/paimon/flink/CatalogTableITCase.java | 11 +-
6 files changed, 317 insertions(+), 4 deletions(-)
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index 70a84e94b..9ea6548a6 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -280,3 +280,19 @@ SELECT * FROM sys.all_table_options;
*/
```
+### Catalog Options Table
+You can query the catalog's option information through the catalog options table. Options that are not shown take their default values. For details, refer to [Configuration]({{< ref "maintenance/configurations#coreoptions" >}}).
+
+```sql
+SELECT * FROM sys.catalog_options;
+
+/*
++-----------+---------------------------+
+| key | value |
++-----------+---------------------------+
+| warehouse | hdfs:///path/to/warehouse |
++-----------+---------------------------+
+1 rows in set
+*/
+```
+
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 102ef5a5d..6d212e908 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AllTableOptionsTable;
+import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.StringUtils;
@@ -50,10 +51,12 @@ public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX =
"table-default.";
protected static final List<String> GLOBAL_TABLES =
- Arrays.asList(AllTableOptionsTable.ALL_TABLE_OPTIONS);
+ Arrays.asList(
+ AllTableOptionsTable.ALL_TABLE_OPTIONS,
CatalogOptionsTable.CATALOG_OPTIONS);
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
+ protected final Map<String, String> catalogOptions;
@Nullable protected final LineageMeta lineageMeta;
@@ -61,6 +64,7 @@ public abstract class AbstractCatalog implements Catalog {
this.fileIO = fileIO;
this.lineageMeta = null;
this.tableDefaultOptions = new HashMap<>();
+ this.catalogOptions = new HashMap<>();
}
protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
@@ -69,6 +73,7 @@ public abstract class AbstractCatalog implements Catalog {
findAndCreateLineageMeta(
Options.fromMap(options),
AbstractCatalog.class.getClassLoader());
this.tableDefaultOptions = new HashMap<>();
+ this.catalogOptions = options;
options.keySet().stream()
.filter(key -> key.startsWith(TABLE_DEFAULT_OPTION_PREFIX))
@@ -94,7 +99,9 @@ public abstract class AbstractCatalog implements Catalog {
public Table getTable(Identifier identifier) throws TableNotExistException
{
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getObjectName();
- Table table = SystemTableLoader.loadGlobal(tableName, fileIO,
allTablePaths());
+ Table table =
+ SystemTableLoader.loadGlobal(
+ tableName, fileIO, allTablePaths(),
catalogOptions);
if (table == null) {
throw new TableNotExistException(identifier);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
new file mode 100644
index 000000000..19ce65874
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/**
+ * A system {@link Table} that exposes the options of a catalog, one row per option with the
+ * columns {@code key} and {@code value}.
+ */
+public class CatalogOptionsTable implements ReadonlyTable {
+
+    /** Name of this system table, as returned by {@link #name()}. */
+    public static final String CATALOG_OPTIONS = "catalog_options";
+
+    /** Row type of this table: {@code key} (field id 0) and {@code value} (field id 1). */
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "key", newStringType(false)),
+                            new DataField(1, "value", newStringType(false))));
+
+    /** Options to display; held by reference, not copied. */
+    private final Map<String, String> catalogOptions;
+
+    /**
+     * Creates the table over the given options.
+     *
+     * @param catalogOptions catalog options to expose as rows
+     */
+    public CatalogOptionsTable(Map<String, String> catalogOptions) {
+        this.catalogOptions = catalogOptions;
+    }
+
+    /** Returns {@link #CATALOG_OPTIONS}. */
+    @Override
+    public String name() {
+        return CATALOG_OPTIONS;
+    }
+
+    /** Returns {@link #TABLE_TYPE}. */
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    /** Creates a scan that plans a single split holding all options. */
+    @Override
+    public InnerTableScan newScan() {
+        return new CatalogOptionsScan();
+    }
+
+    /** Creates a read that turns the options of a split into rows. */
+    @Override
+    public InnerTableRead newRead() {
+        return new CatalogOptionsRead();
+    }
+
+    /** Returns the single primary key column, {@code key}. */
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.singletonList("key");
+    }
+
+    /** Returns a new table over the same options; {@code dynamicOptions} is not used. */
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new CatalogOptionsTable(catalogOptions);
+    }
+
+    /** Scan over the enclosing table's options. Filters are accepted but not applied. */
+    private class CatalogOptionsScan extends ReadOnceTableScan {
+
+        @Override
+        public InnerTableScan withFilter(Predicate predicate) {
+            return this;
+        }
+
+        /** Returns a plan whose split list contains one split wrapping the options map. */
+        @Override
+        public Plan innerPlan() {
+            return () -> {
+                Split onlySplit = new CatalogOptionsSplit(catalogOptions);
+                return Collections.singletonList(onlySplit);
+            };
+        }
+    }
+
+    /** Split that carries the complete options map. */
+    private static class CatalogOptionsSplit implements Split {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Map<String, String> catalogOptions;
+
+        private CatalogOptionsSplit(Map<String, String> catalogOptions) {
+            this.catalogOptions = catalogOptions;
+        }
+
+        /** Returns the number of options, i.e. the number of rows this split yields. */
+        @Override
+        public long rowCount() {
+            return catalogOptions.size();
+        }
+
+        /** Two splits are equal when they are of the same class and wrap equal maps. */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            return catalogOptions.equals(((CatalogOptionsSplit) o).catalogOptions);
+        }
+
+        @Override
+        public int hashCode() {
+            return catalogOptions.hashCode();
+        }
+    }
+
+    /** Read that converts each option entry into a row and applies an optional projection. */
+    private static class CatalogOptionsRead implements InnerTableRead {
+
+        /** Projection set through {@link #withProjection(int[][])}; {@code null} if never set. */
+        private int[][] projection;
+
+        /** Filters are accepted but not applied. */
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        /** The IO manager is not used by this read. */
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
+        /**
+         * Creates a reader over the options carried by {@code split}.
+         *
+         * @param split must be a split produced by this table's scan
+         * @return a reader yielding one row per option, projected if a projection was set
+         * @throws IllegalArgumentException if {@code split} is of any other type
+         */
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            if (!(split instanceof CatalogOptionsSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+            Map<String, String> options = ((CatalogOptionsSplit) split).catalogOptions;
+            Iterator<InternalRow> rows =
+                    Iterators.transform(options.entrySet().iterator(), CatalogOptionsRead::asRow);
+            if (projection == null) {
+                return new IteratorRecordReader<>(rows);
+            }
+            // A new ProjectedRow is created for every row rather than reusing one instance.
+            Iterator<InternalRow> projected =
+                    Iterators.transform(
+                            rows, row -> ProjectedRow.from(projection).replaceRow(row));
+            return new IteratorRecordReader<>(projected);
+        }
+
+        /** Builds the (key, value) row for one option entry. */
+        private static InternalRow asRow(Map.Entry<String, String> option) {
+            BinaryString key = BinaryString.fromString(option.getKey());
+            BinaryString value = BinaryString.fromString(option.getValue());
+            return GenericRow.of(key, value);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 3d7daea9c..c31901af8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -29,6 +29,7 @@ import java.util.Map;
import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
@@ -70,10 +71,15 @@ public class SystemTableLoader {
@Nullable
public static Table loadGlobal(
- String tableName, FileIO fileIO, Map<String, Map<String, Path>>
allTablePaths) {
+ String tableName,
+ FileIO fileIO,
+ Map<String, Map<String, Path>> allTablePaths,
+ Map<String, String> catalogOptions) {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
return new AllTableOptionsTable(fileIO, allTablePaths);
+ case CATALOG_OPTIONS:
+ return new CatalogOptionsTable(catalogOptions);
default:
return null;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
new file mode 100644
index 000000000..9ee1a63cb
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.TableType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
+import static
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CatalogOptionsTable}. */
+public class CatalogOptionsTableTest extends TableTestBase {
+
+    private Catalog catalog;
+
+    private CatalogOptionsTable catalogOptionsTable;
+    private Options catalogOptions;
+    @TempDir java.nio.file.Path tempDir;
+
+    /** Creates a catalog from a known set of options and loads its catalog options table. */
+    @BeforeEach
+    public void before() throws Exception {
+        catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.TABLE_TYPE, TableType.MANAGED);
+        catalogOptions.set("table-default.scan.infer-parallelism", "false");
+        catalogOptions.set(CatalogOptions.WAREHOUSE, tempDir.toUri().toString());
+        catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
+        catalogOptionsTable =
+                (CatalogOptionsTable)
+                        catalog.getTable(new Identifier(SYSTEM_DATABASE_NAME, CATALOG_OPTIONS));
+    }
+
+    /** The table must return exactly the options the catalog was created with. */
+    @Test
+    public void testCatalogOptionsTable() throws Exception {
+        List<InternalRow> expectedRows = getExpectedResult();
+        List<InternalRow> result = read(catalogOptionsTable);
+        // Both sides are produced by iterating a map, whose order is unspecified, so the
+        // comparison must not depend on row order.
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+
+    /** Builds one expected (key, value) row per option set in {@link #before()}. */
+    private List<InternalRow> getExpectedResult() {
+        List<InternalRow> expectedRows = new ArrayList<>();
+        for (Map.Entry<String, String> option : catalogOptions.toMap().entrySet()) {
+            expectedRows.add(
+                    GenericRow.of(
+                            BinaryString.fromString(option.getKey()),
+                            BinaryString.fromString(option.getValue())));
+        }
+        return expectedRows;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 092c6ddba..bc9e773aa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.system.AllTableOptionsTable;
+import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.types.IntType;
import org.apache.paimon.utils.BlockingIterator;
@@ -102,6 +103,12 @@ public class CatalogTableITCase extends CatalogITCaseBase {
Row.of("default", "T", "c.cc.ccc", "val3"));
}
+ @Test
+ public void testCatalogOptionsTable() {
+ List<Row> result = sql("SELECT * FROM sys.catalog_options");
+ assertThat(result).containsExactly(Row.of("warehouse", path));
+ }
+
@Test
public void testDropSystemDatabase() {
assertThatCode(() -> sql("DROP DATABASE sys"))
@@ -125,7 +132,9 @@ public class CatalogTableITCase extends CatalogITCaseBase {
public void testSystemDatabase() {
sql("USE " + Catalog.SYSTEM_DATABASE_NAME);
assertThat(sql("SHOW TABLES"))
-
.containsExactly(Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS));
+ .containsExactly(
+ Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS),
+ Row.of(CatalogOptionsTable.CATALOG_OPTIONS));
}
@Test