This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5c001b4f7 [flink] Support to set the table option by dynamic global
job configu… (#2104)
5c001b4f7 is described below
commit 5c001b4f7da2ae973293f74a544535a77aba1708
Author: Aitozi <[email protected]>
AuthorDate: Tue Oct 24 11:07:43 2023 +0800
[flink] Support to set the table option by dynamic global job configu…
(#2104)
---
docs/content/engines/flink.md | 16 +++++
.../paimon/flink/AbstractFlinkTableFactory.java | 82 +++++++++++++++++++++-
.../apache/paimon/flink/FlinkConnectorOptions.java | 2 +
.../flink/AbstractFlinkTableFactoryTest.java | 26 +++++++
.../apache/paimon/flink/AppendOnlyTableITCase.java | 18 +++++
5 files changed, 142 insertions(+), 2 deletions(-)
diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 45caeeeb9..543f13f13 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -301,3 +301,19 @@ Users can set memory weight in SQL for Flink Managed
Memory, then Flink sink ope
INSERT INTO paimon_table /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;
```
+## Setting dynamic options
+
+When interacting with a Paimon table, its options can be tuned without
changing the options in the catalog. Paimon will extract job-level dynamic
options and apply them in the current session.
+The dynamic option's key format is
`paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Each of
catalogName/dbName/tableName can be `*`, which acts as a wildcard matching any
value for that part.
+
+For example:
+
+```sql
+-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
+SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
+SELECT * FROM T;
+
+-- set scan.timestamp-millis=1697018249000 for the table default.T in any
catalog
+SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
+SELECT * FROM T;
+```
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 31acd85b9..0d87742f8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -36,6 +36,7 @@ import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
@@ -43,8 +44,11 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -53,16 +57,21 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
@@ -80,6 +89,8 @@ import static
org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreF
public abstract class AbstractFlinkTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractFlinkTableFactory.class);
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
@@ -210,10 +221,22 @@ public abstract class AbstractFlinkTableFactory
CatalogTable origin = context.getCatalogTable().getOrigin();
Table table;
+ Map<String, String> dynamicOptions =
getDynamicTableConfigOptions(context);
+ dynamicOptions.forEach(
+ (key, value) -> {
+ if (origin.getOptions().get(key) == null
+ || !origin.getOptions().get(key).equals(value)) {
+ SchemaManager.checkAlterTableOption(key);
+ }
+ });
+ Map<String, String> newOptions = new HashMap<>();
+ newOptions.putAll(origin.getOptions());
+ newOptions.putAll(dynamicOptions);
+
if (origin instanceof DataCatalogTable) {
- table = ((DataCatalogTable)
origin).table().copy(origin.getOptions());
+ table = ((DataCatalogTable) origin).table().copy(newOptions);
} else {
- table =
FileStoreTableFactory.create(createCatalogContext(context));
+ table =
FileStoreTableFactory.create(createCatalogContext(context)).copy(newOptions);
}
Schema schema =
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
@@ -263,4 +286,59 @@ public abstract class AbstractFlinkTableFactory
}
return true;
}
+
+ /**
+ * The dynamic option's format is:
+ *
+ * <p>{@link
+ *
FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key
=
value. These job-level configs will be extracted and injected into the
target table's options.
+ *
+ * @param context The table factory context.
+ * @return The dynamic options of this target table.
+ */
+ static Map<String, String>
getDynamicTableConfigOptions(DynamicTableFactory.Context context) {
+
+ Map<String, String> optionsFromTableConfig = new HashMap<>();
+
+ ReadableConfig config = context.getConfiguration();
+
+ Map<String, String> conf;
+
+ if (config instanceof Configuration) {
+ conf = ((Configuration) config).toMap();
+ } else if (config instanceof TableConfig) {
+ conf = ((TableConfig) config).getConfiguration().toMap();
+ } else {
+ throw new IllegalArgumentException("Unexpected config: " +
config.getClass());
+ }
+
+ String template =
+ String.format(
+ "(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
+ FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
+ context.getObjectIdentifier().getCatalogName(),
+ context.getObjectIdentifier().getDatabaseName(),
+ context.getObjectIdentifier().getObjectName());
+ Pattern pattern = Pattern.compile(template);
+
+ conf.keySet()
+ .forEach(
+ (key) -> {
+ if
(key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) {
+ Matcher matcher = pattern.matcher(key);
+ if (matcher.find()) {
+
optionsFromTableConfig.put(matcher.group(5), conf.get(key));
+ }
+ }
+ });
+
+ if (!optionsFromTableConfig.isEmpty()) {
+ LOG.info(
+ "Loading dynamic table options for {} in table config: {}",
+ context.getObjectIdentifier().getObjectName(),
+ optionsFromTableConfig);
+ }
+ return optionsFromTableConfig;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index cefd4517a..551f541fd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -43,6 +43,8 @@ public class FlinkConnectorOptions {
public static final String NONE = "none";
+ public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
+
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
.stringType()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index 41958a016..aeb46da8d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -18,13 +18,20 @@
package org.apache.paimon.flink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -53,6 +60,25 @@ public class AbstractFlinkTableFactoryTest {
true);
}
+ @Test
+ public void testGetDynamicOptions() {
+ Configuration configuration = new Configuration();
+ configuration.setString("paimon.catalog1.db.T.k1", "v1");
+ configuration.setString("paimon.*.db.*.k2", "v2");
+ ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db",
"T");
+ DynamicTableFactory.Context context =
+ new FactoryUtil.DefaultDynamicTableContext(
+ identifier,
+ null,
+ new HashMap<>(),
+ configuration,
+ AbstractFlinkTableFactoryTest.class.getClassLoader(),
+ false);
+ Map<String, String> options =
+
AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
+ assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2"));
+ }
+
private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
assertThat(AbstractFlinkTableFactory.schemaEquals(r1,
r2)).isEqualTo(expectEquals);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 9e1408658..0dc141ee6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -18,7 +18,9 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
@@ -241,6 +243,22 @@ public class AppendOnlyTableITCase extends
CatalogITCaseBase {
.toInstant()));
}
+ @Test
+ public void testDynamicOptions() throws Exception {
+ sql("CREATE TABLE T (id INT) WITH ('write-mode'='append-only')");
+ batchSql("INSERT INTO T VALUES (1)");
+ sEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ "paimon.*.*.T." + CoreOptions.SCAN_MODE.key(),
+ CoreOptions.StartupMode.LATEST.toString());
+ BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM T");
+
+ sql("INSERT INTO T VALUES (2)");
+        // Only the latest snapshot is fetched, so the dynamic option worked
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2));
+ }
+
@Override
protected List<String> ddl() {
return Arrays.asList(