This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8986f4b33a NIFI-6501 Refactor CaptureChangeMySQL with bounded queue of 1000
8986f4b33a is described below

commit 8986f4b33a86e9bd646b7c20da852610e9f7d5ea
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri Dec 16 13:46:15 2022 -0500

    NIFI-6501 Refactor CaptureChangeMySQL with bounded queue of 1000
    
    This closes #6791
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/cdc/mysql/event/BinlogEventListener.java      | 18 +++++++++---------
 .../nifi/cdc/mysql/processors/CaptureChangeMySQL.java  |  6 ++++--
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
index 343cced39e..c0f5800f67 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class BinlogEventListener implements BinaryLogClient.EventListener {
 
-    protected final AtomicBoolean stopNow = new AtomicBoolean(false);
+    private final AtomicBoolean stopNow = new AtomicBoolean(false);
     private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
 
     private final BlockingQueue<RawBinlogEvent> queue;
@@ -49,17 +49,17 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
 
     @Override
     public void onEvent(Event event) {
-        while (!stopNow.get()) {
-            RawBinlogEvent ep = new RawBinlogEvent(event, 
client.getBinlogFilename());
-            try {
+        RawBinlogEvent ep = new RawBinlogEvent(event, 
client.getBinlogFilename());
+        try {
+            while (!stopNow.get()) {
                 if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, 
TimeUnit.MILLISECONDS)) {
                     return;
-                } else {
-                    throw new RuntimeException("Unable to add event to the 
queue");
                 }
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Interrupted while adding event to 
the queue");
             }
+
+            throw new RuntimeException("Stopped while waiting to enqueue 
event");
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while adding event to the 
queue");
         }
     }
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 978b8bc8d7..71bea808e1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -114,6 +114,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -434,7 +435,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     private BinlogLifecycleListener lifecycleListener;
     private GtidSet gtidSet;
 
-    private final LinkedBlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>();
+    // Set queue capacity to avoid excessive memory consumption
+    private final BlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>(1000);
     private volatile String currentBinlogFile = null;
     private volatile long currentBinlogPosition = 4;
     private volatile String currentGtidSet = null;
@@ -1192,7 +1194,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
      * @param q      A queue used to communicate events between the listener 
and the NiFi processor thread.
      * @return A BinlogEventListener instance, which will be notified of 
events associated with the specified client
      */
-    BinlogEventListener createBinlogEventListener(BinaryLogClient client, 
LinkedBlockingQueue<RawBinlogEvent> q) {
+    BinlogEventListener createBinlogEventListener(BinaryLogClient client, 
BlockingQueue<RawBinlogEvent> q) {
         return new BinlogEventListener(client, q);
     }
 

Reply via email to