Flink CDC Issue Import created FLINK-34809:
----------------------------------------------
Summary: [mysql] Add notifications to Slack when the Snapshot
phase ends or Binlog stream phase begins
Key: FLINK-34809
URL: https://issues.apache.org/jira/browse/FLINK-34809
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found
nothing similar.
### Motivation
On our team, we use Flink CDC to capture changes from MySQL.
Since [there is no Snapshot Only mode for MySQL
yet|https://github.com/ververica/flink-cdc-connectors/issues/1687], we needed
to be notified when a snapshot completes and when the binlog stream starts.
To accomplish this, we **implemented notifications for when a snapshot ends
and when the binlog stream starts, including the GTIDs it starts from.**
---
Here's the team's use case in more detail:
1. We set parallelism to 2 or more for large tables.
2. We send change event logs to Kafka so that they can be consumed by Debezium's
JDBC Sink Connector, which supports [Schema
Evolution|https://debezium.io/documentation/reference/stable/connectors/jdbc.html#jdbc-schema-evolution].
3. Sinking to Kafka is slower than the MySqlSource operator, so we give the sink
operator more parallelism than MySqlSource.
4. In this case, records are passed from the source operator to the sink operator
in rebalance (round-robin) mode, so the order of events for the same PK is not
guaranteed once binlog events are flowing.
5. So, at the end of the snapshot phase, we restart the job from the GTIDs where
the binlog stream started, with parallelism set to 1.
To do this, we needed (1) to be notified that the snapshot ended and the binlog
stream started, and (2) to know from which GTIDs the binlog stream started.
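Step 4 is the reason the restart is needed. The toy model below, assuming strict round-robin starting at subtask 0 (Flink's rebalance partitioner cycles through channels in the same way but may start at a different one), shows two consecutive updates to the same primary key being sent to different sink subtasks, so their relative order in Kafka depends on how fast each subtask drains its input. `RebalanceSketch` and its members are hypothetical names used only for illustration.

```scala
// Hypothetical toy model of round-robin ("rebalance") partitioning.
object RebalanceSketch {
  // A change event identified by its primary key and its position in the binlog.
  final case class ChangeEvent(pk: Int, seq: Int)

  // The i-th record goes to subtask i % parallelism, regardless of its key.
  def rebalance(events: Seq[ChangeEvent], parallelism: Int): Map[Int, Seq[ChangeEvent]] =
    events.zipWithIndex
      .groupBy { case (_, i) => i % parallelism }
      .map { case (subtask, pairs) => subtask -> pairs.map(_._1) }

  // Subtasks that received at least one event for the given key. More than one
  // subtask means per-key ordering depends on relative subtask progress.
  def subtasksFor(pk: Int, assignment: Map[Int, Seq[ChangeEvent]]): Set[Int] =
    assignment.filter { case (_, evs) => evs.exists(_.pk == pk) }.keySet
}
```

With parallelism 2, the updates `ChangeEvent(1, 1)` and `ChangeEvent(1, 2)` land on subtasks 0 and 1; with parallelism 1 they share a single subtask, which is why the restarted binlog job runs with parallelism 1.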
---
This is a portion of our code. We **assumed that we capture only 1 table per
Flink CDC job.**
```scala
// Package names as of Flink CDC 2.x (com.ververica.*)
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.table.catalog.ObjectPath

// mySqlConfig is our own configuration object (not shown here).
def getMySQLSourceOperator(): MySqlSource[String] = {
  MySqlSource.builder[String]()
    .hostname(mySqlConfig.host)
    .port(mySqlConfig.port)
    .serverTimeZone(mySqlConfig.timeZone)
    .databaseList(mySqlConfig.database)
    .tableList(mySqlConfig.table)
    .username(mySqlConfig.user)
    .serverId(mySqlConfig.serverIdRange)
    .password(mySqlConfig.password)
    .startupOptions(mySqlConfig.startupMode)
    .fetchSize(mySqlConfig.fetchSize)
    .splitSize(mySqlConfig.splitSize)
    .chunkKeyColumn(
      new ObjectPath(mySqlConfig.database, mySqlConfig.table),
      mySqlConfig.chunkKeyColumn)
    .connectionPoolSize(mySqlConfig.poolSize)
    .scanNewlyAddedTableEnabled(false)
    .includeSchemaChanges(false)
    .debeziumProperties(mySqlConfig.dbzProps)
    .closeIdleReaders(true)
    // What we implemented (see the PR linked below): notify on the snapshot-to-binlog switch
    .notifySnapshotToBinlogSwitch("slack-hook-url")
    .deserializer(
      new JsonDebeziumDeserializationSchema(true, mySqlConfig.jsonConverterProps))
    .build()
}
```
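The `notifySnapshotToBinlogSwitch("slack-hook-url")` option comes from the team's PR linked below, and the real implementation lives there. As a rough, hypothetical illustration of what such an option could do with the hook URL, the sketch below formats the two messages shown later in this issue and posts them to a Slack incoming webhook, which accepts a JSON body with a `text` field. `SlackNotifierSketch` and its methods are made-up names, and the code uses only the JDK's `java.net.http` client (Java 11+).

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical sketch, not the code from the PR.
object SlackNotifierSketch {
  // Message layouts matching the examples in this issue (one table per job).
  def snapshotFinished(database: String, table: String): String =
    Seq("[SNAPSHOT FINISHED]", s"Database: $database", s"Table: $table").mkString("\n")

  def binlogStreamStart(database: String, table: String, gtids: String): String =
    Seq("[BINLOG STREAM START]", s"Database: $database", s"Table: $table", "GTIDs:", gtids)
      .mkString("\n")

  // Minimal JSON string escaping: quotes, backslashes, and control characters.
  def escapeJson(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.toString
  }

  // Body for a Slack incoming webhook: {"text": "..."}
  def payload(text: String): String = "{\"text\":\"" + escapeJson(text) + "\"}"

  // POSTs the message to the webhook and returns the HTTP status code.
  def send(hookUrl: String, text: String): Int = {
    val request = HttpRequest.newBuilder(URI.create(hookUrl))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(payload(text)))
      .build()
    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .statusCode()
  }
}
```

For example, `SlackNotifierSketch.send(hookUrl, SlackNotifierSketch.snapshotFinished("test_database", "test_table"))` would post the snapshot-finished message. In a parallel job, a component that runs once per source (such as the split enumerator) seems a natural place to call it so the message is not sent by every reader; that placement is an assumption, not a description of the PR.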
---
We can't share a real screenshot of the notification, because our company
recommends using in-house tools rather than Slack (🥲🥲🥲) and has security
policies to follow.
But it looks something like the format below!
- Snapshot finished notification.
```
[SNAPSHOT FINISHED]
Database: test_database
Table: test_table
```
- Binlog stream start notification.
```
[BINLOG STREAM START]
Database: test_database
Table: test_table
GTIDs:
3bda59bb-2fc8-11eb-855f-fa163e2550e3:1-128377129,3c3a6a1b-c931-11ed-b0db-b4055dec129e:1-273703345,901d637c-8add-11eb-8e3f-b4055d3355a6:1-3641352422,b46b8251-5254-11ed-a648-d0946637df48:1-1069556331,db449f07-c53e-11e8-b8c2-d094663d3d1d:1-3543229125,e8d62f95-c77a-11e8-9270-d0946637df48:1-5715021533
```
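Step 5 restarts the job from the GTID set reported in the binlog start notification, so it can help to sanity-check that string before handing it to the restarted job. A minimal sketch, assuming the plain `source_uuid:interval[:interval...]` format shown above (each interval either `n` or `n-m`, entries separated by commas), of parsing the set into per-server intervals; `GtidSetSketch`, `parse`, and `transactionCount` are hypothetical names, not part of Flink CDC or MySQL.

```scala
// Hypothetical helper for the GTID set string in the binlog start notification.
// Assumes the plain "uuid:start-end[:start-end...]" format; other variants are not handled.
object GtidSetSketch {
  final case class Interval(start: Long, end: Long)

  def parse(gtidSet: String): Map[String, List[Interval]] =
    gtidSet.split(",").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { entry =>
        // The UUID contains '-' but never ':', so split on ':' first.
        val parts = entry.split(":").toList
        val intervals = parts.tail.map { iv =>
          iv.split("-") match {
            case Array(s)    => Interval(s.toLong, s.toLong)
            case Array(s, e) => Interval(s.toLong, e.toLong)
            case _           => throw new IllegalArgumentException(s"Bad GTID interval: $iv")
          }
        }
        parts.head -> intervals
      }
      .toMap

  // Total number of transactions covered by the parsed set.
  def transactionCount(gtidSet: Map[String, List[Interval]]): Long =
    gtidSet.valuesIterator.flatten.map(iv => iv.end - iv.start + 1).sum
}
```

Parsing the example above yields six source UUIDs, each with a single interval starting at 1.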
### Solution
Our implementation and PR are here:
https://github.com/ververica/flink-cdc-connectors/pull/2453
### Alternatives
_No response_
### Anything else?
_No response_
### Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2454
Created by: [SML0127|https://github.com/SML0127]
Labels: enhancement,
Created at: Sat Sep 02 14:34:50 CST 2023
State: open