QiuYucheng2003 opened a new issue, #25135:
URL: https://github.com/apache/pulsar/issues/25135

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   Broker version: Master
   Java version: Java8
   
   ### Issue Description
   
   The FileSource connector (`org.apache.pulsar.io.file.FileSource`) initializes its internal blocking queues (`workQueue`, `inProcess`, `recentlyProcessed`) with the default `new LinkedBlockingQueue<>()` constructor.
   
   Internal queues should have a bounded capacity to provide backpressure. This 
prevents memory exhaustion when the producer (file listing) creates tasks 
faster than the consumer (file processing) can handle them.
   
   The no-argument constructor sets the capacity to `Integer.MAX_VALUE`. If the `FileListingThread` scans files significantly faster than the `FileConsumerThread` can process them (e.g., large files or slow I/O), the `workQueue` grows without bound and eventually triggers a `java.lang.OutOfMemoryError`.
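A quick way to confirm the default capacity is to ask each queue for its `remainingCapacity()`. The standalone check below is illustrative (the class and method names are not part of Pulsar); the 10000 figure is the one used in the suggested fix further down.

```java
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DefaultCapacityCheck {
    // Same no-arg constructor FileSource uses for workQueue, inProcess and recentlyProcessed.
    static int defaultCapacity() {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>();
        return queue.remainingCapacity();
    }

    // Explicit capacity, as in the suggested fix.
    static int boundedCapacity(int capacity) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>(capacity);
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(boundedCapacity(10000));                 // 10000
    }
}
```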
   
   This is a classic "Unbounded Size Cache Queue" (UBSCQ) pattern. It lacks 
protection against traffic spikes or slow consumption rates.
   
   ### Error messages
   
   ```text
   java.lang.OutOfMemoryError: Java heap space
       at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
       at org.apache.pulsar.io.file.FileListingThread.run(FileListingThread.java:...)
   ```
   
   ### Reproducing the issue
   
   1. Configure a FileSource connector to monitor a directory.
   
   2. Populate the source directory with a very large number of files (high production rate).
   
   3. Make file processing slower than directory listing (for example, large files or slow I/O) so that a backlog builds up in the queue.
   
   4. Monitor JVM heap usage: the `workQueue` keeps growing until an OOM occurs.
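Steps 2 to 4 come down to a rate mismatch. The single-threaded sketch below is a deterministic model, not a FileSource reproduction: the class name and per-tick rates are hypothetical. Each tick a "lister" enqueues files into an unbounded `LinkedBlockingQueue` and a "consumer" drains a smaller number, so the backlog grows by the difference on every tick.

```java
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BacklogSimulation {
    // Returns the queue size after `ticks` rounds of listing and (slower) consuming.
    static int backlogAfter(int ticks, int listedPerTick, int consumedPerTick) {
        BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(); // unbounded, as in FileSource
        int fileId = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < listedPerTick; i++) {
                // Always succeeds: the capacity is Integer.MAX_VALUE.
                workQueue.offer(new File("file-" + fileId++));
            }
            int processed = 0;
            while (processed < consumedPerTick && workQueue.poll() != null) {
                processed++; // simulated processing of one file
            }
        }
        return workQueue.size();
    }

    public static void main(String[] args) {
        // Listing 10 files per tick while processing 1 leaves 9 extra files per tick.
        System.out.println(backlogAfter(1_000, 10, 1)); // 9000
    }
}
```

With real threads the timing is noisier, but the same arithmetic applies: the backlog grows at (listing rate minus processing rate) for as long as the mismatch lasts.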
   
   Relevant code (`pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java`):

   ```java
   // Lines 39-41: the queues are unbounded
   private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
   private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
   private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>();

   // Line 49: the pool has a fixed number of threads, but its task queue is unbounded
   executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2);
   ```
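On the thread pool: `Executors.newFixedThreadPool` bounds the number of threads, and its Javadoc describes those threads as operating off a shared unbounded queue. The check below inspects that task queue. The cast to `ThreadPoolExecutor` relies on the JDK's current implementation, and `numWorkers` is a hypothetical stand-in for `fileConfig.getNumWorkers()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedPoolQueueCheck {
    // Returns the remaining capacity of the task queue behind a fixed-size pool.
    static int fixedPoolQueueCapacity(int threads) {
        // Implementation detail: newFixedThreadPool currently returns a ThreadPoolExecutor
        // backed by a LinkedBlockingQueue created with the no-arg constructor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int numWorkers = 4; // hypothetical stand-in for fileConfig.getNumWorkers()
        System.out.println(fixedPoolQueueCapacity(numWorkers + 2) == Integer.MAX_VALUE); // true
    }
}
```

If a bounded task queue were ever needed, the explicit form `new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(capacity))` makes the limit visible at the call site.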
   
   ### Additional information
   
   Suggestion for fix: Replace the unbounded initialization with a configurable 
or fixed capacity to enable blocking behavior (backpressure) when the queue is 
full.
   
   ```java
   // Example fix
   private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(10000);
   ```
   
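   This makes the `FileListingThread` block when the consumer cannot keep up, preventing the OOM, provided it enqueues with `put()`. `offer()`, which appears in the stack trace above, returns `false` immediately when a bounded queue is full instead of waiting, so the listing thread would also need to switch to `put()` (or a timed `offer()`) to actually block.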
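A minimal sketch of the bounded approach, assuming the listing side enqueues with `put()`. `CAPACITY`, the 1 ms processing delay, and the class name are illustrative assumptions, not Pulsar code. A fast producer fills a bounded `LinkedBlockingQueue` while a slow consumer drains it; the sketch also checks what `offer()` does on a full queue.

```java
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedWorkQueueSketch {
    // Hypothetical capacity; a real fix might read it from the connector config instead.
    static final int CAPACITY = 100;

    // offer() on a full bounded queue returns false immediately instead of waiting.
    static boolean offerOnFullQueue() {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>(1);
        queue.offer(new File("a"));
        return queue.offer(new File("b"));
    }

    // A fast "lister" using put() against a slow consumer; returns the largest backlog seen.
    static int runWithBackpressure(int totalFiles) {
        BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(CAPACITY);
        AtomicInteger consumed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < totalFiles; i++) {
                    workQueue.take(); // waits while the queue is empty
                    Thread.sleep(1);  // simulated slow processing
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int maxBacklog = 0;
        try {
            for (int i = 0; i < totalFiles; i++) {
                workQueue.put(new File("file-" + i)); // blocks while CAPACITY files are pending
                maxBacklog = Math.max(maxBacklog, workQueue.size());
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        if (consumed.get() != totalFiles) {
            throw new IllegalStateException("not every file was consumed");
        }
        return maxBacklog;
    }

    public static void main(String[] args) {
        System.out.println(offerOnFullQueue());                   // false
        System.out.println(runWithBackpressure(300) <= CAPACITY); // true
    }
}
```

Because `size()` can never exceed the constructor's capacity, the backlog (and the memory it pins) stays bounded however fast the producer runs; the trade-off is that the producer thread spends time blocked instead of listing.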
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

