This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8dce3ca4d5 OOM fix for running MSQ jobs with
`intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
8dce3ca4d5 is described below
commit 8dce3ca4d594ddc498c1372d4d6cef3c6c3ed056
Author: Karan Kumar <[email protected]>
AuthorDate: Wed Mar 29 18:00:00 2023 +0530
OOM fix for running MSQ jobs with
`intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
While using intermediateSuperSorterStorageMaxLocalBytes the super sorter
was retaining references of the memory allocator.
The fix clears the current outputChannel when close() is called on the
ComposingWritableFrameChannel.java
---
.../channel/ComposingWritableFrameChannel.java | 18 ++++++++++++------
.../channel/ComposingWritableFrameChannelTest.java | 21 +++++++++++++++++++++
2 files changed, 33 insertions(+), 6 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
index c576c23696..ed0f56c408 100644
---
a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
+++
b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
@@ -90,12 +90,7 @@ public class ComposingWritableFrameChannel implements
WritableFrameChannel
// We are converting the corresponding channel to read only after
exhausting it because that channel won't be used
// for writes anymore
- if (outputChannelSuppliers != null) {
- outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
- }
- if (partitionedOutputChannelSuppliers != null) {
-
partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
- }
+ convertChannelSuppliersToReadOnly(currentIndex);
currentIndex++;
if (currentIndex >= writableChannelSuppliers.size()) {
@@ -105,6 +100,16 @@ public class ComposingWritableFrameChannel implements
WritableFrameChannel
}
}
+ private void convertChannelSuppliersToReadOnly(int index)
+ {
+ if (outputChannelSuppliers != null) {
+ outputChannelSuppliers.get(index).get().convertToReadOnly();
+ }
+ if (partitionedOutputChannelSuppliers != null) {
+ partitionedOutputChannelSuppliers.get(index).get().convertToReadOnly();
+ }
+ }
+
@Override
public void fail(@Nullable Throwable cause) throws IOException
{
@@ -118,6 +123,7 @@ public class ComposingWritableFrameChannel implements
WritableFrameChannel
{
if (currentIndex < writableChannelSuppliers.size()) {
writableChannelSuppliers.get(currentIndex).get().close();
+ convertChannelSuppliersToReadOnly(currentIndex);
currentIndex = writableChannelSuppliers.size();
}
}
diff --git
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
index c2968b3579..eee8ce5e62 100644
---
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -27,8 +27,11 @@ import
org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.ResourceLimitExceededException;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mockito.Mockito;
import javax.annotation.Nullable;
@@ -89,8 +92,26 @@ public class ComposingWritableFrameChannelTest
Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2));
Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3));
+
// Test if the older channel has been converted to read only
Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
+ composingWritableFrameChannel.close();
+
+ Exception ise1 = Assert.assertThrows(IllegalStateException.class, () ->
outputChannel1.getFrameMemoryAllocator());
+ MatcherAssert.assertThat(
+ ise1,
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+ "Frame allocator is not available. The output channel might be
marked as read-only, hence memory allocator is not required."))
+ );
+
+
+ Exception ise2 = Assert.assertThrows(IllegalStateException.class, () ->
outputChannel2.getFrameMemoryAllocator());
+ MatcherAssert.assertThat(
+ ise2,
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+ "Frame allocator is not available. The output channel might be
marked as read-only, hence memory allocator is not required."))
+ );
+
}
static class LimitedWritableFrameChannel implements WritableFrameChannel
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]