masterOcean opened a new issue, #181: URL: https://github.com/apache/bifromq/issues/181
## ⚠️ *Heap memory leak because the ConcurrentLinkedQueue in Batcher is unbounded and keeps filling*

### **Describe the bug**

<img width="2480" height="1257" alt="Image" src="https://github.com/user-attachments/assets/8589319a-4d88-46af-883f-e73270cfad96" />
<img width="2514" height="1065" alt="Image" src="https://github.com/user-attachments/assets/33f7e58f-9e4d-4d54-93a2-3a13869d30b4" />
<img width="1161" height="614" alt="Image" src="https://github.com/user-attachments/assets/4aa82f51-37e4-49e0-8102-a0de96e3bec5" />

The `batcher_call_sched` metric is normal, but the `batcher_call_time` metric is zero: dist requests are pushed into `callTaskBuffers` but never polled in `batchAndEmit()`. I suspect the gRPC server did not respond for a long time, so `pipelineDepth.getAndDecrement()` was never executed, `trigger()` kept skipping `batchAndEmit()` because `pipelineDepth` stayed at `maxPipelineDepth`, and the heap was filled by the queued dist requests.

```java
public final CompletableFuture<CallResultT> submit(BatcherKeyT batcherKey, CallT request) {
    if (avgLatencyNanos.estimate() < burstLatencyNanos) {
        ICallTask<CallT, CallResultT, BatcherKeyT> callTask = new CallTask<>(batcherKey, request);
        boolean offered = callTaskBuffers.offer(callTask);
        assert offered;
        trigger();
        return callTask.resultPromise();
    } else {
        dropCounter.increment();
        return CompletableFuture.failedFuture(new BackPressureException("Too high average latency"));
    }
}

private void trigger() {
    if (triggering.compareAndSet(false, true)) {
        try {
            if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                batchAndEmit();
            }
        } catch (Throwable e) {
            log.error("Unexpected exception", e);
        } finally {
            triggering.set(false);
            if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                trigger();
            }
        }
    }
}

private void batchAndEmit() {
    pipelineDepth.incrementAndGet();
    long buildStart = System.nanoTime();
    IBatchCall<CallT, CallResultT, BatcherKeyT> batchCall = batchPool.poll();
    assert batchCall != null;
    int batchSize = 0;
    LinkedList<ICallTask<CallT, CallResultT, BatcherKeyT>> batchedTasks = new LinkedList<>();
    ICallTask<CallT, CallResultT, BatcherKeyT> callTask;
    while (batchSize < maxBatchSize && (callTask = callTaskBuffers.poll()) != null) {
        batchCall.add(callTask);
        batchedTasks.add(callTask);
        batchSize++;
        queueingTimeSummary.record(System.nanoTime() - callTask.ts());
    }
    batchSizeSummary.record(batchSize);
    long execStart = System.nanoTime();
    batchBuildTimeSummary.record((execStart - buildStart));
    final int finalBatchSize = batchSize;
    batchCall.execute()
        .whenComplete((v, e) -> {
            long execEnd = System.nanoTime();
            if (e != null) {
                log.error("Unexpected exception during handling batchcall result", e);
                // reset max batch size
                maxBatchSize = 1;
            } else {
                long thisLatency = execEnd - execStart;
                if (thisLatency > 0) {
                    updateMaxBatchSize(finalBatchSize, thisLatency);
                }
                batchExecTimer.record(thisLatency, TimeUnit.NANOSECONDS);
            }
            batchedTasks.forEach(t -> {
                long callLatency = execEnd - t.ts();
                avgLatencyNanos.observe(callLatency);
                batchCallTimer.record(callLatency, TimeUnit.NANOSECONDS);
            });
            batchCall.reset();
            batchPool.offer(batchCall);
            pipelineDepth.getAndDecrement();
            if (!callTaskBuffers.isEmpty()) {
                trigger();
            }
        });
}
```

#### **Environment**
- Version: [3.21]
- JVM Version: [OpenJDK 17]
- Hardware Spec: [32c64g]
- OS: [CentOS 7]

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
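Since `ConcurrentLinkedQueue` has no capacity limit, one possible mitigation is an explicit backlog bound in `submit()` so the queue rejects new tasks once the pipeline stalls, instead of growing until the heap is exhausted. The sketch below is a minimal, self-contained illustration of that idea; `queuedTasks`, `MAX_QUEUED`, and the `String` task type are hypothetical stand-ins, not BifroMQ API, and the draining side would have to call `queuedTasks.decrementAndGet()` for each task it polls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a capacity guard for an unbounded ConcurrentLinkedQueue:
// reserve a slot in an AtomicInteger before offering, and roll the
// reservation back if the backlog bound is exceeded.
public class BoundedSubmitSketch {
    static final int MAX_QUEUED = 2; // tiny bound, just for demonstration

    final ConcurrentLinkedQueue<String> callTaskBuffers = new ConcurrentLinkedQueue<>();
    final AtomicInteger queuedTasks = new AtomicInteger();

    CompletableFuture<String> submit(String request) {
        if (queuedTasks.incrementAndGet() > MAX_QUEUED) {
            // Over the bound: undo the reservation and fail fast,
            // mirroring the existing BackPressureException path.
            queuedTasks.decrementAndGet();
            return CompletableFuture.failedFuture(
                new IllegalStateException("Batcher queue is full"));
        }
        callTaskBuffers.offer(request);
        return new CompletableFuture<>(); // would be completed later by the batch pipeline
    }

    public static void main(String[] args) {
        BoundedSubmitSketch b = new BoundedSubmitSketch();
        System.out.println(b.submit("req1").isCompletedExceptionally()); // accepted
        System.out.println(b.submit("req2").isCompletedExceptionally()); // accepted
        System.out.println(b.submit("req3").isCompletedExceptionally()); // rejected: over bound
    }
}
```

This only caps memory growth; it does not fix the root cause (the missing gRPC response that leaves `pipelineDepth` stuck), so a deadline/timeout on `batchCall.execute()` would likely be needed as well.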
