This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b625cea96 Improve performance for ReadableInputStreamFrameChannel 
(#13373)
5b625cea96 is described below

commit 5b625cea96450f284ad273b5619fb1e2eeaea42d
Author: Rohan Garg <[email protected]>
AuthorDate: Fri Nov 18 18:26:08 2022 +0530

    Improve performance for ReadableInputStreamFrameChannel (#13373)
    
    * Improve performance for ReadableInputStreamFrameChannel
    
    * Fix race condition leading to unnecessary sleep
---
 .../frame/channel/ReadableInputStreamFrameChannel.java | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
index d804e9d1cb..f06302b492 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
@@ -58,6 +58,8 @@ public class ReadableInputStreamFrameChannel implements 
ReadableFrameChannel
 
   private volatile boolean keepReading = true;
 
+  private final Object readMonitor = new Object();
+
   private final ExecutorService executorService;
 
   /**
@@ -152,7 +154,11 @@ public class ReadableInputStreamFrameChannel implements 
ReadableFrameChannel
       while (true) {
         if (!keepReading) {
           try {
-            Thread.sleep(nextRetrySleepMillis(nTry));
+            synchronized (readMonitor) {
+              if (!keepReading) {
+                readMonitor.wait(nextRetrySleepMillis(nTry));
+              }
+            }
             synchronized (lock) {
               if (inputStreamFinished || inputStreamError || 
delegate.isErrorOrFinished()) {
                 return;
@@ -186,7 +192,15 @@ public class ReadableInputStreamFrameChannel implements 
ReadableFrameChannel
                 totalInputStreamBytesRead += bytesRead;
                 if (backpressureFuture != null) {
                   keepReading = false;
-                  backpressureFuture.addListener(() -> keepReading = true, 
Execs.directExecutor());
+                  backpressureFuture.addListener(
+                      () -> {
+                        synchronized (readMonitor) {
+                          keepReading = true;
+                          readMonitor.notify();
+                        }
+                      },
+                      Execs.directExecutor()
+                  );
                 } else {
                   keepReading = true;
                   // continue adding data to delegate


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to