This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8986f4b33a NIFI-6501 Refactor CaptureChangeMySQL with bounded queue of 1000
8986f4b33a is described below
commit 8986f4b33a86e9bd646b7c20da852610e9f7d5ea
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri Dec 16 13:46:15 2022 -0500
NIFI-6501 Refactor CaptureChangeMySQL with bounded queue of 1000
This closes #6791
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/cdc/mysql/event/BinlogEventListener.java | 18 +++++++++---------
.../nifi/cdc/mysql/processors/CaptureChangeMySQL.java | 6 ++++--
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
index 343cced39e..c0f5800f67 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class BinlogEventListener implements BinaryLogClient.EventListener {
- protected final AtomicBoolean stopNow = new AtomicBoolean(false);
+ private final AtomicBoolean stopNow = new AtomicBoolean(false);
private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
private final BlockingQueue<RawBinlogEvent> queue;
@@ -49,17 +49,17 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
@Override
public void onEvent(Event event) {
- while (!stopNow.get()) {
- RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
- try {
+ RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
+ try {
+ while (!stopNow.get()) {
if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, TimeUnit.MILLISECONDS)) {
return;
- } else {
- throw new RuntimeException("Unable to add event to the queue");
}
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while adding event to the queue");
}
+
+ throw new RuntimeException("Stopped while waiting to enqueue event");
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while adding event to the queue");
}
}
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 978b8bc8d7..71bea808e1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -114,6 +114,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -434,7 +435,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private BinlogLifecycleListener lifecycleListener;
private GtidSet gtidSet;
- private final LinkedBlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>();
+ // Set queue capacity to avoid excessive memory consumption
+ private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000);
private volatile String currentBinlogFile = null;
private volatile long currentBinlogPosition = 4;
private volatile String currentGtidSet = null;
@@ -1192,7 +1194,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
* @param q A queue used to communicate events between the listener and the NiFi processor thread.
* @return A BinlogEventListener instance, which will be notified of events associated with the specified client
*/
- BinlogEventListener createBinlogEventListener(BinaryLogClient client, LinkedBlockingQueue<RawBinlogEvent> q) {
+ BinlogEventListener createBinlogEventListener(BinaryLogClient client, BlockingQueue<RawBinlogEvent> q) {
return new BinlogEventListener(client, q);
}