This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 34a22f6c2 [core] support tables inherit not built-in configurations
from catalog (#775)
34a22f6c2 is described below
commit 34a22f6c2d65294204b9b99dbb1d014568239a9f
Author: legendtkl <[email protected]>
AuthorDate: Wed Apr 12 10:19:48 2023 +0800
[core] support tables inherit not built-in configurations from catalog
(#775)
---
docs/content/how-to/creating-catalogs.md | 19 +++++++-
.../org/apache/paimon/catalog/AbstractCatalog.java | 25 +++++++++++
.../apache/paimon/catalog/FileSystemCatalog.java | 8 ++++
.../paimon/catalog/FileSystemCatalogFactory.java | 2 +-
.../paimon/flink/FileSystemCatalogITCase.java | 50 +++++++++++++++++++++-
.../java/org/apache/paimon/hive/HiveCatalog.java | 12 ++++++
.../org/apache/paimon/hive/HiveCatalogFactory.java | 2 +-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 49 +++++++++++++++++++++
8 files changed, 162 insertions(+), 5 deletions(-)
diff --git a/docs/content/how-to/creating-catalogs.md
b/docs/content/how-to/creating-catalogs.md
index 2ead26372..bfb12d6c6 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -44,12 +44,18 @@ The following Flink SQL registers and uses a Paimon catalog
named `my_catalog`.
```sql
CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
- 'warehouse' = 'hdfs://path/to/warehouse'
+ 'warehouse' = 'hdfs://path/to/warehouse',
+ 'table-default.key' = 'value'
);
USE CATALOG my_catalog;
```
+The property `'table-default.key' = 'value'` sets the default property `'key' = 'value'` for every table created in the catalog.
+
+Any table property can be given a catalog-wide default by prefixing its key with `table-default.`. If a table sets the same property explicitly, the table's own value takes precedence over the catalog default.
+
+
{{< /tab >}}
{{< tab "Spark3" >}}
@@ -59,9 +65,14 @@ The following shell command registers a paimon catalog named
`paimon`. Metadata
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
- --conf spark.sql.catalog.paimon.warehouse=hdfs://path/to/warehouse
+ --conf spark.sql.catalog.paimon.warehouse=hdfs://path/to/warehouse \
+ --conf spark.sql.catalog.paimon.table-default.key=value
```
+The option `--conf spark.sql.catalog.paimon.table-default.key=value` sets the default property `'key' = 'value'` for every table created in the catalog.
+
+Any table property can be given a catalog-wide default by passing a configuration whose key starts with `spark.sql.catalog.paimon.table-default.`. If a table sets the same property explicitly, the table's own value takes precedence over the catalog default.
+
After `spark-sql` is started, you can switch to the `default` database of the
`paimon` catalog with the following SQL.
```sql
@@ -97,6 +108,8 @@ CREATE CATALOG my_hive WITH (
USE CATALOG my_hive;
```
+You can also use properties prefixed with `table-default.` to define default table properties, as described above for `Catalog with Filesystem Metastore`.
+
{{< /tab >}}
{{< tab "Spark3" >}}
@@ -113,6 +126,8 @@ spark-sql ... \
--conf
spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
```
+You can also define default table properties with configurations prefixed with `spark.sql.catalog.paimon.table-default.`, as described above for `Catalog with Filesystem Metastore`.
+
After `spark-sql` is started, you can switch to the `default` database of the
`paimon` catalog with the following SQL.
```sql
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index beecbe5af..21b012d91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -28,15 +28,36 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.StringUtils;
+import java.util.HashMap;
+import java.util.Map;
+
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
protected static final String DB_SUFFIX = ".db";
+ protected static final String TABLE_DEFAULT_OPTION_PREFIX =
"table-default.";
+
protected final FileIO fileIO;
+ protected final Map<String, String> tableDefaultOptions;
+
+ /** Creates a catalog that has no catalog-level table default options. */
protected AbstractCatalog(FileIO fileIO) {
- this.fileIO = fileIO;
+ this(fileIO, new HashMap<>());
+ }
+
+ /**
+ * Creates a catalog and collects its table default options.
+ *
+ * <p>Every entry of {@code options} whose key starts with {@link #TABLE_DEFAULT_OPTION_PREFIX}
+ * is stored with the prefix stripped. These stored entries are later merged into new tables by
+ * {@code copyTableDefaultOptions}. Entries without the prefix are ignored here.
+ *
+ * @param fileIO file IO used by the catalog
+ * @param options all catalog options
+ */
+ protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
+ this.fileIO = fileIO;
+ this.tableDefaultOptions = new HashMap<>();
+ // Walk the entries directly: no second lookup per key and no side-effecting stream.
+ options.forEach(
+ (key, value) -> {
+ if (key.startsWith(TABLE_DEFAULT_OPTION_PREFIX)) {
+ this.tableDefaultOptions.put(
+ key.substring(TABLE_DEFAULT_OPTION_PREFIX.length()), value);
+ }
+ });
}
@Override
@@ -95,6 +116,10 @@ public abstract class AbstractCatalog implements Catalog {
}
}
+ /**
+ * Adds each catalog-level table default that {@code options} does not already define.
+ * Keys already present in {@code options} are left untouched, so table-level values win.
+ *
+ * @param options the table's options; modified in place
+ */
+ protected void copyTableDefaultOptions(Map<String, String> options) {
+ for (Map.Entry<String, String> entry : tableDefaultOptions.entrySet()) {
+ options.putIfAbsent(entry.getKey(), entry.getValue());
+ }
+ }
+
private String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index f3ef51620..8a52c5182 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.schema.TableSchema;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
@@ -41,6 +42,11 @@ public class FileSystemCatalog extends AbstractCatalog {
this.warehouse = warehouse;
}
+ /**
+ * Creates a file system catalog for the given warehouse path.
+ *
+ * @param fileIO file IO used by the catalog
+ * @param warehouse warehouse path of the catalog
+ * @param options catalog options; entries prefixed with {@code table-default.} are applied
+ *     as defaults to tables created in this catalog
+ */
+ public FileSystemCatalog(
+ FileIO fileIO, Path warehouse, Map<String, String> options) {
+ super(fileIO, options);
+ this.warehouse = warehouse;
+ }
+
@Override
public Optional<CatalogLock.Factory> lockFactory() {
return Optional.empty();
@@ -153,6 +159,8 @@ public class FileSystemCatalog extends AbstractCatalog {
throw new TableAlreadyExistException(identifier);
}
+ copyTableDefaultOptions(schema.options());
+
uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index bb09de29d..62412d272 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -40,6 +40,6 @@ public class FileSystemCatalogFactory implements
CatalogFactory {
throw new IllegalArgumentException(
"Only managed table is supported in File system catalog.");
}
- return new FileSystemCatalog(fileIO, warehouse);
+ return new FileSystemCatalog(fileIO, warehouse,
context.options().toMap());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 5d0be0614..202faea9f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -52,18 +53,19 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
path = getTempDirPath();
tEnv.executeSql(
String.format("CREATE CATALOG fs WITH ('type'='paimon',
'warehouse'='%s')", path));
- tEnv.useCatalog("fs");
env.setParallelism(1);
}
@Test
public void testWriteRead() throws Exception {
+ tEnv.useCatalog("fs");
tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
innerTestWriteRead();
}
@Test
public void testRenameTable() throws Exception {
+ tEnv.useCatalog("fs");
tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
tEnv.executeSql("INSERT INTO t1 VALUES(1),(2)").await();
@@ -98,6 +100,7 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
createTopicIfNotExists(topic, 1);
try {
+ tEnv.useCatalog("fs");
tEnv.executeSql(
String.format(
"CREATE TABLE T (a STRING, b STRING, c STRING)
WITH ("
@@ -118,6 +121,7 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
createTopicIfNotExists(topic, 1);
try {
+ tEnv.useCatalog("fs");
tEnv.executeSql(
String.format(
"CREATE TABLE T ("
@@ -142,6 +146,50 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
}
}
+ @Test
+ public void testCatalogOptionsInheritAndOverride() throws Exception {
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG fs_with_options WITH ("
+ + "'type'='paimon', "
+ + "'warehouse'='%s', "
+ + "'table-default.opt1'='value1', "
+ + "'table-default.opt2'='value2', "
+ + "'table-default.opt3'='value3', "
+ + "'fs.allow-hadoop-fallback'='false',"
+ + "'lock.enabled'='true'"
+ + ")",
+ path));
+ tEnv.useCatalog("fs_with_options");
+
+ // A table created without options receives every table-default.* entry and no other
+ // catalog option.
+ tEnv.executeSql("CREATE TABLE t1_options (a STRING, b STRING, c STRING)");
+ Catalog catalog =
+ ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+ assertThat(catalog.getTable(new Identifier(DB_NAME, "t1_options")).options())
+ .containsEntry("opt1", "value1")
+ .containsEntry("opt2", "value2")
+ .containsEntry("opt3", "value3")
+ .doesNotContainKey("fs.allow-hadoop-fallback")
+ .doesNotContainKey("lock.enabled");
+
+ // An option set on the table itself wins over the catalog default.
+ tEnv.executeSql(
+ "CREATE TABLE t2_options (a STRING, b STRING, c STRING) WITH ('opt3'='value4')");
+ assertThat(catalog.getTable(new Identifier(DB_NAME, "t2_options")).options())
+ .containsEntry("opt1", "value1")
+ .containsEntry("opt2", "value2")
+ .containsEntry("opt3", "value4")
+ .doesNotContainKey("fs.allow-hadoop-fallback")
+ .doesNotContainKey("lock.enabled");
+ }
+
private void innerTestWriteRead() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(tEnv.from("T").execute().collect());
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 5af1e58de..737841d07 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -65,6 +65,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -103,6 +104,14 @@ public class HiveCatalog extends AbstractCatalog {
this.client = createClient(hiveConf, clientClassName);
}
+ /**
+ * Creates a catalog that builds its metastore client from {@code hiveConf} via
+ * {@code createClient}.
+ *
+ * @param fileIO file IO used by the catalog
+ * @param hiveConf Hive configuration passed to {@code createClient}
+ * @param clientClassName client class name passed to {@code createClient}
+ * @param options catalog options; entries prefixed with {@code table-default.} are applied
+ *     as defaults to tables created in this catalog
+ */
+ public HiveCatalog(
+ FileIO fileIO,
+ HiveConf hiveConf,
+ String clientClassName,
+ Map<String, String> options) {
+ super(fileIO, options);
+ this.hiveConf = hiveConf;
+ this.clientClassName = clientClassName;
+ this.client = createClient(hiveConf, clientClassName);
+ }
+
@Override
public Optional<CatalogLock.Factory> lockFactory() {
return lockEnabled()
@@ -250,6 +259,9 @@ public class HiveCatalog extends AbstractCatalog {
checkFieldNamesUpperCase(schema.rowType().getFieldNames());
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same
changes to files again
+
+ copyTableDefaultOptions(schema.options());
+
TableSchema tableSchema;
try {
tableSchema = schemaManager(identifier).createTable(schema);
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
index a33463738..e8771df31 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
@@ -73,6 +73,6 @@ public class HiveCatalogFactory implements CatalogFactory {
String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
- return new HiveCatalog(fileIO, hiveConf, clientClassName);
+ return new HiveCatalog(fileIO, hiveConf, clientClassName,
context.options().toMap());
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index ad113afe9..5dd9b46b3 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
@@ -517,6 +518,54 @@ public abstract class HiveCatalogITCaseBase {
Assert.assertEquals("[]", tables.toString());
}
+ @Test
+ public void testCatalogOptionsInheritAndOverride() throws Exception {
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG my_hive_options WITH (",
+ " 'type' = 'paimon',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'hive-conf-dir' = '"
+ + hiveShell.getBaseDir().getRoot().getPath()
+ + HIVE_CONF
+ + "',",
+ " 'warehouse' = '" + path + "',",
+ " 'lock.enabled' = 'true',",
+ " 'table-default.opt1' = 'value1',",
+ " 'table-default.opt2' = 'value2',",
+ " 'table-default.opt3' = 'value3'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG my_hive_options").await();
+
+ // A table created without options receives every table-default.* entry.
+ tEnv.executeSql("CREATE TABLE table_without_options (a INT, b STRING)").await();
+ Catalog catalog =
+ ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+ Assertions.assertThat(
+ catalog.getTable(new Identifier("default", "table_without_options"))
+ .options())
+ .containsEntry("opt1", "value1")
+ .containsEntry("opt2", "value2")
+ .containsEntry("opt3", "value3")
+ .doesNotContainKey("lock.enabled");
+
+ // An option set on the table itself wins over the catalog default.
+ tEnv.executeSql(
+ "CREATE TABLE table_with_options (a INT, b STRING) WITH ('opt1' = 'new_value')")
+ .await();
+ Assertions.assertThat(
+ catalog.getTable(new Identifier("default", "table_with_options")).options())
+ .containsEntry("opt1", "new_value")
+ .containsEntry("opt2", "value2")
+ .containsEntry("opt3", "value3")
+ .doesNotContainKey("lock.enabled");
+ }
+
protected List<Row> collect(String sql) throws Exception {
List<Row> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {