peterh-wob opened a new issue, #25030:
URL: https://github.com/apache/pulsar/issues/25030

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   - Client library type: Java
   - Client library version: 4.1.0
   - Client Java version: 21
   
   ### Issue Description
   
   The [JdbcAbstractSink.write](https://github.com/apache/pulsar/blob/master/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java#L180) method adds every record to an unbounded `incomingList` and never blocks, even after the list has grown past `batchSize`.
   
   As a result, the [JavaInstanceRunnable](https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L351) runtime pushes messages into the sink as fast as it receives them, because `write` never blocks.
   
   If the database is slower than the consumption rate, `incomingList` grows without bound until the JVM runs out of memory.
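To make the growth concrete, here is a toy, single-threaded model of the behavior described above. It is not Pulsar code: the class `UnboundedBacklogModel` and the per-tick rates are hypothetical. Each tick, `write` appends records with no size check and the slow database absorbs only a fixed number, so whenever `incomingPerTick > flushedPerTick` the backlog grows linearly as `ticks * (incomingPerTick - flushedPerTick)`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the reported behavior; hypothetical rates, not JdbcAbstractSink.
class UnboundedBacklogModel {
    /**
     * Each tick, write() appends incomingPerTick records and returns immediately;
     * the slow database absorbs at most flushedPerTick of them. Returns the number
     * of records still held in memory after the given number of ticks.
     */
    static int backlogAfter(int ticks, int incomingPerTick, int flushedPerTick) {
        List<Integer> incomingList = new ArrayList<>();
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < incomingPerTick; i++) {
                incomingList.add(i); // no size check, as described in the issue
            }
            int n = Math.min(flushedPerTick, incomingList.size());
            incomingList.subList(0, n).clear(); // records committed to the database
        }
        return incomingList.size();
    }

    public static void main(String[] args) {
        // Consumption outpaces the database by 45 records per tick.
        System.out.println(backlogAfter(1_000, 50, 5)); // 45000
        System.out.println(backlogAfter(2_000, 50, 5)); // 90000
        // When the database keeps up, nothing accumulates.
        System.out.println(backlogAfter(1_000, 5, 50)); // 0
    }
}
```

Doubling the run time doubles the retained records, which is why a long-running sink eventually exhausts the heap.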
   
   Proposed fix: use a wait/notify mechanism to block `write` while `incomingList.size() >= batchSize`.
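A minimal sketch of the proposed wait/notify approach, assuming a simplified buffer rather than the real `JdbcAbstractSink` (the class `BlockingBatchBuffer` and its `takeBatch` method are hypothetical). `write` waits on the list's monitor while `incomingList.size() >= batchSize`, and the flush side calls `notifyAll()` after removing a batch:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed fix; not the actual JdbcAbstractSink code.
class BlockingBatchBuffer<T> {
    private final int batchSize;
    private final List<T> incomingList = new ArrayList<>();

    BlockingBatchBuffer(int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be > 0");
        this.batchSize = batchSize;
    }

    /** Blocks the caller while incomingList.size() >= batchSize. */
    void write(T record) throws InterruptedException {
        synchronized (incomingList) {
            while (incomingList.size() >= batchSize) {
                incomingList.wait(); // releases the monitor until a flush calls notifyAll()
            }
            incomingList.add(record);
        }
    }

    /** Flush side: removes and returns up to batchSize records, then wakes blocked writers. */
    List<T> takeBatch() {
        synchronized (incomingList) {
            int n = Math.min(batchSize, incomingList.size());
            List<T> batch = new ArrayList<>(incomingList.subList(0, n));
            incomingList.subList(0, n).clear();
            incomingList.notifyAll();
            return batch;
        }
    }

    int size() {
        synchronized (incomingList) {
            return incomingList.size();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingBatchBuffer<Integer> buffer = new BlockingBatchBuffer<>(5);
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < 6; i++) buffer.write(i); // the 6th write must block
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        // Wait until the writer is parked in wait() on the full batch.
        while (writer.getState() != Thread.State.WAITING) {
            if (writer.getState() == Thread.State.TERMINATED) {
                throw new IllegalStateException("write() did not block");
            }
            Thread.sleep(1);
        }
        System.out.println("pending before flush: " + buffer.size()); // 5
        List<Integer> batch = buffer.takeBatch(); // simulated flush
        writer.join();
        System.out.println("flushed " + batch.size() + ", pending after: " + buffer.size()); // 5, 1
    }
}
```

The `while` loop (rather than `if`) guards against spurious wakeups and against another writer refilling the list first. In a real sink, a blocked `write` would presumably also need to be released on close so the instance can shut down cleanly; this sketch does not handle that.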
   
   ### Error messages
   
   ```text
   java.lang.OutOfMemoryError: Java heap space
   Dumping heap to /pulsar/tmp/heapdump-%p.hprof
   ```

   https://github.com/user-attachments/assets/636cea5c-3837-4189-a837-88019384d166
   
   ### Reproducing the issue
   
   1. Set up a JDBC sink with a small `batchSize` (e.g., 5) and a very high `timeoutMs` (e.g., 100000) so that time-based flushing does not clear the queue immediately.
   2. Use a mock or slow database connection that cannot process records instantly (or simply pause the flush thread in a debugger or test).
   3. Produce messages rapidly to the input topic.
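The steps above can also be approximated in-process. The sketch below is a simplified, hypothetical model (`PausedFlushRepro` is not Pulsar code): `write` appends and schedules at most one asynchronous flush, and the flush blocks on a latch to stand in for a stalled database (step 2). Every `write` still returns immediately, so all produced records stay in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-process reproduction; a simplified model, not JdbcAbstractSink.
class PausedFlushRepro {
    private final int batchSize;
    private final List<Integer> incomingList = new ArrayList<>();
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean flushing = new AtomicBoolean(false);
    private final CountDownLatch databaseGate = new CountDownLatch(1); // "paused" database

    PausedFlushRepro(int batchSize) {
        this.batchSize = batchSize;
    }

    // Models the reported write(): append, maybe schedule a flush, return at once.
    void write(int record) {
        int size;
        synchronized (incomingList) {
            incomingList.add(record);
            size = incomingList.size();
        }
        if (size >= batchSize && flushing.compareAndSet(false, true)) {
            flushExecutor.execute(this::flush);
        }
    }

    private void flush() {
        try {
            databaseGate.await(); // the stalled database does not respond until released
            synchronized (incomingList) {
                int n = Math.min(batchSize, incomingList.size());
                incomingList.subList(0, n).clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            flushing.set(false);
        }
    }

    int pending() {
        synchronized (incomingList) {
            return incomingList.size();
        }
    }

    void close() {
        databaseGate.countDown();
        flushExecutor.shutdownNow();
    }

    // Produces records rapidly (step 3) and reports how many are still held in memory.
    static int reproduce(int batchSize, int produced) {
        PausedFlushRepro sink = new PausedFlushRepro(batchSize);
        try {
            for (int i = 0; i < produced; i++) sink.write(i);
            return sink.pending(); // evaluated before close() releases the database
        } finally {
            sink.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce(5, 100_000)); // 100000
    }
}
```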
   
   **Expected behavior**: `sink.write()` should block once `incomingList` reaches `batchSize`, waiting for the flush thread to clear the batch.
   
   **Actual behavior**:
   The Pulsar function keeps consuming messages from the topic, regardless of `receiverQueueSize` and the JDBC sink's `batchSize`, and calls `sink.write` for each one. `write` returns immediately for every received message, so the internal `incomingList` in `JdbcAbstractSink` grows without bound. If the database cannot keep pace, the result is an OOM error.
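For comparison, the expected backpressure can also be sketched with `java.util.concurrent.ArrayBlockingQueue` instead of wait/notify. This is a swapped-in technique, not what the issue proposes, and `QueueBackpressureSketch` is a hypothetical name. With capacity equal to `batchSize`, `put` blocks while a full batch is pending, and `drainTo` plays the role of the flush:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative using a bounded queue; not the actual JdbcAbstractSink code.
class QueueBackpressureSketch<T> {
    private final int batchSize;
    private final BlockingQueue<T> incoming;

    QueueBackpressureSketch(int batchSize) {
        this.batchSize = batchSize;
        this.incoming = new ArrayBlockingQueue<>(batchSize); // at most one batch pending
    }

    /** Sink-like write: blocks while the queue is full. */
    void write(T record) throws InterruptedException {
        incoming.put(record);
    }

    /** Probe variant: waits at most timeoutMs, returns false if still full. */
    boolean tryWrite(T record, long timeoutMs) throws InterruptedException {
        return incoming.offer(record, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Simulated flush: removes up to batchSize records, freeing space for writers. */
    List<T> flush() {
        List<T> batch = new ArrayList<>(batchSize);
        incoming.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBackpressureSketch<Integer> sink = new QueueBackpressureSketch<>(5);
        for (int i = 0; i < 5; i++) sink.write(i);
        System.out.println("accepted while full: " + sink.tryWrite(5, 100)); // false
        System.out.println("flushed: " + sink.flush().size()); // 5
        System.out.println("accepted after flush: " + sink.tryWrite(5, 100)); // true
    }
}
```

`tryWrite` uses `offer` with a timeout only so the demo can show the full-queue case without hanging; the sink-like path is `write`, which blocks on `put`.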
   
   
   ### Additional information
   
   Proof of the leak:
   
   <img width="1971" height="525" alt="Image" src="https://github.com/user-attachments/assets/d26af2c8-1339-4174-b93c-9933f46ef7d2" />
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

