[ 
https://issues.apache.org/jira/browse/NIFI-12812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846189#comment-17846189
 ] 

Joe Witt commented on NIFI-12812:
---------------------------------

User xiyang from Apache Slack states:
I use CaptureChangeMySQL 1.23.3 to consume binlogs. During long-running 
operation, consumption often stalls (we monitor this by comparing the binlog 
position of the primary database with the processor's binlog.position), and 
delayed consumption or a backlog results. Under a heavy load of updates and 
inserts, binlog.filename may fall 10 or more files behind. Is there any way to 
optimize this? Thanks!



Our analysis is that setting Events Per FlowFile too small causes a binlog 
consumption backlog, while setting it too large causes data to be delayed 
indefinitely, which is not acceptable in CDC. When Events Per FlowFile is set 
to 150, the data backlog can reach 2 minutes, which is a serious problem that 
we hope can be fixed.


After nearly two months of observation, we found that whenever binlog 
consumption is stuck, the running-thread indicator on the CaptureChangeMySQL 
processor stays at 1. In theory the indicator should disappear quickly, but 
whenever we find a binlog backlog it is always present, and at that point 
CaptureChangeMySQL consumes very slowly. I'm not sure what is wrong. When we 
stop the processor, or replace it with another processor, it runs normally for 
a long time again, which confuses me.


CPU:
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    2
Core(s) per socket:    8
Socket(s):             1
NUMA node(s):          1
Vendor ID:             AuthenticAMD
CPU family:            25
Model:                 1
Model name:            AMD EPYC 7T83 64-Core Processor
Stepping:              1
CPU MHz:               2545.218
BogoMIPS:              5090.43
Hypervisor vendor:     KVM
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              512K
L3 cache:              32768K
NUMA node0 CPU(s):     0-15

Memory: 32G
java.arg.2=-Xms1g
java.arg.3=-Xmx16g
Disk: 200G SSD * 2

------NiFi BinLog monitor alarm ------
NiFi data synchronization is abnormal!!
Corresponding error message type: 2
Corresponding error message: binlog Position backlog absolute value exceeds 
configuration value :(10000000), backlog more than 10 times, please check!
Processor Id: f14230db-ba78-106d-973f-e90bd03f0a28
Processor Name: Advertising link -cdc
mysql: mysql-bin.262489
Corresponding point in the processor: mysql-bin.262489
Backlog value: 99644822
Current error accumulation: 11
Notice time: 2024-04-24 00:18:00
This is yesterday's alarm. To explain: the processor and the database are on 
the same binlog file, and the backlog value is 99644822. The positions are 
compared every minute; each time the backlog exceeds 10000000 a counter is 
incremented, and once the counter exceeds 10 an alarm is sent. The counter 
here is at 11, meaning that for 11 minutes NiFi's consumption rate has not 
caught up with MySQL's production rate. But when we stop the processor and make 
a token change to Events Per FlowFile (from 5 to 10, or whatever), it runs 
normally for a while, until the next backlog occurs.
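
The alarm rule described above can be sketched roughly as follows. This is a 
hypothetical reconstruction, not the reporter's actual monitor: the class and 
method names are invented, and resetting the counter once the backlog drops 
below the threshold is an assumption the message does not confirm.

```java
/**
 * Hypothetical sketch of the once-a-minute backlog check described above.
 * Not the reporter's real monitoring code.
 */
final class BinlogBacklogAlarm {
    private final long backlogThreshold; // 10000000 in the alarm above
    private final int maxStrikes;        // 10 in the alarm above
    private int strikes = 0;

    BinlogBacklogAlarm(long backlogThreshold, int maxStrikes) {
        this.backlogThreshold = backlogThreshold;
        this.maxStrikes = maxStrikes;
    }

    /**
     * Meant to be called once per minute with positions taken from the same
     * binlog file. Returns true when the alarm should fire.
     */
    boolean check(long databasePosition, long processorPosition) {
        long backlog = Math.abs(databasePosition - processorPosition);
        if (backlog > backlogThreshold) {
            strikes++;
        } else {
            strikes = 0; // assumption: the counter resets once caught up
        }
        return strikes > maxStrikes;
    }

    int strikes() {
        return strikes;
    }

    public static void main(String[] args) {
        BinlogBacklogAlarm alarm = new BinlogBacklogAlarm(10_000_000L, 10);
        boolean fired = false;
        for (int minute = 1; minute <= 11; minute++) {
            // Positions chosen so the backlog equals the reported 99644822.
            fired = alarm.check(99_644_822L, 0L);
        }
        System.out.println("strikes=" + alarm.strikes() + " fired=" + fired);
    }
}
```

With these numbers the alarm stays silent for the first 10 checks and fires on 
the 11th, which matches the "Current error accumulation: 11" line in the alarm.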

 I initially suspected a thread-related problem, but I'm not sure at which 
step. When NiFi schedules a processor to run, the UI shows a thread indicator 
on it, so the processor should be running at that moment. But when the thread 
indicator in the UI stays at 1, where is the processor stuck? If we can answer 
that, we will find the root cause. (Forgive me if I don't know much about how 
NiFi works, or whether this has anything to do with Penalty Duration and Yield 
Duration.)






@Joe Witt
 The problem has recurred, and this time we captured the relevant logs in 
time. Let me describe the files. Files with restart-after in the name contain 
the logs taken after restarting the processor following the problem (after the 
restart, the task runs normally); files without restart-after contain the logs 
taken while the problem was occurring. The affected database instance is 
pxc-hzr66p3487fwri.polarx.rds.aliyuncs.com. It looks like a lock, but I am not 
sure what causes it. (The number at the end of each file name is in minutes; 
the capture runs once a minute.)

xiyang
  19 hours ago
public void onEvent(Event event) {
    RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
    try {
        while (!stopNow.get()) {
            if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, TimeUnit.MILLISECONDS)) {
                return;
            }
        }

        logger.info("Stopped while waiting to enqueue event");
    } catch (InterruptedException e) {
        logger.warn("Interrupted while adding event to the queue", e);
    }
}
It looks like the queue is blocked, so the method can't return and the main 
thread is stuck.

As long as offer doesn't return true, the return statement is never reached, so 
the thread keeps looping here. I'm still looking into what prevents the 
BlockingQueue<RawBinlogEvent> from accepting the offer.
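
To illustrate the behaviour of the loop quoted above, here is a minimal, 
self-contained sketch using only java.util.concurrent. It is not the NiFi 
code: OfferLoopDemo, offerWithRetry, and the maxAttempts bound are hypothetical 
and exist only so the demo terminates (the quoted loop has no such bound). It 
shows that a timed offer on a full bounded queue keeps returning false until a 
consumer removes an element.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class OfferLoopDemo {

    /**
     * Same shape as the quoted onEvent loop, plus a demo-only attempt limit.
     * Returns true if the event was enqueued.
     */
    static boolean offerWithRetry(BlockingQueue<String> queue, String event,
                                  AtomicBoolean stopNow, long timeoutMs, int maxAttempts) {
        try {
            for (int attempt = 0; attempt < maxAttempts && !stopNow.get(); attempt++) {
                if (queue.offer(event, timeoutMs, TimeUnit.MILLISECONDS)) {
                    return true;
                }
                // Queue still full after the timeout: loop and try again.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return false;
    }

    /** Returns {enqueued while the queue is full, enqueued after one poll}. */
    static boolean[] demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1); // capacity 1
        AtomicBoolean stopNow = new AtomicBoolean(false);
        queue.offer("event-1"); // fills the queue; nothing is consuming yet

        boolean whileFull = offerWithRetry(queue, "event-2", stopNow, 10, 3);
        queue.poll(); // a consumer finally takes one element
        boolean afterPoll = offerWithRetry(queue, "event-2", stopNow, 10, 3);
        return new boolean[] {whileFull, afterPoll};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("enqueued while full: " + result[0]
                + ", enqueued after poll: " + result[1]);
    }
}
```

Without the attempt limit, the first call would spin until stopNow is set or 
something drains the queue, which is consistent with the thread indicator 
staying at 1.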

try {
    outputEvents(currentSession, context, log);
} catch (Exception eventException) {
    getLogger().error("Exception during event processing at file={} pos={}",
            currentDataCaptureState.getBinlogFile(),
            currentDataCaptureState.getBinlogPosition(), eventException);
    try {
        // Perform some processor-level "rollback", then rollback the session
        binlogResourceInfo.setInTransaction(false);
        stop();
    } catch (Exception e) {
        // Not much we can recover from here
        log.error("Error stopping CDC client", e);
    } finally {
        queue.clear();
        currentSession.rollback();
    }
    context.yield();
}
So when I stop the processor and restart it, it works fine again because the 
queue has been cleared.
--------------------------
while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {}
Now we just need to figure out what prevents this while loop from draining the 
queue (the 'Drain the queue' step).
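
As a rough illustration of when a loop of this shape stops draining, the sketch 
below reproduces only its condition. DrainLoopDemo and drain are hypothetical 
names, and a BooleanSupplier stands in for isScheduled(); this says nothing 
about which condition actually occurs inside NiFi. The loop ends as soon as the 
scheduled flag is false or poll() returns null, so events stay in the queue 
whenever the loop is not running.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

final class DrainLoopDemo {

    /** Drains with the same condition as the quoted loop; returns the count. */
    static int drain(Queue<String> queue, BooleanSupplier isScheduled) {
        int drained = 0;
        String event;
        while (isScheduled.getAsBoolean() && (event = queue.poll()) != null) {
            drained++; // the real processor would handle the event here
        }
        return drained;
    }

    /** Returns {drained when unscheduled, left in queue, drained when scheduled, left in queue}. */
    static int[] demo() {
        Queue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");

        int drainedUnscheduled = drain(queue, () -> false); // flag false: nothing polled
        int leftAfterFirst = queue.size();
        int drainedScheduled = drain(queue, () -> true);    // runs until poll() returns null
        return new int[] {drainedUnscheduled, leftAfterFirst, drainedScheduled, queue.size()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```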

> CaptureChangeMySQL consumes binlog backlog
> ------------------------------------------
>
>                 Key: NIFI-12812
>                 URL: https://issues.apache.org/jira/browse/NIFI-12812
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: C2
>    Affects Versions: 1.23.2
>         Environment: java version "17.0.7" 2023-04-18 LTS
> Java(TM) SE Runtime Environment (build 17.0.7+8-LTS-224)
> Java HotSpot(TM) 64-Bit Server VM (build 17.0.7+8-LTS-224, mixed mode, 
> sharing)
> Linux 3.10.0-1160.92.1.el7.x86_64
>            Reporter: xiyang
>            Priority: Major
>
>  
> I used CaptureChangeMySQL 1.23.3 to consume binlogs. During long-running 
> operation, consumption often stalls (we monitor this by comparing the binlog 
> position of the primary database with binlog.position), and delayed 
> consumption or a backlog results. Under a heavy load of updates and inserts, 
> binlog.filename may fall 10 or more files behind. 
> We tried increasing Events Per FlowFile and setting Include Begin/Commit 
> Events to true, which might alleviate the backlog caused by a large number of 
> updates and inserts, but then data seemed to be sent intermittently (in 
> batches, as if held in memory or somewhere else); when we stop the processor, 
> it sends all the previously cached data. When Events Per FlowFile is set to 
> the default value, at an insertion rate of 3000+ rows per second, a backlog 
> occurs and catch-up is very slow; sometimes it has still not caught up when 
> the binlog is purged, which eventually leads to the binlog file not being 
> found. How can real-time binlog (CDC) output be achieved?
>  
> Best regards!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
