I'm not very familiar with this processor, but I think we should
probably set a size on the blocking queue so that it can't grow
indefinitely, and possibly make the size configurable as a property of
the processor.
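Concretely, I mean something like the following. This is a minimal sketch, assuming the listener simply hands each event to the queue. The class and method names are hypothetical, not the processor's actual code, and String stands in for the binlog event type. With a capacity set, put() blocks the producer thread while the queue is full, so a slow consumer throttles reading instead of growing the heap. offer() with a timeout is the non-blocking alternative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper for illustration only; String stands in for the binlog event type.
class BoundedEventQueue {
    private final BlockingQueue<String> queue;

    // In the processor, capacity would come from a new (hypothetical) property.
    BoundedEventQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Blocking hand-off: waits while the queue is full, so a slow consumer
    // throttles the producer thread instead of letting the heap grow.
    void put(String event) throws InterruptedException {
        queue.put(event);
    }

    // Timed hand-off: waits up to timeoutMs, then returns false if still full.
    boolean offer(String event, long timeoutMs) throws InterruptedException {
        return queue.offer(event, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Consumer side: returns the oldest event, or null if the queue is empty.
    String poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedEventQueue events = new BoundedEventQueue(3);
        for (int i = 0; i < 3; i++) {
            events.put("event-" + i);
        }
        boolean accepted = events.offer("event-3", 10); // queue is full
        System.out.println("size=" + events.size() + " accepted=" + accepted);
        events.poll(); // consumer frees one slot
        System.out.println("accepted after poll=" + events.offer("event-3", 10));
    }
}
```

Running main prints "size=3 accepted=false" and then "accepted after poll=true". One trade-off to check: the blocking variant stalls whichever thread the binlog client uses to deliver events, and I don't know how the client or the MySQL connection reacts to long stalls, so the timed variant might be the safer default.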

On Sat, Jul 13, 2019 at 2:05 PM Purushotham Pushpavanthar
<[email protected]> wrote:
>
> Hi,
>
> I've been trying to run CDC on a 3-node NiFi (version 1.9.2) cluster, similar
> to what is illustrated here:
> <https://community.hortonworks.com/articles/113941/change-data-capture-cdc-with-apache-nifi-version-1-1.html>
> However, I've been facing the issue below because of the scale. When I start
> the processor, JVM heap (12 GB) utilization reaches 100% on the primary node,
> which then crashes. The same thing happens on the other nodes when a new
> cluster coordinator/primary node is elected.
>
> I tried debugging this issue and drafted the details below for discussion.
>
> The processor initializes an unbounded *LinkedBlockingQueue* and registers
> it with the binlog client's listener.
> The BinaryLogClient reads the binlog files and adds the events to the queue.
> When triggered, the processor drains the queue, writes the events to
> FlowFiles, and transfers them to the SUCCESS relationship, all in a single
> thread. However, when database throughput is very high, the queue is flooded
> with events, the single-threaded processor fails to keep up, and the JVM heap
> on the primary node bloats.
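
The failure mode in the paragraph above can be reproduced without NiFi. Here is a minimal simulation, assuming the hand-off works as described: each round, the producer (standing in for BinaryLogClient) offers a fixed number of events to an unbounded LinkedBlockingQueue, and a single consumer (standing in for onTrigger) drains at most a fixed number. The class and method names are made up for illustration; no NiFi or binlog-client API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative simulation only; not the processor's real code.
class UnboundedBacklogDemo {
    // Runs `ticks` rounds: the producer adds `produced` events per round,
    // the single consumer removes at most `consumed` events per round.
    static int backlogAfter(int ticks, int produced, int consumed) {
        // No capacity argument: the bound is Integer.MAX_VALUE, i.e. effectively unbounded.
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> batch = new ArrayList<>();
        int seq = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < produced; i++) {
                queue.offer(seq++); // never refused, so the backlog just accumulates
            }
            batch.clear();
            queue.drainTo(batch, consumed); // consumer takes at most `consumed` events
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(backlogAfter(100, 1000, 800)); // 20000
    }
}
```

With 1000 events in and 800 out per round, the backlog grows by 200 per round (20000 after 100 rounds) and keeps growing for as long as the imbalance lasts, because offer() on an unbounded queue never refuses an event.
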
> Below are the causes I suspect most (feel free to correct me if I'm wrong;
> let's discuss this for a better understanding).
>
>    1. Because this queue is decoupled from NiFi connections, back-pressure
>    configuration has no control over the throughput of the BinaryLogClient.
>    2. I tried increasing the JVM memory settings from Xms3G and Xmx3G to
>    *Xms32G and Xmx32G*. Without CDC, our cluster had been running at
>    20%-70% heap utilization with Xmx3G for the past 4 months. I'm unable to
>    budget JVM usage; there should be a limit on how large a share of the
>    cluster's resources a single processor can take.
>    3. CaptureChangeMySQL being a single-threaded processor that runs on the
>    primary node adds to the above issue.
>    4. The processor doesn't batch events. It ends up creating too many
>    FlowFiles whose content is about the same size as their attributes,
>    which are held in memory.
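
On point 4, batching could look roughly like this. A minimal sketch in plain Java, assuming events are already serialized as strings: it calls drainTo() with a limit of maxBatchSize and joins each batch into one newline-delimited payload, standing in for the content of a single FlowFile. The class and method names are hypothetical, and no NiFi API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical batching helper for illustration only.
class EventBatcher {
    // Drains the queue in chunks of up to maxBatchSize events and joins each
    // chunk into one newline-delimited payload (one FlowFile per batch
    // instead of one FlowFile per event).
    static List<String> drainBatches(BlockingQueue<String> queue, int maxBatchSize) {
        List<String> payloads = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (queue.drainTo(batch, maxBatchSize) > 0) {
            payloads.add(String.join("\n", batch));
            batch.clear();
        }
        return payloads;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 2500; i++) {
            queue.add("{\"seq\":" + i + "}");
        }
        List<String> payloads = drainBatches(queue, 1000);
        System.out.println(payloads.size()); // 3 (1000 + 1000 + 500 events)
    }
}
```

Batches of 1000 turn 2500 events into 3 payloads instead of 2500 FlowFiles. In an actual onTrigger() the loop would probably need a per-invocation cap, since the binlog client keeps adding events while it runs.
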
>
> I'm starting this thread to discuss how to solve this issue.
> Please share your experience if you have faced a similar issue
> with the CaptureChangeMySQL processor or any other NiFi processor.
> What workaround did you use? How did you fix it?
> Is NiFi the right tool for the CDC use case?
> If so, should we have a separate cluster per CDC pipeline, depending on
> scale, since executing CaptureChangeMySQL on the primary node is a bottleneck?
>
> *Cluster specs:*
> 3-node cluster
>
>    - Model : c5.2xlarge
>    - vCPU : 8
>    - Memory (GiB) : 16
>    - Instance Storage (GiB) : 200 (EBS)
>    - Network Bandwidth (Gbps) : Up to 10
>    - EBS Bandwidth (Mbps) : Up to 3,500
>
> *NiFi configs:*
> *bootstrap.conf*
>
>    - java.arg.2=-Xms12G
>    - java.arg.3=-Xmx12G
>    - java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
>    - java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
>    - java.arg.7=-XX:ReservedCodeCacheSize=256m
>    - java.arg.8=-XX:CodeCacheMinimumFreeSpace=10m
>    - java.arg.9=-XX:+UseCodeCacheFlushing
>
>
> *nifi.properties* (I can share more configs if needed)
>
>    - nifi.queue.backpressure.count=100000
>    - nifi.queue.backpressure.size=1 GB
>
>
> Regards,
> Purushotham Pushpavanth
