SteNicholas commented on code in PR #2815:
URL: https://github.com/apache/celeborn/pull/2815#discussion_r1805950379


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -132,4 +109,66 @@ public void close() {
     stopFlag = true;
     requests.clear();
   }
+
+  private class DispatcherRunnable implements Runnable {
+
+    public DispatcherRunnable() {}
+
+    @Override
+    public void run() {
+      while (!stopFlag) {
+        try {
+          ReadBufferRequest request;
+          request = requests.poll(1000, TimeUnit.MILLISECONDS);
+          List<ByteBuf> buffers = new ArrayList<>();
+          try {
+            if (request != null) {
+              processBufferRequest(request, buffers);
+            } else {
+              // Free buffer pool memory to main direct memory when dispatcher is idle.
+              readBufferAllocator.trimCurrentThreadCache();
+            }
+          } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+            try {
+              // recycle all allocated buffers
+              for (ByteBuf buffer : buffers) {
+                recycle(buffer);
+              }
+            } catch (Throwable e1) {
+              logger.error("Recycle read buffer failed.", e1);
+            }
+            request.getBufferListener().notifyBuffers(null, e);
+          }
+        } catch (Throwable e) {
+          logger.error("Read buffer dispatcher encountered error: {}", e.getMessage(), e);
+        }
+      }
+    }
+
+    void processBufferRequest(ReadBufferRequest request, List<ByteBuf> buffers) {
+      long start = System.nanoTime();
+      int bufferSize = request.getBufferSize();
+      while (buffers.size() < request.getNumber()) {
+        if (memoryManager.readBufferAvailable(bufferSize)) {
+          ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
+          buffers.add(buf);
+          memoryManager.changeReadBufferCounter(bufferSize);
+          allocatedReadBuffers.increment();
+        } else {
+          try {
+            // If dispatcher can not allocate requested buffers, it will wait here until
+            // necessary buffers are get.
+            Thread.sleep(readBufferAllocationWait);
+          } catch (InterruptedException e) {
+            logger.info("Buffer dispatcher is closing");

Review Comment:
   ```suggestion
               logger.warn("Buffer dispatcher is closing");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to