[
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758339#comment-17758339
]
Hang Ruan edited comment on FLINK-32798 at 8/24/23 4:12 AM:
------------------------------------------------------------
Hi, all. I have done some tests about this feature.
1.Create a mysql table to store the changes.
{code:java}
CREATE TABLE `listener_test` ( `id` bigint NOT NULL AUTO_INCREMENT, `catalog`
varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT
NULL, `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT
NULL, `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci, PRIMARY KEY
(`id`)) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3
ROW_FORMAT=DYNAMIC; {code}
2.Develop the Factory and Listener.
{code:java}
package org.self.listener;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
public class MyCatalogListenerFactory implements
CatalogModificationListenerFactory {
@Override
public CatalogModificationListener createListener(Context context) {
return new
MyCatalogListener("jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000",
"username", "password");
}
@Override
public String factoryIdentifier() {
return "test";
}
} {code}
{code:java}
package org.self.listener;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;
public class MyCatalogListener implements CatalogModificationListener {
private final String jdbcUrl;
private final String username;
private final String password;
public MyCatalogListener(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
}
@Override
public void onEvent(CatalogModificationEvent event) {
try {
Class.forName(getDriverClassName());
try (Connection connection = DriverManager.getConnection(jdbcUrl,
username, password);
PreparedStatement statement =
connection.prepareStatement("INSERT INTO `listener_test`
(`catalog`,`identifier`,`type`,`detail`) VALUES (?,?,?,?);")) {
String identifier;
String type;
String detail;
String catalog = event.context().getCatalogName();
if (event instanceof CreateDatabaseEvent) {
CreateDatabaseEvent cde = (CreateDatabaseEvent) event;
identifier = "DB:" + cde.databaseName();
detail =
cde.database().getProperties().entrySet().stream().map(e -> e.getKey() + ":" +
e.getValue()).collect(Collectors.joining(", "));
type = "CREATE DB";
} else if (event instanceof AlterDatabaseEvent) {
AlterDatabaseEvent ade = (AlterDatabaseEvent) event;
identifier = "DB:" + ade.databaseName();
detail =
ade.newDatabase().getProperties().entrySet().stream().map(e -> e.getKey() + ":"
+ e.getValue()).collect(Collectors.joining(", "));
type = "ALTER DB";
} else if (event instanceof DropDatabaseEvent) {
DropDatabaseEvent dde = (DropDatabaseEvent) event;
identifier = "DB:" + dde.databaseName();
detail = "null";
type = "DELETE DB";
} else if (event instanceof CreateTableEvent) {
CreateTableEvent cte = (CreateTableEvent) event;
identifier = "TBL:" + cte.identifier().toString();
detail = cte.table().toString();
type = "CREATE TBL";
} else if (event instanceof AlterTableEvent) {
AlterTableEvent ate = (AlterTableEvent) event;
identifier = "TBL:" + ate.identifier().toString();
detail = ate.newTable().toString();
type = "ALTER TBL";
} else if (event instanceof DropTableEvent) {
DropTableEvent dte = (DropTableEvent) event;
identifier = "TBL:" + dte.identifier().toString();
detail = dte.table().toString();
type = "DELETE TBL";
} else {
throw new IllegalArgumentException("Unknown event type.");
}
statement.setObject(1, catalog);
statement.setObject(2, identifier);
statement.setObject(3, type);
statement.setObject(4, detail);
statement.execute();
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
private String getDriverClassName() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return "com.mysql.cj.jdbc.Driver";
} catch (ClassNotFoundException e) {
return "com.mysql.jdbc.Driver";
}
}
} {code}
3.Add file `org.apache.flink.table.factories.Factory` to the resources.
4.Package the code and put it in `lib`.
5.Add `table.catalog-modification.listeners: test` to `flink-conf.yaml`.
6.Start sql client and test. The test result as follows.
!sqls.png!
!result.png!
It seems good when using by the sql client. And I think we should add the Step
3 and 4 to docs.
Does FLIP-294 support to provide parameters when creating listeners? If it is
supported, I think we should describe how to provide parameters for the
listener in docs.
was (Author: ruanhang1993):
Hi, all. I have done some tests about this feature.
1.Create a mysql table to store the changes.
{code:java}
CREATE TABLE `listener_test` ( `id` bigint NOT NULL AUTO_INCREMENT, `catalog`
varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT
NULL, `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT
NULL, `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci, PRIMARY KEY
(`id`)) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3
ROW_FORMAT=DYNAMIC; {code}
2.Develop the Factory and Listener.
{code:java}
package org.self.listener;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
public class MyCatalogListenerFactory implements
CatalogModificationListenerFactory {
@Override
public CatalogModificationListener createListener(Context context) {
return new
MyCatalogListener("jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000",
"username", "password");
}
@Override
public String factoryIdentifier() {
return "test";
}
} {code}
{code:java}
package org.self.listener;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;
public class MyCatalogListener implements CatalogModificationListener {
private final String jdbcUrl;
private final String username;
private final String password;
public MyCatalogListener(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
}
@Override
public void onEvent(CatalogModificationEvent event) {
try {
Class.forName(getDriverClassName());
try (Connection connection = DriverManager.getConnection(jdbcUrl,
username, password);
PreparedStatement statement =
connection.prepareStatement("INSERT INTO `listener_test`
(`catalog`,`identifier`,`type`,`detail`) VALUES (?,?,?,?);")) {
String identifier;
String type;
String detail;
String catalog = event.context().getCatalogName();
if (event instanceof CreateDatabaseEvent) {
CreateDatabaseEvent cde = (CreateDatabaseEvent) event;
identifier = "DB:" + cde.databaseName();
detail =
cde.database().getProperties().entrySet().stream().map(e -> e.getKey() + ":" +
e.getValue()).collect(Collectors.joining(", "));
type = "CREATE DB";
} else if (event instanceof AlterDatabaseEvent) {
AlterDatabaseEvent ade = (AlterDatabaseEvent) event;
identifier = "DB:" + ade.databaseName();
detail =
ade.newDatabase().getProperties().entrySet().stream().map(e -> e.getKey() + ":"
+ e.getValue()).collect(Collectors.joining(", "));
type = "ALTER DB";
} else if (event instanceof DropDatabaseEvent) {
DropDatabaseEvent dde = (DropDatabaseEvent) event;
identifier = "DB:" + dde.databaseName();
detail = "null";
type = "DELETE DB";
} else if (event instanceof CreateTableEvent) {
CreateTableEvent cte = (CreateTableEvent) event;
identifier = "TBL:" + cte.identifier().toString();
detail = cte.table().toString();
type = "CREATE TBL";
} else if (event instanceof AlterTableEvent) {
AlterTableEvent ate = (AlterTableEvent) event;
identifier = "TBL:" + ate.identifier().toString();
detail = ate.newTable().toString();
type = "ALTER TBL";
} else if (event instanceof DropTableEvent) {
DropTableEvent dte = (DropTableEvent) event;
identifier = "TBL:" + dte.identifier().toString();
detail = dte.table().toString();
type = "DELETE TBL";
} else {
throw new IllegalArgumentException("Unknown event type.");
}
statement.setObject(1, catalog);
statement.setObject(2, identifier);
statement.setObject(3, type);
statement.setObject(4, detail);
statement.execute();
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
private String getDriverClassName() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return "com.mysql.cj.jdbc.Driver";
} catch (ClassNotFoundException e) {
return "com.mysql.jdbc.Driver";
}
}
} {code}
3.Add file `org.apache.flink.table.factories.Factory` to the resources.
4.Package the code and put it in `lib`.
5.Add `table.catalog-modification.listeners: test` to `flink-conf.yaml`.
6.Start sql client and test. The test result as follows.
!sqls.png!
!result.png!
It seems good when using by the sql client. And I think we should add the Step
3 and 4 to docs.
Besides that, I think we should describe how to provide parameters for the
listener in docs.
> Release Testing: Verify FLIP-294: Support Customized Catalog Modification
> Listener
> ----------------------------------------------------------------------------------
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.18.0
> Reporter: Qingsheng Ren
> Priority: Major
> Fix For: 1.18.0
>
> Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener
--
This message was sent by Atlassian Jira
(v8.20.10#820010)