ruanhang1993 commented on code in PR #3463:
URL: https://github.com/apache/flink-cdc/pull/3463#discussion_r1914284429
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -180,6 +199,7 @@ public void close() {
statefulTaskContext.getBinaryLogClient().disconnect();
}
Review Comment:
We need to delete the earlier explicit close calls made through statefulTaskContext in this method.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -86,6 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
private static final long READER_CLOSE_TIMEOUT = 30L;
+ public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subTaskId) {
+ final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
+ final BinaryLogClient binaryLogClient =
+ createBinaryClient(sourceConfig.getDbzConfiguration());
+ this.statefulTaskContext =
+ new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subTaskId).build();
+ this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+ this.currentTaskRunning = true;
+ this.pureBinlogPhaseTables = new HashSet<>();
Review Comment:
```suggestion
this(new StatefulTaskContext(...), subtaskId);
```
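To illustrate the delegation suggested above, here is a minimal sketch of a convenience constructor that forwards to a primary one with `this(...)`. `SketchContext` and `SketchReader` are hypothetical stand-ins, not the Flink CDC classes, and the `String` config is only an assumption to keep the example self-contained.

```java
// Hypothetical stand-in for StatefulTaskContext; not the Flink CDC class.
class SketchContext {
    final String config;

    SketchContext(String config) {
        this.config = config;
    }
}

// Hypothetical reader with two constructors, one delegating to the other.
class SketchReader {
    private final SketchContext context;
    private final String threadName;

    // Convenience constructor: builds the context, then delegates.
    SketchReader(String config, int subtaskId) {
        this(new SketchContext(config), subtaskId);
    }

    // Primary constructor: the only place where fields are initialized.
    SketchReader(SketchContext context, int subtaskId) {
        this.context = context;
        this.threadName = "binlog-reader-" + subtaskId;
    }

    SketchContext context() {
        return context;
    }

    String threadName() {
        return threadName;
    }
}
```

With this shape the field initialization lives in one constructor, so the two entry points cannot drift apart.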
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -180,6 +199,7 @@ public void close() {
statefulTaskContext.getBinaryLogClient().disconnect();
}
+ statefulTaskContext.close();
Review Comment:
```suggestion
if (statefulTaskContext != null) {
statefulTaskContext.close();
}
```
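A small sketch of what the guard buys, assuming the context can be absent when `close()` runs. `CountingContext` and `GuardedReader` are hypothetical names used only for illustration.

```java
// Hypothetical stand-in for StatefulTaskContext; counts close() calls.
class CountingContext implements AutoCloseable {
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
    }
}

// Hypothetical reader whose context may be null in this sketch.
class GuardedReader implements AutoCloseable {
    private final CountingContext context;

    GuardedReader(CountingContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Guard so that close() does not throw when no context was set.
        if (context != null) {
            context.close();
        }
    }
}
```

Calling `close()` on a `GuardedReader` built with `null` returns normally instead of throwing a `NullPointerException`.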
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -86,6 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
private static final long READER_CLOSE_TIMEOUT = 30L;
+ public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subTaskId) {
Review Comment:
Rename `subTaskId` to `subtaskId`.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java:
##########
@@ -378,15 +403,7 @@ private void setReadException(Throwable throwable) {
public void close() {
try {
stopCurrentTask();
- if (statefulTaskContext.getConnection() != null) {
- statefulTaskContext.getConnection().close();
- }
- if (statefulTaskContext.getBinaryLogClient() != null) {
- statefulTaskContext.getBinaryLogClient().disconnect();
- }
- if (statefulTaskContext.getDatabaseSchema() != null) {
- statefulTaskContext.getDatabaseSchema().close();
- }
+ statefulTaskContext.close();
Review Comment:
```suggestion
if (statefulTaskContext != null) {
statefulTaskContext.close();
}
```
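The hunk above replaces three per-resource cleanup calls with a single `statefulTaskContext.close()`. As a rough sketch of that consolidation, assuming the context owns its resources, the per-resource null checks can move into the context itself. `FakeResource` and `OwningContext` are hypothetical, and this does not claim to match how `StatefulTaskContext.close()` is actually implemented.

```java
// Hypothetical resource; records whether it was closed.
class FakeResource implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical context that releases all of its resources in one place.
class OwningContext implements AutoCloseable {
    private final FakeResource connection;
    private final FakeResource binaryLogClient;
    private final FakeResource databaseSchema;

    OwningContext(
            FakeResource connection,
            FakeResource binaryLogClient,
            FakeResource databaseSchema) {
        this.connection = connection;
        this.binaryLogClient = binaryLogClient;
        this.databaseSchema = databaseSchema;
    }

    @Override
    public void close() {
        closeIfPresent(connection);
        closeIfPresent(binaryLogClient);
        closeIfPresent(databaseSchema);
    }

    // Skips resources that were never created.
    private static void closeIfPresent(FakeResource resource) {
        if (resource != null) {
            resource.close();
        }
    }
}
```

Callers then only need the single null check on the context shown in the suggestion.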
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java:
##########
@@ -93,6 +99,26 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
private static final long READER_CLOSE_TIMEOUT = 30L;
+ public SnapshotSplitReader(
+ MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) {
+ final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
+ final BinaryLogClient binaryLogClient =
+ createBinaryClient(sourceConfig.getDbzConfiguration());
+ this.statefulTaskContext =
+ new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder()
+ .setNameFormat("debezium-reader-" + subtaskId)
+ .setUncaughtExceptionHandler(
+ (thread, throwable) -> setReadException(throwable))
+ .build();
+ this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+ this.hooks = hooks;
+ this.currentTaskRunning = false;
+ this.hasNextElement = new AtomicBoolean(false);
+ this.reachEnd = new AtomicBoolean(false);
Review Comment:
```suggestion
this(new StatefulTaskContext(...), subtaskId, hooks);
```