[ https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758804#comment-17758804 ]
Hang Ruan commented on FLINK-32798: ----------------------------------- Hi, all. [~renqs] [~zjureel] I have modify the code to test using the configuration. {code:java} package org.self.listener; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.listener.CatalogModificationListener; import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory; import java.util.Collections; import java.util.HashSet; import java.util.Set; public class MyCatalogListenerFactory implements CatalogModificationListenerFactory { public static final ConfigOption<String> URL = ConfigOptions.key("my.catalog.listener.url") .stringType() .noDefaultValue() .withDescription( "JDBC url of the MySQL database to use when connecting to the MySQL database server."); public static final ConfigOption<String> USERNAME = ConfigOptions.key("my.catalog.listener.username") .stringType() .noDefaultValue() .withDescription( "Username of the MySQL database to use when connecting to the MySQL database server."); public static final ConfigOption<String> PASSWORD = ConfigOptions.key("my.catalog.listener.password") .stringType() .noDefaultValue() .withDescription( "Password to use when connecting to the MySQL database server."); @Override public CatalogModificationListener createListener(Context context) { ReadableConfig config = context.getConfiguration(); return new MyCatalogListener(config.get(URL), config.get(USERNAME), config.get(PASSWORD)); } @Override public Set<ConfigOption<?>> requiredOptions() { Set<ConfigOption<?>> requiredOptions = new HashSet<>(); requiredOptions.add(URL); requiredOptions.add(USERNAME); requiredOptions.add(PASSWORD); return requiredOptions; } @Override public String factoryIdentifier() { return "test"; } } {code} And add the parameters in `flink-conf.yaml`. {code:java} table.catalog-modification.listeners: test my.catalog.listener.url: jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000 my.catalog.listener.username: username my.catalog.listener.password: password{code} It works well. > Release Testing: Verify FLIP-294: Support Customized Catalog Modification > Listener > ---------------------------------------------------------------------------------- > > Key: FLINK-32798 > URL: https://issues.apache.org/jira/browse/FLINK-32798 > Project: Flink > Issue Type: Sub-task > Components: Tests > Affects Versions: 1.18.0 > Reporter: Qingsheng Ren > Assignee: Hang Ruan > Priority: Major > Fix For: 1.18.0 > > Attachments: result.png, sqls.png, test.png > > > The document about catalog modification listener is: > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener -- This message was sent by Atlassian Jira (v8.20.10#820010)