[
https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thorne updated FLINK-35674:
---------------------------
Description:
When a task starts multiple MySQL connector sources in timestamp startup mode
at the same time, the search for the binlog timestamp can block, so a source
may never obtain any data.
1. I have four tables (products, orders, orders_copy, shipments) to capture in
one task. I generated a lot of binlogs for these four tables.
2. I started the task in timestamp mode, and the products table did not get
any records:
!FBA32597-8783-4678-B391-E450148C1B30.png!
On another attempt in timestamp mode, the orders_copy table did not get any
records:
!BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png!
3. I debugged the code and found some problems:
{code:java}
// Class: org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils
private static String searchBinlogName(
        BinaryLogClient client, long targetMs, List<String> binlogFiles)
        throws IOException, InterruptedException {
    int startIdx = 0;
    int endIdx = binlogFiles.size() - 1;

    while (startIdx <= endIdx) {
        int mid = startIdx + (endIdx - startIdx) / 2;
        long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
        if (midTs < targetMs) {
            startIdx = mid + 1;
        } else if (targetMs < midTs) {
            endIdx = mid - 1;
        } else {
            return binlogFiles.get(mid);
        }
    }

    return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}

private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
        throws IOException, InterruptedException {
    ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
    BinaryLogClient.EventListener eventListener =
            event -> {
                EventData data = event.getData();
                if (data instanceof RotateEventData) {
                    // We skip RotateEventData because it does not contain the
                    // timestamp we are interested in.
                    return;
                }

                EventHeaderV4 header = event.getHeader();
                long timestamp = header.getTimestamp();
                if (timestamp > 0) {
                    binlogTimestamps.offer(timestamp);
                    try {
                        client.disconnect();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            };

    try {
        client.registerEventListener(eventListener);
        client.setBinlogFilename(binlogFile);
        client.setBinlogPosition(0);

        LOG.info("begin parse binlog: {}", binlogFile);
        client.connect();
    } finally {
        client.unregisterEventListener(eventListener);
    }
    return binlogTimestamps.take();
}
{code}
4. the
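
The description stops before a cause is named, so the following is only a sketch of two mechanisms visible in the quoted code, not the project's fix. It reproduces the binary search from searchBinlogName with the timestamp lookup passed in as a parameter, and it shows that an unbounded binlogTimestamps.take() can be replaced by a bounded poll, which returns instead of waiting forever when no timestamp is ever offered. The class name BinlogSearchSketch, the helper fakeFirstTimestamp, and the method awaitTimestamp are hypothetical names introduced for illustration; whether a missing timestamp is what blocks the task here is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;

final class BinlogSearchSketch {

    /** Same binary search as searchBinlogName, with the timestamp lookup injected. */
    static String searchBinlogName(
            long targetMs, List<String> binlogFiles, ToLongFunction<String> firstTimestampOf) {
        int startIdx = 0;
        int endIdx = binlogFiles.size() - 1;
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            long midTs = firstTimestampOf.applyAsLong(binlogFiles.get(mid));
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return binlogFiles.get(mid);
            }
        }
        // Last file whose first timestamp is below the target, or the first file.
        return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
    }

    /** Hypothetical stand-in for getBinlogTimestamp: "mysql-bin.000003" maps to 3000. */
    static long fakeFirstTimestamp(String binlogFile) {
        String suffix = binlogFile.substring(binlogFile.lastIndexOf('.') + 1);
        return Long.parseLong(suffix) * 1000L;
    }

    /** Bounded alternative to take(): empty if nothing arrives within timeoutMs. */
    static OptionalLong awaitTimestamp(BlockingQueue<Long> timestamps, long timeoutMs) {
        try {
            Long ts = timestamps.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return ts == null ? OptionalLong.empty() : OptionalLong.of(ts);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that no timestamp was obtained.
            Thread.currentThread().interrupt();
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        List<String> files =
                Arrays.asList(
                        "mysql-bin.000001",
                        "mysql-bin.000002",
                        "mysql-bin.000003",
                        "mysql-bin.000004");
        // A target between the first timestamps of files 2 and 3 selects file 2.
        System.out.println(
                searchBinlogName(2500L, files, BinlogSearchSketch::fakeFirstTimestamp));

        // Nothing is ever offered to this queue, so take() would block forever;
        // the bounded poll gives up after 100 ms instead.
        BlockingQueue<Long> neverFilled = new ArrayBlockingQueue<>(1);
        System.out.println(awaitTimestamp(neverFilled, 100L).isPresent());
    }
}
```

With a bounded wait, the caller can log which binlog file produced no timestamp and fail or retry, rather than leaving the source silently stuck.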
was:
When a task starts multiple MySQL connector sources in timestamp startup mode
at the same time, the search for the binlog timestamp can block, so a source
may never obtain any data.
> MySQL connector may cause deadlock issues when searching for binlog timestamps
> ------------------------------------------------------------------------------
>
> Key: FLINK-35674
> URL: https://issues.apache.org/jira/browse/FLINK-35674
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Environment: flink-cdc-3.1.x
> Reporter: Thorne
> Priority: Blocker
> Fix For: cdc-3.2.0
>
> Attachments: A7AE0D63-365D-4572-B63D-96DF5F096BF9.png,
> BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png,
> FBA32597-8783-4678-B391-E450148C1B30.png
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)