This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a22f6c2 [core] support tables inherit not built-in configurations 
from catalog (#775)
34a22f6c2 is described below

commit 34a22f6c2d65294204b9b99dbb1d014568239a9f
Author: legendtkl <[email protected]>
AuthorDate: Wed Apr 12 10:19:48 2023 +0800

    [core] support tables inherit not built-in configurations from catalog 
(#775)
---
 docs/content/how-to/creating-catalogs.md           | 19 +++++++-
 .../org/apache/paimon/catalog/AbstractCatalog.java | 25 +++++++++++
 .../apache/paimon/catalog/FileSystemCatalog.java   |  8 ++++
 .../paimon/catalog/FileSystemCatalogFactory.java   |  2 +-
 .../paimon/flink/FileSystemCatalogITCase.java      | 50 +++++++++++++++++++++-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 12 ++++++
 .../org/apache/paimon/hive/HiveCatalogFactory.java |  2 +-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 49 +++++++++++++++++++++
 8 files changed, 162 insertions(+), 5 deletions(-)

diff --git a/docs/content/how-to/creating-catalogs.md 
b/docs/content/how-to/creating-catalogs.md
index 2ead26372..bfb12d6c6 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -44,12 +44,18 @@ The following Flink SQL registers and uses a Paimon catalog 
named `my_catalog`.
 ```sql
 CREATE CATALOG my_catalog WITH (
     'type' = 'paimon',
-    'warehouse' = 'hdfs://path/to/warehouse'
+    'warehouse' = 'hdfs://path/to/warehouse',
+    'table-default.key' = 'value'
 );
 
 USE CATALOG my_catalog;
 ```
 
+The property `'table-default.key' = 'value'` specifies the default property 
`'key' = 'value'` for tables created in the catalog.
+
+You can define any default properties with the prefix `table-default.`. If the 
table defines the same properties with the default properties, the table's 
properties will be taken.
+
+
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
@@ -59,9 +65,14 @@ The following shell command registers a paimon catalog named 
`paimon`. Metadata
 ```bash
 spark-sql ... \
     --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
-    --conf spark.sql.catalog.paimon.warehouse=hdfs://path/to/warehouse
+    --conf spark.sql.catalog.paimon.warehouse=hdfs://path/to/warehouse \
+    --conf spark.sql.catalog.paimon.table-default.key=value
 ```
 
+The conf `--conf spark.sql.catalog.paimon.table-default.key=value` specifies 
the default property `'key' = 'value'` for tables created in the catalog.
+
+You can define any default properties with the prefix 
`spark.sql.catalog.paimon.table-default.` configuration. If the table defines 
the same properties with the default properties, the table's properties will be 
taken.
+
 After `spark-sql` is started, you can switch to the `default` database of the 
`paimon` catalog with the following SQL.
 
 ```sql
@@ -97,6 +108,8 @@ CREATE CATALOG my_hive WITH (
 USE CATALOG my_hive;
 ```
 
+You can also use the property with the prefix `table-default.` to define 
default properties for tables as above `Catalog with Filesystem Metastore`.
+
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
@@ -113,6 +126,8 @@ spark-sql ... \
     --conf 
spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
 ```
 
+You can also define any default properties with the prefix 
`spark.sql.catalog.paimon.table-default.` configuration for tables as above 
`Catalog with Filesystem Metastore`.
+
 After `spark-sql` is started, you can switch to the `default` database of the 
`paimon` catalog with the following SQL.
 
 ```sql
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index beecbe5af..21b012d91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -28,15 +28,36 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.StringUtils;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /** Common implementation of {@link Catalog}. */
 public abstract class AbstractCatalog implements Catalog {
 
     protected static final String DB_SUFFIX = ".db";
 
+    protected static final String TABLE_DEFAULT_OPTION_PREFIX = 
"table-default.";
+
     protected final FileIO fileIO;
 
+    protected final Map<String, String> tableDefaultOptions;
+
     protected AbstractCatalog(FileIO fileIO) {
         this.fileIO = fileIO;
+        this.tableDefaultOptions = new HashMap<>();
+    }
+
+    protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
+        this.fileIO = fileIO;
+        this.tableDefaultOptions = new HashMap<>();
+
+        options.keySet().stream()
+                .filter(key -> key.startsWith(TABLE_DEFAULT_OPTION_PREFIX))
+                .forEach(
+                        key ->
+                                this.tableDefaultOptions.put(
+                                        
key.substring(TABLE_DEFAULT_OPTION_PREFIX.length()),
+                                        options.get(key)));
     }
 
     @Override
@@ -95,6 +116,10 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
+    protected void copyTableDefaultOptions(Map<String, String> options) {
+        tableDefaultOptions.forEach((k, v) -> options.putIfAbsent(k, v));
+    }
+
     private String[] tableAndSystemName(Identifier identifier) {
         String[] splits = StringUtils.split(identifier.getObjectName(), 
SYSTEM_TABLE_SPLITTER);
         if (splits.length != 2) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index f3ef51620..8a52c5182 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.schema.TableSchema;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 
@@ -41,6 +42,11 @@ public class FileSystemCatalog extends AbstractCatalog {
         this.warehouse = warehouse;
     }
 
+    public FileSystemCatalog(FileIO fileIO, Path warehouse, Map<String, 
String> options) {
+        super(fileIO, options);
+        this.warehouse = warehouse;
+    }
+
     @Override
     public Optional<CatalogLock.Factory> lockFactory() {
         return Optional.empty();
@@ -153,6 +159,8 @@ public class FileSystemCatalog extends AbstractCatalog {
             throw new TableAlreadyExistException(identifier);
         }
 
+        copyTableDefaultOptions(schema.options());
+
         uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index bb09de29d..62412d272 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -40,6 +40,6 @@ public class FileSystemCatalogFactory implements 
CatalogFactory {
             throw new IllegalArgumentException(
                     "Only managed table is supported in File system catalog.");
         }
-        return new FileSystemCatalog(fileIO, warehouse);
+        return new FileSystemCatalog(fileIO, warehouse, 
context.options().toMap());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 5d0be0614..202faea9f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -52,18 +53,19 @@ public class FileSystemCatalogITCase extends 
KafkaTableTestBase {
         path = getTempDirPath();
         tEnv.executeSql(
                 String.format("CREATE CATALOG fs WITH ('type'='paimon', 
'warehouse'='%s')", path));
-        tEnv.useCatalog("fs");
         env.setParallelism(1);
     }
 
     @Test
     public void testWriteRead() throws Exception {
+        tEnv.useCatalog("fs");
         tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
         innerTestWriteRead();
     }
 
     @Test
     public void testRenameTable() throws Exception {
+        tEnv.useCatalog("fs");
         tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
         tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
         tEnv.executeSql("INSERT INTO t1 VALUES(1),(2)").await();
@@ -98,6 +100,7 @@ public class FileSystemCatalogITCase extends 
KafkaTableTestBase {
         createTopicIfNotExists(topic, 1);
 
         try {
+            tEnv.useCatalog("fs");
             tEnv.executeSql(
                     String.format(
                             "CREATE TABLE T (a STRING, b STRING, c STRING) 
WITH ("
@@ -118,6 +121,7 @@ public class FileSystemCatalogITCase extends 
KafkaTableTestBase {
         createTopicIfNotExists(topic, 1);
 
         try {
+            tEnv.useCatalog("fs");
             tEnv.executeSql(
                     String.format(
                             "CREATE TABLE T ("
@@ -142,6 +146,50 @@ public class FileSystemCatalogITCase extends 
KafkaTableTestBase {
         }
     }
 
+    @Test
+    public void testCatalogOptionsInheritAndOverride() throws Exception {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG fs_with_options WITH ("
+                                + "'type'='paimon', "
+                                + "'warehouse'='%s', "
+                                + "'table-default.opt1'='value1', "
+                                + "'table-default.opt2'='value2', "
+                                + "'table-default.opt3'='value3', "
+                                + "'fs.allow-hadoop-fallback'='false',"
+                                + "'lock.enabled'='true'"
+                                + ")",
+                        path));
+        tEnv.useCatalog("fs_with_options");
+
+        // check table inherit catalog options
+        tEnv.executeSql("CREATE TABLE t1_options (a STRING, b STRING, c 
STRING)");
+
+        Identifier identifier = new Identifier(DB_NAME, "t1_options");
+        Catalog catalog =
+                ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+        Map<String, String> tableOptions = 
catalog.getTable(identifier).options();
+
+        assertThat(tableOptions).containsEntry("opt1", "value1");
+        assertThat(tableOptions).containsEntry("opt2", "value2");
+        assertThat(tableOptions).containsEntry("opt3", "value3");
+        assertThat(tableOptions).doesNotContainKey("fs.allow-hadoop-fallback");
+        assertThat(tableOptions).doesNotContainKey("lock.enabled");
+
+        // check table options override catalog's
+        tEnv.executeSql(
+                "CREATE TABLE t2_options (a STRING, b STRING, c STRING) WITH 
('opt3'='value4')");
+
+        identifier = new Identifier(DB_NAME, "t2_options");
+        tableOptions = catalog.getTable(identifier).options();
+
+        assertThat(tableOptions).containsEntry("opt1", "value1");
+        assertThat(tableOptions).containsEntry("opt2", "value2");
+        assertThat(tableOptions).containsEntry("opt3", "value4");
+        assertThat(tableOptions).doesNotContainKey("fs.allow-hadoop-fallback");
+        assertThat(tableOptions).doesNotContainKey("lock.enabled");
+    }
+
     private void innerTestWriteRead() throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(tEnv.from("T").execute().collect());
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 5af1e58de..737841d07 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -65,6 +65,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -103,6 +104,14 @@ public class HiveCatalog extends AbstractCatalog {
         this.client = createClient(hiveConf, clientClassName);
     }
 
+    public HiveCatalog(
+            FileIO fileIO, HiveConf hiveConf, String clientClassName, 
Map<String, String> options) {
+        super(fileIO, options);
+        this.hiveConf = hiveConf;
+        this.clientClassName = clientClassName;
+        this.client = createClient(hiveConf, clientClassName);
+    }
+
     @Override
     public Optional<CatalogLock.Factory> lockFactory() {
         return lockEnabled()
@@ -250,6 +259,9 @@ public class HiveCatalog extends AbstractCatalog {
         checkFieldNamesUpperCase(schema.rowType().getFieldNames());
         // first commit changes to underlying files
         // if changes on Hive fails there is no harm to perform the same 
changes to files again
+
+        copyTableDefaultOptions(schema.options());
+
         TableSchema tableSchema;
         try {
             tableSchema = schemaManager(identifier).createTable(schema);
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
index a33463738..e8771df31 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
@@ -73,6 +73,6 @@ public class HiveCatalogFactory implements CatalogFactory {
 
         String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
 
-        return new HiveCatalog(fileIO, hiveConf, clientClassName);
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, 
context.options().toMap());
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index ad113afe9..5dd9b46b3 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -517,6 +518,54 @@ public abstract class HiveCatalogITCaseBase {
         Assert.assertEquals("[]", tables.toString());
     }
 
+    @Test
+    public void testCatalogOptionsInheritAndOverride() throws Exception {
+        tEnv.executeSql(
+                        String.join(
+                                "\n",
+                                "CREATE CATALOG my_hive_options WITH (",
+                                "  'type' = 'paimon',",
+                                "  'metastore' = 'hive',",
+                                "  'uri' = '',",
+                                "  'hive-conf-dir' = '"
+                                        + 
hiveShell.getBaseDir().getRoot().getPath()
+                                        + HIVE_CONF
+                                        + "',",
+                                "  'warehouse' = '" + path + "',",
+                                "  'lock.enabled' = 'true',",
+                                "  'table-default.opt1' = 'value1',",
+                                "  'table-default.opt2' = 'value2',",
+                                "  'table-default.opt3' = 'value3'",
+                                ")"))
+                .await();
+        tEnv.executeSql("USE CATALOG my_hive_options").await();
+
+        // check inherit
+        tEnv.executeSql("CREATE TABLE table_without_options (a INT, b 
STRING)").await();
+
+        Identifier identifier = new Identifier("default", 
"table_without_options");
+        Catalog catalog =
+                ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+        Map<String, String> tableOptions = 
catalog.getTable(identifier).options();
+
+        Assertions.assertThat(tableOptions).containsEntry("opt1", "value1");
+        Assertions.assertThat(tableOptions).containsEntry("opt2", "value2");
+        Assertions.assertThat(tableOptions).containsEntry("opt3", "value3");
+        Assertions.assertThat(tableOptions).doesNotContainKey("lock.enabled");
+
+        // check override
+        tEnv.executeSql(
+                        "CREATE TABLE table_with_options (a INT, b STRING) 
WITH ('opt1' = 'new_value')")
+                .await();
+        identifier = new Identifier("default", "table_with_options");
+        tableOptions = catalog.getTable(identifier).options();
+
+        Assertions.assertThat(tableOptions).containsEntry("opt1", "new_value");
+        Assertions.assertThat(tableOptions).containsEntry("opt2", "value2");
+        Assertions.assertThat(tableOptions).containsEntry("opt3", "value3");
+        Assertions.assertThat(tableOptions).doesNotContainKey("lock.enabled");
+    }
+
     protected List<Row> collect(String sql) throws Exception {
         List<Row> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to