This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c001b4f7 [flink] Support to set the table option by dynamic global 
job configu… (#2104)
5c001b4f7 is described below

commit 5c001b4f7da2ae973293f74a544535a77aba1708
Author: Aitozi <[email protected]>
AuthorDate: Tue Oct 24 11:07:43 2023 +0800

    [flink] Support to set the table option by dynamic global job configu… 
(#2104)
---
 docs/content/engines/flink.md                      | 16 +++++
 .../paimon/flink/AbstractFlinkTableFactory.java    | 82 +++++++++++++++++++++-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  2 +
 .../flink/AbstractFlinkTableFactoryTest.java       | 26 +++++++
 .../apache/paimon/flink/AppendOnlyTableITCase.java | 18 +++++
 5 files changed, 142 insertions(+), 2 deletions(-)

diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 45caeeeb9..543f13f13 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -301,3 +301,19 @@ Users can set memory weight in SQL for Flink Managed 
Memory, then Flink sink ope
 INSERT INTO paimon_table /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='256M') */
 SELECT * FROM ....;
 ```
+## Setting dynamic options
+
+When interacting with a Paimon table, table options can be tuned without 
changing the options in the catalog. Paimon extracts job-level dynamic 
options and applies them in the current session.
+The dynamic option's key format is 
`paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Each of 
catalogName/dbName/tableName can be `*`, which matches any value for that 
part.
+
+For example:
+
+```sql
+-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
+SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
+SELECT * FROM T;
+
+-- set scan.timestamp-millis=1697018249000 for the table default.T in any 
catalog
+SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
+SELECT * FROM T;
+```
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 31acd85b9..0d87742f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -36,6 +36,7 @@ import org.apache.paimon.lineage.TableLineageEntity;
 import org.apache.paimon.lineage.TableLineageEntityImpl;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
@@ -43,8 +44,11 @@ import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -53,16 +57,21 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
@@ -80,6 +89,8 @@ import static 
org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreF
 public abstract class AbstractFlinkTableFactory
         implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFlinkTableFactory.class);
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
@@ -210,10 +221,22 @@ public abstract class AbstractFlinkTableFactory
         CatalogTable origin = context.getCatalogTable().getOrigin();
         Table table;
 
+        Map<String, String> dynamicOptions = 
getDynamicTableConfigOptions(context);
+        dynamicOptions.forEach(
+                (key, value) -> {
+                    if (origin.getOptions().get(key) == null
+                            || !origin.getOptions().get(key).equals(value)) {
+                        SchemaManager.checkAlterTableOption(key);
+                    }
+                });
+        Map<String, String> newOptions = new HashMap<>();
+        newOptions.putAll(origin.getOptions());
+        newOptions.putAll(dynamicOptions);
+
         if (origin instanceof DataCatalogTable) {
-            table = ((DataCatalogTable) 
origin).table().copy(origin.getOptions());
+            table = ((DataCatalogTable) origin).table().copy(newOptions);
         } else {
-            table = 
FileStoreTableFactory.create(createCatalogContext(context));
+            table = 
FileStoreTableFactory.create(createCatalogContext(context)).copy(newOptions);
         }
 
         Schema schema = 
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
@@ -263,4 +286,59 @@ public abstract class AbstractFlinkTableFactory
         }
         return true;
     }
+
+    /**
+     * The dynamic option's format is:
+     *
+     * <p>{@link
+     * 
FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key
 =
+     * value. These job level configs will be extracted and injected into the 
target table option.
+     *
+     * @param context The table factory context.
+     * @return The dynamic options of this target table.
+     */
+    static Map<String, String> 
getDynamicTableConfigOptions(DynamicTableFactory.Context context) {
+
+        Map<String, String> optionsFromTableConfig = new HashMap<>();
+
+        ReadableConfig config = context.getConfiguration();
+
+        Map<String, String> conf;
+
+        if (config instanceof Configuration) {
+            conf = ((Configuration) config).toMap();
+        } else if (config instanceof TableConfig) {
+            conf = ((TableConfig) config).getConfiguration().toMap();
+        } else {
+            throw new IllegalArgumentException("Unexpected config: " + 
config.getClass());
+        }
+
+        String template =
+                String.format(
+                        "(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
+                        FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
+                        context.getObjectIdentifier().getCatalogName(),
+                        context.getObjectIdentifier().getDatabaseName(),
+                        context.getObjectIdentifier().getObjectName());
+        Pattern pattern = Pattern.compile(template);
+
+        conf.keySet()
+                .forEach(
+                        (key) -> {
+                            if 
(key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) {
+                                Matcher matcher = pattern.matcher(key);
+                                if (matcher.find()) {
+                                    
optionsFromTableConfig.put(matcher.group(5), conf.get(key));
+                                }
+                            }
+                        });
+
+        if (!optionsFromTableConfig.isEmpty()) {
+            LOG.info(
+                    "Loading dynamic table options for {} in table config: {}",
+                    context.getObjectIdentifier().getObjectName(),
+                    optionsFromTableConfig);
+        }
+        return optionsFromTableConfig;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index cefd4517a..551f541fd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -43,6 +43,8 @@ public class FlinkConnectorOptions {
 
     public static final String NONE = "none";
 
+    public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
+
     public static final ConfigOption<String> LOG_SYSTEM =
             ConfigOptions.key("log.system")
                     .stringType()
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index 41958a016..aeb46da8d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -18,13 +18,20 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -53,6 +60,25 @@ public class AbstractFlinkTableFactoryTest {
                 true);
     }
 
+    @Test
+    public void testGetDynamicOptions() {
+        Configuration configuration = new Configuration();
+        configuration.setString("paimon.catalog1.db.T.k1", "v1");
+        configuration.setString("paimon.*.db.*.k2", "v2");
+        ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", 
"T");
+        DynamicTableFactory.Context context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        identifier,
+                        null,
+                        new HashMap<>(),
+                        configuration,
+                        AbstractFlinkTableFactoryTest.class.getClassLoader(),
+                        false);
+        Map<String, String> options =
+                
AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
+        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2"));
+    }
+
     private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
         assertThat(AbstractFlinkTableFactory.schemaEquals(r1, 
r2)).isEqualTo(expectEquals);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 9e1408658..0dc141ee6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
@@ -241,6 +243,22 @@ public class AppendOnlyTableITCase extends 
CatalogITCaseBase {
                                         .toInstant()));
     }
 
+    @Test
+    public void testDynamicOptions() throws Exception {
+        sql("CREATE TABLE T (id INT) WITH ('write-mode'='append-only')");
+        batchSql("INSERT INTO T VALUES (1)");
+        sEnv.getConfig()
+                .getConfiguration()
+                .setString(
+                        "paimon.*.*.T." + CoreOptions.SCAN_MODE.key(),
+                        CoreOptions.StartupMode.LATEST.toString());
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T");
+
+        sql("INSERT INTO T VALUES (2)");
+        // Only the latest snapshot is fetched, which shows the dynamic option took effect
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2));
+    }
+
     @Override
     protected List<String> ddl() {
         return Arrays.asList(

Reply via email to