[ https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thorne updated FLINK-35674:
---------------------------
Summary: MySQL connector causes blocking when searching for binlog file's timestamp (was: MySQL connector causes blocking when searching for binlog timestamps)
> MySQL connector causes blocking when searching for binlog file's timestamp
> -------------------------------------------------------------------------
>
> Key: FLINK-35674
> URL: https://issues.apache.org/jira/browse/FLINK-35674
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Environment: flink-cdc-3.1.x
> Reporter: Thorne
> Priority: Blocker
> Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Attachments: A7AE0D63-365D-4572-B63D-96DF5F096BF9.png,
> BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png,
> FBA32597-8783-4678-B391-E450148C1B30.png
>
>
> When a task is started with multiple MySQL connectors in timestamp startup
> mode at the same time, the search for the binlog file matching the start
> timestamp can block, which may leave the source permanently unable to read
> any data.
>
> 1. I capture four tables (products, orders, orders_copy, shipments) in one
> task. For these four tables I generated a large volume of binlog changes
> (around 10 million).
> 2. I started the task in timestamp mode, and the products table did not
> receive any records.
> !FBA32597-8783-4678-B391-E450148C1B30.png|width=550,height=264!
> 3. On another run in timestamp mode, the orders_copy table did not receive
> any records.
> !BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png|width=557,height=230!
> 4. I debugged the code and found the following:
> {code:java}
> // Class: org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils
> private static String searchBinlogName(
>         BinaryLogClient client, long targetMs, List<String> binlogFiles)
>         throws IOException, InterruptedException {
>     int startIdx = 0;
>     int endIdx = binlogFiles.size() - 1;
>     while (startIdx <= endIdx) {
>         int mid = startIdx + (endIdx - startIdx) / 2;
>         long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
>         if (midTs < targetMs) {
>             startIdx = mid + 1;
>         } else if (targetMs < midTs) {
>             endIdx = mid - 1;
>         } else {
>             return binlogFiles.get(mid);
>         }
>     }
>     return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
> }
>
> private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
>         throws IOException, InterruptedException {
>     ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
>     BinaryLogClient.EventListener eventListener =
>             event -> {
>                 EventData data = event.getData();
>                 if (data instanceof RotateEventData) {
>                     // We skip RotateEventData because it does not contain the timestamp we are
>                     // interested in.
>                     return;
>                 }
>                 EventHeaderV4 header = event.getHeader();
>                 long timestamp = header.getTimestamp();
>                 if (timestamp > 0) {
>                     binlogTimestamps.offer(timestamp);
>                     try {
>                         client.disconnect();
>                     } catch (IOException e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>             };
>     try {
>         client.registerEventListener(eventListener);
>         client.setBinlogFilename(binlogFile);
>         client.setBinlogPosition(0);
>         LOG.info("begin parse binlog: {}", binlogFile);
>         client.connect();
>     } finally {
>         client.unregisterEventListener(eventListener);
>     }
>     // take() has no timeout: it waits until the listener above has offered a timestamp.
>     return binlogTimestamps.take();
> }
> {code}
> 5. binlogTimestamps.take() blocks until the queue contains an element.
> 6. In the failing runs, no timestamp is ever offered to the binlogTimestamps
> queue, so take() blocks indefinitely and the source never receives any data.
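The binary search in searchBinlogName can be reproduced on its own to see which binlog file it selects for a given start timestamp. The sketch below is a minimal stand-in, assuming the first-event timestamp of each binlog file is already known: it reads those values from an array instead of calling getBinlogTimestamp, so no MySQL server or BinaryLogClient is involved. The class name, file names, and timestamps are hypothetical.

```java
import java.util.List;

// Hypothetical stand-in for the search logic of searchBinlogName.
// Assumption: each file's first-event timestamp is supplied up front, so a
// "probe" is an array lookup instead of a BinaryLogClient connect().
class BinlogSearchSketch {

    /**
     * Returns the last file whose first-event timestamp is <= targetMs,
     * or the first file if targetMs is earlier than every file.
     * files and firstEventTs are parallel and sorted in binlog order.
     */
    static String searchBinlogName(List<String> files, long[] firstEventTs, long targetMs) {
        int startIdx = 0;
        int endIdx = files.size() - 1;
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            // In the connector, this lookup is a full connect() to MySQL.
            long midTs = firstEventTs[mid];
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return files.get(mid);
            }
        }
        // endIdx now points at the last file that starts before targetMs.
        return endIdx < 0 ? files.get(0) : files.get(endIdx);
    }

    public static void main(String[] args) {
        // Hypothetical binlog files and the timestamps of their first events.
        List<String> files = List.of("mysql-bin.000001", "mysql-bin.000002", "mysql-bin.000003");
        long[] ts = {1_000L, 2_000L, 3_000L};
        System.out.println(searchBinlogName(files, ts, 2_500L)); // mysql-bin.000002
        System.out.println(searchBinlogName(files, ts, 500L));   // mysql-bin.000001
        System.out.println(searchBinlogName(files, ts, 9_000L)); // mysql-bin.000003
    }
}
```

In the real method every probe is a separate, blocking getBinlogTimestamp call, so a search over n files makes roughly log2(n) sequential connections, and a single probe that never yields a timestamp stalls the whole search.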
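Steps 5 and 6 come down to a property of java.util.concurrent.ArrayBlockingQueue: take() waits with no upper bound, so if the event listener never offers a timestamp, for whatever reason, the caller hangs forever. The sketch below contrasts that with poll(timeout, unit), which returns null once the timeout expires. Replacing take() with a bounded poll is shown only as one hypothetical way to turn a silent hang into a visible error; it is not necessarily the fix adopted for FLINK-35674, and the method name, message, and timeout value are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a bounded wait on the same kind of queue that
// getBinlogTimestamp uses. No MySQL connection is involved; an empty queue
// stands in for a listener that never fired.
class BoundedTimestampWaitSketch {

    /**
     * Illustrative alternative to `return binlogTimestamps.take();`.
     * Waits at most timeoutMs for a timestamp, then fails loudly instead of hanging.
     */
    static long awaitTimestamp(ArrayBlockingQueue<Long> queue, String binlogFile, long timeoutMs)
            throws InterruptedException {
        Long ts = queue.poll(timeoutMs, TimeUnit.MILLISECONDS); // null if the timeout expires
        if (ts == null) {
            throw new IllegalStateException(
                    "No event timestamp received from " + binlogFile + " within " + timeoutMs + " ms");
        }
        return ts;
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: the listener offered a timestamp, so poll returns it immediately.
        ArrayBlockingQueue<Long> filled = new ArrayBlockingQueue<>(1);
        filled.offer(1_700_000_000_000L);
        System.out.println(awaitTimestamp(filled, "mysql-bin.000001", 100));

        // Case 2: nothing is ever offered. take() would block here forever;
        // the bounded poll gives up after 100 ms and names the file.
        ArrayBlockingQueue<Long> empty = new ArrayBlockingQueue<>(1);
        try {
            awaitTimestamp(empty, "mysql-bin.000002", 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A timeout does not explain why no timestamp arrived, but it surfaces which binlog file the search was stuck on, which is the information missing when the source silently stops producing data.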
--
This message was sent by Atlassian Jira
(v8.20.10#820010)