peterh-wob opened a new issue, #25030: URL: https://github.com/apache/pulsar/issues/25030
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

- Client library type: Java
- Client library version: 4.1.0
- Client Java version: 21

### Issue Description

The [JdbcAbstractSink.write](https://github.com/apache/pulsar/blob/master/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java#L180) method adds records to an unbounded `incomingList` without checking whether `batchSize` has been exceeded. `write` never blocks, so the [JavaInstanceRunnable](https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L351) runtime pushes messages into the sink as fast as it can consume them. If the database is slower than the consumption rate, `incomingList` grows without bound until the JVM runs out of memory.

Proposed fix: use a wait/notify mechanism to block `write` while `incomingList.size() >= batchSize`.

### Error messages

```text
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /pulsar/tmp/heapdump-%p.hprof
https://github.com/user-attachments/assets/636cea5c-3837-4189-a837-88019384d166
```

### Reproducing the issue

1. Set up a JDBC sink with a small `batchSize` (e.g., 5) and a very high `timeoutMs` (e.g., 100000). The high timeout stops time-based flushing from clearing the queue immediately.
2. Use a mock or slow database connection that cannot process records instantly. Alternatively, pause the flush thread in a debugger or test.
3. Produce messages rapidly to the input topic.

**Expected behavior**: `sink.write()` should block once `incomingList` reaches `batchSize` and wait for the flush thread to clear the batch.
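The sketch below shows one possible shape for the expected blocking behavior and the proposed wait/notify fix. It assumes a buffer that plays the role of `incomingList`. The names `BoundedBatchBuffer`, `write`, `drain` and `size` are hypothetical and chosen only for illustration. This is not the actual `JdbcAbstractSink` code or any Pulsar API.

How the pieces work:

- `write` blocks while a full batch is pending.
- `drain` stands in for the flush thread. It swaps out the pending batch and calls `notifyAll()` to wake any blocked writers.
- `main` simulates a slow database with a fixed sleep.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the sink's incomingList; not the actual JdbcAbstractSink code.
public class BoundedBatchBuffer<T> {
    private final int batchSize;
    private List<T> incoming = new ArrayList<>();

    public BoundedBatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Called for every record; blocks while a full batch is still pending.
    public synchronized void write(T record) throws InterruptedException {
        // Re-check in a loop: wait() can wake spuriously, and another writer
        // may refill the buffer before this thread reacquires the monitor.
        while (incoming.size() >= batchSize) {
            wait();
        }
        incoming.add(record);
    }

    // Called by the flush thread: swaps out the pending batch and wakes blocked writers.
    public synchronized List<T> drain() {
        List<T> batch = incoming;
        incoming = new ArrayList<>();
        notifyAll();
        return batch;
    }

    public synchronized int size() {
        return incoming.size();
    }

    public static void main(String[] args) throws Exception {
        BoundedBatchBuffer<Integer> buffer = new BoundedBatchBuffer<>(5);
        int total = 20;

        // The producer thread plays the role of the Functions runtime calling write().
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    buffer.write(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // The main thread plays a slow flush thread (a slow database).
        int flushed = 0;
        int largestBatch = 0;
        while (flushed < total) {
            Thread.sleep(20); // simulated database latency
            List<Integer> batch = buffer.drain();
            largestBatch = Math.max(largestBatch, batch.size());
            flushed += batch.size();
        }
        producer.join();
        System.out.println("flushed=" + flushed + ", largest batch=" + largestBatch);
    }
}
```

The issue describes the runtime calling `write` directly from its processing loop. Under that assumption, a blocking `write` stops the runtime from pulling further messages while the database catches up. In this sketch the buffer never holds more than `batchSize` records.

A production version would probably want a timed `wait(timeout)` instead of an unbounded `wait()`. That way a stalled database cannot block the instance forever.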
**Actual behavior**: The Pulsar function keeps consuming messages from the topic and calls `sink.write()` for each one. It does this regardless of `receiverQueueSize` and the JDBC sink's `batchSize`. Each call returns immediately, so the internal `incomingList` in `JdbcAbstractSink` grows without bound. If the database cannot keep pace, the result is an OOM error.

### Additional information

Proof of the leak:

<img width="1971" height="525" alt="Image" src="https://github.com/user-attachments/assets/d26af2c8-1339-4174-b93c-9933f46ef7d2" />

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
