This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5b625cea96 Improve performance for ReadableInputStreamFrameChannel (#13373)
5b625cea96 is described below
commit 5b625cea96450f284ad273b5619fb1e2eeaea42d
Author: Rohan Garg <[email protected]>
AuthorDate: Fri Nov 18 18:26:08 2022 +0530
Improve performance for ReadableInputStreamFrameChannel (#13373)
* Improve performance for ReadableInputStreamFrameChannel
* Fix race condition leading to unnecessary sleep
---
.../frame/channel/ReadableInputStreamFrameChannel.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
index d804e9d1cb..f06302b492 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
@@ -58,6 +58,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
private volatile boolean keepReading = true;
+ private final Object readMonitor = new Object();
+
private final ExecutorService executorService;
/**
@@ -152,7 +154,11 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
while (true) {
if (!keepReading) {
try {
- Thread.sleep(nextRetrySleepMillis(nTry));
+ synchronized (readMonitor) {
+ if (!keepReading) {
+ readMonitor.wait(nextRetrySleepMillis(nTry));
+ }
+ }
synchronized (lock) {
if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) {
return;
@@ -186,7 +192,15 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
totalInputStreamBytesRead += bytesRead;
if (backpressureFuture != null) {
keepReading = false;
- backpressureFuture.addListener(() -> keepReading = true, Execs.directExecutor());
+ backpressureFuture.addListener(
+ () -> {
+ synchronized (readMonitor) {
+ keepReading = true;
+ readMonitor.notify();
+ }
+ },
+ Execs.directExecutor()
+ );
} else {
keepReading = true;
// continue adding data to delegate
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]