[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758804#comment-17758804
 ] 

Hang Ruan commented on FLINK-32798:
-----------------------------------

Hi, all. [~renqs] [~zjureel] 

I have modify the code to test using the configuration.
{code:java}
package org.self.listener;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import 
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MyCatalogListenerFactory implements 
CatalogModificationListenerFactory {

    public static final ConfigOption<String> URL =
            ConfigOptions.key("my.catalog.listener.url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "JDBC url of the MySQL database to use when 
connecting to the MySQL database server.");

    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("my.catalog.listener.username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Username of the MySQL database to use when 
connecting to the MySQL database server.");

    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("my.catalog.listener.password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Password to use when connecting to the MySQL 
database server.");
    @Override
    public CatalogModificationListener createListener(Context context) {
        ReadableConfig config = context.getConfiguration();
        return new MyCatalogListener(config.get(URL), config.get(USERNAME), 
config.get(PASSWORD));
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        return requiredOptions;
    }

    @Override
    public String factoryIdentifier() {
        return "test";
    }
} {code}
And add the parameters in `flink-conf.yaml`.
{code:java}
table.catalog-modification.listeners: test
my.catalog.listener.url: 
jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000
my.catalog.listener.username: username
my.catalog.listener.password: password{code}
It works well.

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-32798
>                 URL: https://issues.apache.org/jira/browse/FLINK-32798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.18.0
>            Reporter: Qingsheng Ren
>            Assignee: Hang Ruan
>            Priority: Major
>             Fix For: 1.18.0
>
>         Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to