kaori-seasons commented on issue #617:
URL: https://github.com/apache/doris-flink-connector/issues/617#issuecomment-3475720259
Hello, I have read through your issue carefully.
## 1. Root cause analysis
### 1.1 Core cause of the error
The essence of your problem is that **relaying the events through files breaks the integrity of the CDC event stream**.
The error log shows that the failing record is a **data change event** (`op="r"` means a snapshot read), yet the code tries to extract the `tableChanges` field from it. The problem is that `tableChanges` exists **only in DDL schema change events**; ordinary data change events (INSERT/UPDATE/DELETE/READ) do not carry this field.
### 1.2 How events are distinguished
`JsonDebeziumSchemaSerializer` tells the two event types apart with the following rule:
- **`op` field present** → data change event (INSERT/UPDATE/DELETE/READ)
- **`op` field absent** → schema change event (DDL)
Your approach mixes both event types into the same file, which confuses this processing logic.
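A minimal sketch of that check (illustrative only; it mirrors the rule above using Jackson and is not the connector's internal code):
```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumEventKind {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** True for data change events (op = c/u/d/r); false for DDL schema change events. */
    public static boolean isDataChange(String record) throws Exception {
        JsonNode node = MAPPER.readTree(record);
        // Data change events carry an "op" field; schema change events do not
        // (they carry "tableChanges" instead), which is why extracting
        // tableChanges from an op="r" record fails.
        return node.hasNonNull("op");
    }
}
```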
### 1.3 Architectural flaw
```mermaid
graph LR
A[MySQL CDC Source] -->|DDL + DML mixed| B[Write to files]
B --> C[FTP transfer]
C --> D[Read files on intranet]
D -->|Cannot distinguish event types| E[Doris Sink error]
style E fill:#f88,stroke:#f00
```
---
## 2. Production-ready solutions (in order of recommendation)
### Option A: Direct connection through a jump server
This is the **best option**: use the `DatabaseSync` tool built into the Doris Flink Connector for whole-database synchronization.
#### A.1 Architecture
```mermaid
graph TB
subgraph "External network"
MySQL[(MySQL)]
end
subgraph "Jump server / proxy"
Proxy[SSH tunnel / SOCKS proxy]
end
subgraph "Internal network"
Flink[Flink Job<br/>DatabaseSync]
Doris[(Doris)]
end
MySQL -.->|SSH/VPN| Proxy
Proxy -->|forward| Flink
Flink -->|direct write| Doris
```
#### A.2 Implementation
Use the `MysqlDatabaseSync` utility class:
**Full example** (through an SSH tunnel):
```java
public class MySQLToDorisSync {
public static void main(String[] args) throws Exception {
// 1. First establish an SSH tunnel that maps the MySQL port
// ssh -L 3307:mysql-host:3306 user@jump-server
// 2. Configure the sync parameters
Map<String, String> config = new HashMap<>();
// MySQL configuration (connected through the tunnel)
config.put("mysql-conf.hostname", "localhost"); // via the tunnel
config.put("mysql-conf.port", "3307"); // locally mapped port
config.put("mysql-conf.username", "root");
config.put("mysql-conf.password", "123456");
config.put("mysql-conf.database-name", "test");
config.put("mysql-conf.scan-startup-mode", "initial");
// Doris configuration
config.put("sink.fenodes", "127.0.0.1:8030");
config.put("sink.username", "root");
config.put("sink.password", "123456");
config.put("sink.jdbc-url", "jdbc:mysql://127.0.0.1:9030");
// Sink batching configuration
config.put("sink.sink.batch.size", "10000");
config.put("sink.sink.max-retries", "3");
config.put("sink.sink.batch.interval", "10s");
// Table filter (optional)
config.put("including-tables", "test\\..*");
// Schema change mode (SQL_PARSER recommended)
config.put("schema-change-mode", "sql_parser");
// 3. Create and run the sync job
Configuration configuration = Configuration.fromMap(config);
MysqlDatabaseSync sync = new MysqlDatabaseSync();
sync.setConfig(configuration);
sync.create();
sync.build();
}
}
```
#### A.3 Network connectivity options
**Option 1: SSH tunnel**
```bash
# Run this on the intranet Flink machine
ssh -N -L 3307:mysql-real-host:3306 user@jump-server
```
**Option 2: SOCKS proxy**
```bash
# Pass as JVM system properties when launching the Flink job
# (e.g. via env.java.opts in flink-conf.yaml)
-DsocksProxyHost=proxy-host
-DsocksProxyPort=1080
```
**Option 3: VPN / dedicated line** (most stable)
- Bridge the two networks with an enterprise VPN
- Connect to the real MySQL address directly
#### A.4 Key advantages
- **Automatic table creation**: tables are created in Doris from the MySQL schema
- **Schema evolution**: DDL changes (ADD/DROP/MODIFY COLUMN) are synchronized automatically
- **Transactional consistency**: data consistency is guaranteed
- **Resumable**: recovery from checkpoints is supported
- **Production-proven**: the officially recommended approach
---
### Option B: Improved file-transfer pipeline
If a direct connection cannot be established, the file-transfer architecture can be improved.
#### B.1 Core idea: separate schema change events from data change events
```mermaid
graph LR
A[MySQL CDC] -->|classify by event type| B{op field present?}
B -->|yes| C[Data change files]
B -->|no| D[DDL change files]
C --> E[FTP transfer]
D --> E
E --> F[Intranet processing]
F -->|DDL first| G[Create/alter tables]
F -->|data after| H[Write data]
```
#### B.2 Implementation
**Step 1: split the events when writing the files**
```java
public class MySQLCDCToFileWithSeparation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
// Configure the CDC source
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema =
new JsonDebeziumDeserializationSchema(true,
customConverterConfigs);
MySqlSource<String> mySqlSource =
MySqlSource.<String>builder()
.hostname("127.0.0.1")
.port(23306)
.databaseList("test")
.tableList("test\\..*")
.username("root")
.password("123456")
.debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
.deserializer(schema)
.serverTimeZone("Asia/Shanghai")
.includeSchemaChanges(true)
.build();
DataStream<String> cdcStream = env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),
"MySQL Source")
.setParallelism(2);
// Split the stream based on whether the record carries an op field
OutputTag<String> ddlTag = new OutputTag<String>("ddl-events"){};
SingleOutputStreamOperator<String> dataStream = cdcStream.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx,
Collector<String> out) {
try {
JsonNode node = new ObjectMapper().readTree(value);
if (node.has("op") && node.get("op") != null) {
// Data change event
out.collect(value);
} else {
// DDL (schema change) event
ctx.output(ddlTag, value);
}
} catch (Exception e) {
// Fallback: pass unparsable records through as data
out.collect(value);
}
}
}
);
// DDL events come out of the side output
DataStream<String> ddlStream = dataStream.getSideOutput(ddlTag);
// 1. Write data change events to the data directory
FileSink<String> dataSink = FileSink
.forRowFormat(
new Path("D:/data/mysql_cdc/data/"),
new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(1))
.withInactivityInterval(Duration.ofSeconds(30))
.withMaxPartSize(1024 * 1024 * 128)
.build())
.build();
// 2. Write DDL events to the ddl directory
FileSink<String> ddlSink = FileSink
.forRowFormat(
new Path("D:/data/mysql_cdc/ddl/"),
new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofSeconds(60))
.build())
.build();
dataStream.sinkTo(dataSink).setParallelism(1);
ddlStream.sinkTo(ddlSink).setParallelism(1);
env.execute("MySQL CDC to Separated Files");
}
}
```
**Step 2: process the two streams separately on the intranet**
```java
public class SeparatedFilesToDoris {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
// Configure the Doris connection
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("")
.setUsername("root")
.setPassword("123456")
.build();
// ========== Job 1: process DDL events (higher priority) ==========
FileSource<String> ddlSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path("D:/data/mysql_cdc/ddl/"))
.monitorContinuously(Duration.ofSeconds(5))
.build();
DataStream<String> ddlStream = env.fromSource(
ddlSource,
WatermarkStrategy.noWatermarks(),
"DDL Source"
);
// Sink configuration dedicated to DDL events
DorisSink.Builder<String> ddlSinkBuilder = DorisSink.builder();
ddlSinkBuilder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(DorisExecutionOptions.builder()
.setLabelPrefix("ddl-" + UUID.randomUUID())
.setStreamLoadProp(new Properties())
.build())
.setDorisOptions(dorisOptions)
.setSerializer(
JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisOptions)
.setNewSchemaChange(true)
.setSchemaChangeMode("sql_parser") // SQL_PARSER mode is recommended
.build());
ddlStream.sinkTo(ddlSinkBuilder.build()).name("DDL Processor");
// ========== Job 2: process data change events ==========
FileSource<String> dataSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path("D:/data/mysql_cdc/data/"))
.monitorContinuously(Duration.ofSeconds(5))
.build();
DataStream<String> dataStream = env.fromSource(
dataSource,
WatermarkStrategy.noWatermarks(),
"Data Source"
);
// Sink configuration dedicated to data change events
Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisSink.Builder<String> dataSinkBuilder = DorisSink.builder();
dataSinkBuilder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(DorisExecutionOptions.builder()
.setLabelPrefix("data-" + UUID.randomUUID())
.setStreamLoadProp(props)
.setDeletable(true)
.build())
.setDorisOptions(dorisOptions)
.setSerializer(
JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisOptions)
.setNewSchemaChange(true)
.build());
dataStream.sinkTo(dataSinkBuilder.build()).name("Data Processor");
env.execute("Separated Files to Doris");
}
}
```
#### B.3 Pros and cons
**Pros**:
- Automatic table creation and schema evolution are still supported
- The event-handling logic is clear
- DDL can be processed first, guaranteeing the table exists before data arrives
**Cons**:
- Two file streams have to be maintained
- Ordering issues are possible (data arrives before the table has been created)
- Higher overall complexity
---
### Option C: Pre-create the tables + data-only sync
If the schema is relatively stable, the tables can be created in advance.
#### C.1 Steps
**Step 1: create the tables once**
```java
public class InitialTableCreator {
public static void main(String[] args) throws Exception {
// Use DatabaseSync in create-table-only mode
Map<String, String> config = new HashMap<>();
config.put("mysql-conf.hostname", "localhost");
config.put("mysql-conf.port", "3307"); // temporary connection through the tunnel
config.put("mysql-conf.username", "root");
config.put("mysql-conf.password", "123456");
config.put("mysql-conf.database-name", "test");
config.put("sink.fenodes", "127.0.0.1:8030");
config.put("sink.username", "root");
config.put("sink.password", "123456");
// Key point: only create the tables, do not sync any data
config.put("create-table-only", "true");
Configuration configuration = Configuration.fromMap(config);
MysqlDatabaseSync sync = new MysqlDatabaseSync();
sync.setConfig(configuration);
sync.create();
sync.build();
}
}
```
**Step 2: switch the CDC source to data-only mode**
```java
// When producing the files, set includeSchemaChanges(false)
MySqlSource<String> mySqlSource =
MySqlSource.<String>builder()
// ... other options
.includeSchemaChanges(false) // do not emit DDL events
.build();
```
**Step 3: use a simplified Doris sink**
Use `JsonDebeziumDataChange` so that only data changes are processed; see the sketch below:
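A minimal sketch of such a data-only sink, as a fragment reusing the same classes, imports, and options as the Option B step 2 example above (the connector routes data change events through `JsonDebeziumDataChange` internally). Assumptions: `dataStream` is the stream read from the data files, and `test.students` is the table pre-created in step 1:
```java
// Data-only sink: the target table already exists, so no schema change handling is needed.
Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");

DorisOptions dorisOptions = DorisOptions.builder()
        .setFenodes("127.0.0.1:8030")
        .setTableIdentifier("test.students") // pre-created in step 1
        .setUsername("root")
        .setPassword("123456")
        .build();

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(DorisExecutionOptions.builder()
                .setLabelPrefix("data-only-" + UUID.randomUUID())
                .setStreamLoadProp(props)
                .setDeletable(true) // map Debezium delete events to Doris deletes
                .build())
        .setDorisOptions(dorisOptions)
        .setSerializer(JsonDebeziumSchemaSerializer.builder()
                .setDorisOptions(dorisOptions)
                .build());

dataStream.sinkTo(builder.build()).name("Data-only Doris Sink");
```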
#### C.2 When this fits
- Schema changes are infrequent
- Manual DDL changes are acceptable
- Simplicity is the priority
- **Not supported**: automatic schema evolution
---
### Option D: Relay through Kafka
Use Kafka instead of file transfer.
#### D.1 Architecture
```mermaid
graph LR
A[External MySQL CDC] --> B[Kafka<br/>external]
B --> C[Mirror replication]
C --> D[Kafka<br/>internal]
D --> E[Flink consumer]
E --> F[Doris]
```
#### D.2 Advantages
- Event ordering is preserved natively
- Consumption can resume from offsets
- High availability
- Better end-to-end latency
**Configuration example**:
```java
// External network: write CDC events to Kafka
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"mysql-cdc-topic",
new SimpleStringSchema(),
props
);
cdcStream.addSink(kafkaProducer);
// Internal network: consume from Kafka
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"mysql-cdc-topic",
new SimpleStringSchema(),
props
);
DataStream<String> stream = env.addSource(kafkaConsumer);
```
---
## 3. Key configuration notes
### 3.1 Choosing the schema change mode
**`SQL_PARSER` mode is recommended**:
- Parses the DDL statement directly
- Supports more complex DDL operations
- Higher accuracy
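For reference, this is where the mode appears in the examples above (a recap of existing configuration, not new options):
```java
// Whole-database sync (Option A): set it in the DatabaseSync config map
config.put("schema-change-mode", "sql_parser");

// Single DorisSink (Option B, step 2): set it on the serializer builder
JsonDebeziumSchemaSerializer serializer = JsonDebeziumSchemaSerializer.builder()
        .setDorisOptions(dorisOptions)
        .setNewSchemaChange(true)
        .setSchemaChangeMode("sql_parser")
        .build();
```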
### 3.2 Key sink parameters
```java
DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix("doris-" + UUID.randomUUID())
.setStreamLoadProp(props)
.setDeletable(true) // support DELETE operations
.setIgnoreUpdateBefore(true) // ignore the before image of UPDATE events
.setBatchSize(10000) // batch size
.setBatchIntervalMs(10000) // batch interval (ms)
.setMaxRetries(3) // retry count
.build();
```
---
## 4. Production best practices
### 4.1 Monitoring and alerting
```java
// Enable Flink latency tracking; custom metrics can be added on top of this (see below)
env.getConfig().setLatencyTrackingInterval(1000L);
```
Metrics worth watching:
- Processing latency
- Failed retries
- Schema change success rate
- Write throughput (TPS)
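Latency tracking is built into Flink; for job-specific counters (for example, records that fail JSON parsing) a custom metric can be registered through Flink's metric API. A minimal, hypothetical sketch (the class and metric name are illustrative):
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Pass-through map function that counts records which fail JSON parsing.
public class ParseErrorCountingMap extends RichMapFunction<String, String> {
    private transient ObjectMapper mapper;
    private transient Counter parseErrors;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
        // The counter shows up in the Flink UI and in any configured metrics reporter.
        parseErrors = getRuntimeContext().getMetricGroup().counter("cdcParseErrors");
    }

    @Override
    public String map(String value) {
        try {
            mapper.readTree(value);
        } catch (Exception e) {
            parseErrors.inc();
        }
        return value; // records are passed through unchanged
    }
}
```
Attach it before the sink, e.g. `cdcStream.map(new ParseErrorCountingMap())`.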
### 4.2 Fault-tolerance settings
```java
env.enableCheckpointing(60000); // checkpoint every 60 s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
```
### 4.3 Table design suggestions
```sql
-- Use the Unique Key model when creating the Doris table (recommended)
CREATE TABLE test.students (
student_id INT,
first_name VARCHAR(100),
last_name VARCHAR(100),
...
)
UNIQUE KEY(student_id)
DISTRIBUTED BY HASH(student_id) BUCKETS 10;
```
So I believe what you actually want is **schema change handling**. If you must keep your current architecture, implement **Option B (event separation)**: process DDL and DML events separately and make sure the DDL is applied first.
**Key configuration points**:
- Use `includeSchemaChanges(true)` so that DDL events are captured
- Set `schema-change-mode` to `sql_parser` for the best compatibility
- DDL events must be processed before the corresponding data change events
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]