masterOcean opened a new issue, #181:
URL: https://github.com/apache/bifromq/issues/181

   
   ## ⚠️ *Heap memory leak because the ConcurrentLinkedQueue in Batcher is unbounded and keeps filling*
   
   
   ### **Describe the bug**
   
   <img width="2480" height="1257" alt="Image" src="https://github.com/user-attachments/assets/8589319a-4d88-46af-883f-e73270cfad96" />

   <img width="2514" height="1065" alt="Image" src="https://github.com/user-attachments/assets/33f7e58f-9e4d-4d54-93a2-3a13869d30b4" />

   <img width="1161" height="614" alt="Image" src="https://github.com/user-attachments/assets/4aa82f51-37e4-49e0-8102-a0de96e3bec5" />
   The batcher_call_sched metric looks normal, but the batcher_call_time metric is zero: dist requests are pushed into callTaskBuffers but are never polled in batchAndEmit(). I suspect the gRPC server did not respond for a long time, so pipelineDepth.getAndDecrement() in the whenComplete callback never executed, trigger() could never emit another batch, and the heap filled up with queued dist requests.
   ```java
   public final CompletableFuture<CallResultT> submit(BatcherKeyT batcherKey, CallT request) {
       if (avgLatencyNanos.estimate() < burstLatencyNanos) {
           ICallTask<CallT, CallResultT, BatcherKeyT> callTask = new CallTask<>(batcherKey, request);
           boolean offered = callTaskBuffers.offer(callTask);
           assert offered;
           trigger();
           return callTask.resultPromise();
       } else {
           dropCounter.increment();
           return CompletableFuture.failedFuture(new BackPressureException("Too high average latency"));
       }
   }

   private void trigger() {
       if (triggering.compareAndSet(false, true)) {
           try {
               if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                   batchAndEmit();
               }
           } catch (Throwable e) {
               log.error("Unexpected exception", e);
           } finally {
               triggering.set(false);
               if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                   trigger();
               }
           }
       }
   }

   private void batchAndEmit() {
       pipelineDepth.incrementAndGet();
       long buildStart = System.nanoTime();
       IBatchCall<CallT, CallResultT, BatcherKeyT> batchCall = batchPool.poll();
       assert batchCall != null;
       int batchSize = 0;
       LinkedList<ICallTask<CallT, CallResultT, BatcherKeyT>> batchedTasks = new LinkedList<>();
       ICallTask<CallT, CallResultT, BatcherKeyT> callTask;
       while (batchSize < maxBatchSize && (callTask = callTaskBuffers.poll()) != null) {
           batchCall.add(callTask);
           batchedTasks.add(callTask);
           batchSize++;
           queueingTimeSummary.record(System.nanoTime() - callTask.ts());
       }
       batchSizeSummary.record(batchSize);
       long execStart = System.nanoTime();
       batchBuildTimeSummary.record((execStart - buildStart));
       final int finalBatchSize = batchSize;
       batchCall.execute()
           .whenComplete((v, e) -> {
               long execEnd = System.nanoTime();
               if (e != null) {
                   log.error("Unexpected exception during handling batchcall result", e);
                   // reset max batch size
                   maxBatchSize = 1;
               } else {
                   long thisLatency = execEnd - execStart;
                   if (thisLatency > 0) {
                       updateMaxBatchSize(finalBatchSize, thisLatency);
                   }
                   batchExecTimer.record(thisLatency, TimeUnit.NANOSECONDS);
               }
               batchedTasks.forEach(t -> {
                   long callLatency = execEnd - t.ts();
                   avgLatencyNanos.observe(callLatency);
                   batchCallTimer.record(callLatency, TimeUnit.NANOSECONDS);
               });
               batchCall.reset();
               batchPool.offer(batchCall);
               pipelineDepth.getAndDecrement();
               if (!callTaskBuffers.isEmpty()) {
                   trigger();
               }
           });
   }
   ```
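
   The stall mechanism can be reproduced in isolation. The following is a minimal sketch (class and field names are hypothetical, not from BifroMQ) of the interaction described above: `offer()` on an unbounded `ConcurrentLinkedQueue` always succeeds, so if the in-flight batch never completes and `pipelineDepth` is never decremented, every subsequent submission accumulates on the heap.

   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch of the failure mode: when the in-flight batch never
   // completes, pipelineDepth is never decremented, trigger() never emits
   // another batch, and the unbounded queue keeps accepting tasks.
   public class StalledBatcherSketch {
       static final int MAX_PIPELINE_DEPTH = 1;
       static final ConcurrentLinkedQueue<String> callTaskBuffers = new ConcurrentLinkedQueue<>();
       static final AtomicInteger pipelineDepth = new AtomicInteger();

       static void submit(String task) {
           callTaskBuffers.offer(task); // offer() on an unbounded queue always succeeds
           trigger();
       }

       static void trigger() {
           // Emit only while the pipeline has capacity; a stalled batch blocks this forever.
           if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < MAX_PIPELINE_DEPTH) {
               pipelineDepth.incrementAndGet();
               callTaskBuffers.poll();
               // In the real Batcher, pipelineDepth.getAndDecrement() runs in the
               // batch's whenComplete callback. Simulate a gRPC call that never
               // responds by simply never decrementing here.
           }
       }

       public static void main(String[] args) {
           for (int i = 0; i < 10_000; i++) {
               submit("dist-req-" + i);
           }
           // Only the first task was drained; everything else accumulates on the heap.
           System.out.println("pipelineDepth=" + pipelineDepth.get()
               + " queued=" + callTaskBuffers.size());
           // prints: pipelineDepth=1 queued=9999
       }
   }
   ```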
   
   
   #### **Environment**
   
   - Version: [3.21]
   - JVM Version: [OpenJDK17]
   - Hardware Spec: [32c64g]
   - OS: [CentOS 7]
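
   One possible direction, sketched below under the assumption of a configurable cap (`MAX_QUEUED` is a hypothetical name, not an existing BifroMQ setting): have submit() count queued tasks and fail fast once the buffer is full, so back-pressure applies even when average latency has not yet risen.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical mitigation sketch: bound the number of queued call tasks so
   // submit() rejects work instead of letting the heap fill up while the
   // pipeline is stalled.
   public class BoundedSubmitSketch {
       static final int MAX_QUEUED = 1000; // hypothetical cap, not an existing config
       static final ConcurrentLinkedQueue<String> callTaskBuffers = new ConcurrentLinkedQueue<>();
       static final AtomicInteger queued = new AtomicInteger();

       static CompletableFuture<Void> submit(String request) {
           // Reject early when the buffer is full instead of offering unconditionally.
           if (queued.incrementAndGet() > MAX_QUEUED) {
               queued.decrementAndGet();
               return CompletableFuture.failedFuture(
                   new IllegalStateException("call task buffer full"));
           }
           callTaskBuffers.offer(request);
           return CompletableFuture.completedFuture(null);
       }

       public static void main(String[] args) {
           int rejected = 0;
           for (int i = 0; i < 1500; i++) {
               if (submit("req-" + i).isCompletedExceptionally()) {
                   rejected++;
               }
           }
           System.out.println("accepted=" + callTaskBuffers.size() + " rejected=" + rejected);
           // prints: accepted=1000 rejected=500
       }
   }
   ```

   The counter-based check avoids calling `ConcurrentLinkedQueue.size()`, which is an O(n) traversal and not a constant-time operation.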
   
   

