This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b0b3a9b2c Add a readOnly() method for PartitionedOutputChannel 
(#13755)
5b0b3a9b2c is described below

commit 5b0b3a9b2c08fd988a1967d760eaf9b2b819b1b6
Author: Laksh Singla <[email protected]>
AuthorDate: Fri Mar 10 06:58:00 2023 +0530

    Add a readOnly() method for PartitionedOutputChannel (#13755)
    
    With SuperSorter using the PartitionedOutputChannels for sorting, it might 
OOM on inputs of reasonable size because the channel consists of both the 
writable frame channel and the frame allocator, both of which are not required 
once the output channel has been written to.
    This change adds a readOnly to the output channel which contains only the 
readable channel, due to which unnecessary memory references to the writable 
channel and the memory allocator are lost once the output channel has been 
written to, preventing the OOM.
---
 .../channel/ComposingWritableFrameChannel.java     |  54 ++++++--
 .../processor/ComposingOutputChannelFactory.java   |  16 ++-
 .../druid/frame/processor/OutputChannel.java       |  38 ++++--
 .../frame/processor/PartitionedOutputChannel.java  |  48 +++++--
 .../apache/druid/frame/processor/SuperSorter.java  |   6 +-
 .../channel/ComposingWritableFrameChannelTest.java | 144 +++++++++++++++++++++
 .../druid/frame/processor/OutputChannelTest.java   |   6 +-
 .../druid/frame/processor/OutputChannelsTest.java  |   6 +-
 8 files changed, 279 insertions(+), 39 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
index 7f2a61e437..c576c23696 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
@@ -22,6 +22,9 @@ package org.apache.druid.frame.channel;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.processor.OutputChannel;
+import org.apache.druid.frame.processor.PartitionedOutputChannel;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.ResourceLimitExceededException;
 
@@ -38,16 +41,29 @@ import java.util.function.Supplier;
  */
 public class ComposingWritableFrameChannel implements WritableFrameChannel
 {
-  private final List<Supplier<WritableFrameChannel>> channels;
+  @Nullable
+  private final List<Supplier<OutputChannel>> outputChannelSuppliers;
+
+  @Nullable
+  private final List<Supplier<PartitionedOutputChannel>> 
partitionedOutputChannelSuppliers;
+
+  private final List<Supplier<WritableFrameChannel>> writableChannelSuppliers;
   private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
   private int currentIndex;
 
   public ComposingWritableFrameChannel(
-      List<Supplier<WritableFrameChannel>> channels,
+      @Nullable List<Supplier<OutputChannel>> outputChannelSuppliers,
+      @Nullable List<Supplier<PartitionedOutputChannel>> 
partitionedOutputChannelSuppliers,
+      List<Supplier<WritableFrameChannel>> writableChannelSuppliers,
       Map<Integer, HashSet<Integer>> partitionToChannelMap
   )
   {
-    this.channels = Preconditions.checkNotNull(channels, "channels is null");
+    if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != 
null) {
+      throw new IAE("Atmost one of outputChannelSuppliers and 
partitionedOutputChannelSuppliers can be provided");
+    }
+    this.outputChannelSuppliers = outputChannelSuppliers;
+    this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers;
+    this.writableChannelSuppliers = 
Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers 
is null");
     this.partitionToChannelMap =
         Preconditions.checkNotNull(partitionToChannelMap, 
"partitionToChannelMap is null");
     this.currentIndex = 0;
@@ -56,12 +72,12 @@ public class ComposingWritableFrameChannel implements 
WritableFrameChannel
   @Override
   public void write(FrameWithPartition frameWithPartition) throws IOException
   {
-    if (currentIndex >= channels.size()) {
-      throw new ISE("No more channels available to write. Total available 
channels : " + channels.size());
+    if (currentIndex >= writableChannelSuppliers.size()) {
+      throw new ISE("No more channels available to write. Total available 
channels : " + writableChannelSuppliers.size());
     }
 
     try {
-      channels.get(currentIndex).get().write(frameWithPartition);
+      
writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition);
       partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k 
-> Sets.newHashSetWithExpectedSize(1))
                            .add(currentIndex);
     }
@@ -70,9 +86,19 @@ public class ComposingWritableFrameChannel implements 
WritableFrameChannel
       // exception is automatically passed up to the user incase all the 
channels are exhausted. If in future, more
       // cases come up to dictate control flow, then we can switch to 
returning a custom object from the channel's write
       // operation.
-      channels.get(currentIndex).get().close();
+      writableChannelSuppliers.get(currentIndex).get().close();
+
+      // We are converting the corresponding channel to read only after 
exhausting it because that channel won't be used
+      // for writes anymore
+      if (outputChannelSuppliers != null) {
+        outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
+      }
+      if (partitionedOutputChannelSuppliers != null) {
+        
partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
+      }
+
       currentIndex++;
-      if (currentIndex >= channels.size()) {
+      if (currentIndex >= writableChannelSuppliers.size()) {
         throw rlee;
       }
       write(frameWithPartition);
@@ -82,7 +108,7 @@ public class ComposingWritableFrameChannel implements 
WritableFrameChannel
   @Override
   public void fail(@Nullable Throwable cause) throws IOException
   {
-    for (Supplier<WritableFrameChannel> channel : channels) {
+    for (Supplier<WritableFrameChannel> channel : writableChannelSuppliers) {
       channel.get().fail(cause);
     }
   }
@@ -90,21 +116,21 @@ public class ComposingWritableFrameChannel implements 
WritableFrameChannel
   @Override
   public void close() throws IOException
   {
-    if (currentIndex < channels.size()) {
-      channels.get(currentIndex).get().close();
-      currentIndex = channels.size();
+    if (currentIndex < writableChannelSuppliers.size()) {
+      writableChannelSuppliers.get(currentIndex).get().close();
+      currentIndex = writableChannelSuppliers.size();
     }
   }
 
   @Override
   public boolean isClosed()
   {
-    return currentIndex == channels.size();
+    return currentIndex == writableChannelSuppliers.size();
   }
 
   @Override
   public ListenableFuture<?> writabilityFuture()
   {
-    return channels.get(currentIndex).get().writabilityFuture();
+    return 
writableChannelSuppliers.get(currentIndex).get().writabilityFuture();
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
 
b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
index 8ca9aa4f6f..cf94262ac3 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
@@ -60,6 +60,7 @@ public class ComposingOutputChannelFactory implements 
OutputChannelFactory
   {
     ImmutableList.Builder<Supplier<WritableFrameChannel>> 
writableFrameChannelSuppliersBuilder = ImmutableList.builder();
     ImmutableList.Builder<Supplier<ReadableFrameChannel>> 
readableFrameChannelSuppliersBuilder = ImmutableList.builder();
+    ImmutableList.Builder<Supplier<OutputChannel>> 
outputChannelSupplierBuilder = ImmutableList.builder();
     for (OutputChannelFactory channelFactory : channelFactories) {
       // open channel lazily
       Supplier<OutputChannel> channel =
@@ -71,14 +72,19 @@ public class ComposingOutputChannelFactory implements 
OutputChannelFactory
               throw new UncheckedIOException(e);
             }
           })::get;
+      outputChannelSupplierBuilder.add(channel);
       writableFrameChannelSuppliersBuilder.add(() -> 
channel.get().getWritableChannel());
-      readableFrameChannelSuppliersBuilder.add(() -> 
channel.get().getReadableChannelSupplier().get());
+      // We read the output channel once they have been written to, and 
therefore it is space efficient and safe to
+      // save their read only copies
+      readableFrameChannelSuppliersBuilder.add(() -> 
channel.get().readOnly().getReadableChannelSupplier().get());
     }
 
     // the map maintains a mapping of channels which have the data for a given 
partition.
     // it is useful to identify the readable channels to open in the 
composition while reading the partition data.
     Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
     ComposingWritableFrameChannel writableFrameChannel = new 
ComposingWritableFrameChannel(
+        outputChannelSupplierBuilder.build(),
+        null,
         writableFrameChannelSuppliersBuilder.build(),
         partitionToChannelMap
     );
@@ -103,6 +109,7 @@ public class ComposingOutputChannelFactory implements 
OutputChannelFactory
     ImmutableList.Builder<Supplier<WritableFrameChannel>> 
writableFrameChannelsBuilder = ImmutableList.builder();
     ImmutableList.Builder<Supplier<PartitionedReadableFrameChannel>> 
readableFrameChannelSuppliersBuilder =
         ImmutableList.builder();
+    ImmutableList.Builder<Supplier<PartitionedOutputChannel>> 
partitionedOutputChannelSupplierBuilder = ImmutableList.builder();
     for (OutputChannelFactory channelFactory : channelFactories) {
       Supplier<PartitionedOutputChannel> channel =
           Suppliers.memoize(() -> {
@@ -113,14 +120,19 @@ public class ComposingOutputChannelFactory implements 
OutputChannelFactory
               throw new UncheckedIOException(e);
             }
           })::get;
+      partitionedOutputChannelSupplierBuilder.add(channel);
       writableFrameChannelsBuilder.add(() -> 
channel.get().getWritableChannel());
-      readableFrameChannelSuppliersBuilder.add(() -> 
channel.get().getReadableChannelSupplier().get());
+      // We read the output channel once they have been written to, and 
therefore it is space efficient and safe to
+      // save their read only copies
+      readableFrameChannelSuppliersBuilder.add(() -> 
channel.get().readOnly().getReadableChannelSupplier().get());
     }
     // the map maintains a mapping of channels which have the data for a given 
partition.
     // it is useful to identify the readable channels to open in the 
composition while reading the partition data.
 
     Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
     ComposingWritableFrameChannel writableFrameChannel = new 
ComposingWritableFrameChannel(
+        null,
+        partitionedOutputChannelSupplierBuilder.build(),
         writableFrameChannelsBuilder.build(),
         partitionToChannelMap
     );
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java 
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
index ac0e0a5fac..e1377eddca 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
@@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.frame.allocation.MemoryAllocator;
 import org.apache.druid.frame.channel.FrameWithPartition;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -42,11 +43,16 @@ import java.util.function.Supplier;
  */
 public class OutputChannel
 {
+  @GuardedBy("this")
   @Nullable
-  private final WritableFrameChannel writableChannel;
+  private WritableFrameChannel writableChannel;
+
+  @GuardedBy("this")
   @Nullable
-  private final MemoryAllocator frameMemoryAllocator;
+  private MemoryAllocator frameMemoryAllocator;
+
   private final Supplier<ReadableFrameChannel> readableChannelSupplier;
+
   private final boolean readableChannelUsableWhileWriting;
   private final int partitionNumber;
 
@@ -157,12 +163,14 @@ public class OutputChannel
   }
 
   /**
-   * Returns the writable channel of this pair. The producer writes to this 
channel.
+   * Returns the writable channel of this pair. The producer writes to this 
channel. Throws ISE if the output channel is
+   * read only.
    */
-  public WritableFrameChannel getWritableChannel()
+  public synchronized WritableFrameChannel getWritableChannel()
   {
     if (writableChannel == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Writable channel is not available. The output channel 
might be marked as read-only,"
+                    + " hence no writes are allowed.");
     } else {
       return writableChannel;
     }
@@ -170,11 +178,13 @@ public class OutputChannel
 
   /**
    * Returns the memory allocator for the writable channel. The producer uses 
this to generate frames for the channel.
+   * Throws ISE if the output channel is read only.
    */
-  public MemoryAllocator getFrameMemoryAllocator()
+  public synchronized MemoryAllocator getFrameMemoryAllocator()
   {
     if (frameMemoryAllocator == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Frame allocator is not available. The output channel 
might be marked as read-only,"
+                    + " hence memory allocator is not required.");
     } else {
       return frameMemoryAllocator;
     }
@@ -197,7 +207,7 @@ public class OutputChannel
   /**
    * Whether {@link #getReadableChannel()} is ready to use.
    */
-  public boolean isReadableChannelReady()
+  public synchronized boolean isReadableChannelReady()
   {
     return readableChannelUsableWhileWriting || writableChannel == null || 
writableChannel.isClosed();
   }
@@ -212,7 +222,7 @@ public class OutputChannel
     return partitionNumber;
   }
 
-  public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, 
WritableFrameChannel> mapFn)
+  public synchronized OutputChannel mapWritableChannel(final 
Function<WritableFrameChannel, WritableFrameChannel> mapFn)
   {
     if (writableChannel == null) {
       return this;
@@ -235,4 +245,14 @@ public class OutputChannel
   {
     return OutputChannel.readOnly(readableChannelSupplier, partitionNumber);
   }
+
+  /**
+   * Removes the reference to the {@link #writableChannel} and {@link 
#frameMemoryAllocator} from the object, making
+   * it more efficient
+   */
+  public synchronized void convertToReadOnly()
+  {
+    this.writableChannel = null;
+    this.frameMemoryAllocator = null;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
index 3e455545b0..34ad2c2323 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
@@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.frame.allocation.MemoryAllocator;
 import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
@@ -37,10 +38,15 @@ import java.util.function.Supplier;
  */
 public class PartitionedOutputChannel
 {
+
+  @GuardedBy("this")
   @Nullable
-  private final WritableFrameChannel writableChannel;
+  private WritableFrameChannel writableChannel;
+
+  @GuardedBy("this")
   @Nullable
-  private final MemoryAllocator frameMemoryAllocator;
+  private MemoryAllocator frameMemoryAllocator;
+
   private final Supplier<PartitionedReadableFrameChannel> 
readableChannelSupplier;
 
   private PartitionedOutputChannel(
@@ -76,12 +82,14 @@ public class PartitionedOutputChannel
   }
 
   /**
-   * Returns the writable channel of this pair. The producer writes to this 
channel.
+   * Returns the writable channel of this pair. The producer writes to this 
channel. Throws ISE if the output channel is
+   * read only.
    */
-  public WritableFrameChannel getWritableChannel()
+  public synchronized WritableFrameChannel getWritableChannel()
   {
     if (writableChannel == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Writable channel is not available. The output channel 
might be marked as read-only,"
+                    + " hence no writes are allowed.");
     } else {
       return writableChannel;
     }
@@ -89,11 +97,13 @@ public class PartitionedOutputChannel
 
   /**
    * Returns the memory allocator for the writable channel. The producer uses 
this to generate frames for the channel.
+   * Throws ISE if the output channel is read only.
    */
-  public MemoryAllocator getFrameMemoryAllocator()
+  public synchronized MemoryAllocator getFrameMemoryAllocator()
   {
     if (frameMemoryAllocator == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Frame allocator is not available. The output channel 
might be marked as read-only,"
+                    + " hence memory allocator is not required.");
     } else {
       return frameMemoryAllocator;
     }
@@ -102,12 +112,12 @@ public class PartitionedOutputChannel
   /**
    * Returns the partitioned readable channel supplier of this pair. The 
consumer reads from this channel.
    */
-  public Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
+  public synchronized Supplier<PartitionedReadableFrameChannel> 
getReadableChannelSupplier()
   {
     return readableChannelSupplier;
   }
 
-  public PartitionedOutputChannel mapWritableChannel(final 
Function<WritableFrameChannel, WritableFrameChannel> mapFn)
+  public synchronized PartitionedOutputChannel mapWritableChannel(final 
Function<WritableFrameChannel, WritableFrameChannel> mapFn)
   {
     if (writableChannel == null) {
       return this;
@@ -119,4 +129,24 @@ public class PartitionedOutputChannel
       );
     }
   }
+
+
+  /**
+   * Returns a read-only version of this instance. Read-only versions have 
neither {@link #getWritableChannel()} nor
+   * {@link #getFrameMemoryAllocator()}, and therefore require substantially 
less memory.
+   */
+  public PartitionedOutputChannel readOnly()
+  {
+    return new PartitionedOutputChannel(null, null, readableChannelSupplier);
+  }
+
+  /**
+   * Removes the reference to the {@link #writableChannel} and {@link 
#frameMemoryAllocator} from the object, making
+   * it more efficient
+   */
+  public synchronized void convertToReadOnly()
+  {
+    this.writableChannel = null;
+    this.frameMemoryAllocator = null;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java 
b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
index 1be686be6f..440da49d7c 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
@@ -611,7 +611,11 @@ public class SuperSorter
         );
         writableChannel = partitionedOutputChannel.getWritableChannel();
         frameAllocatorFactory = new 
SingleMemoryAllocatorFactory(partitionedOutputChannel.getFrameMemoryAllocator());
-        levelAndRankToReadableChannelMap.put(levelAndRankKey, 
partitionedOutputChannel);
+
+        // We add the readOnly() channel even though we require the 
writableChannel and the frame allocator because
+        // the original partitionedOutputChannel would contain the reference 
to those, which would get cleaned up
+        // appropriately and not be held up in the class level map
+        levelAndRankToReadableChannelMap.put(levelAndRankKey, 
partitionedOutputChannel.readOnly());
       }
 
       final FrameChannelMerger worker =
diff --git 
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
 
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
new file mode 100644
index 0000000000..c2968b3579
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.processor.OutputChannel;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.ResourceLimitExceededException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Supplier;
+
+
+public class ComposingWritableFrameChannelTest
+{
+  @Test
+  public void testComposingWritableChannelSwitchesProperly() throws IOException
+  {
+
+    // This frame channel writes a single frame
+    WritableFrameChannel writableFrameChannel1 = new 
LimitedWritableFrameChannel(2);
+    WritableFrameChannel writableFrameChannel2 = new 
LimitedWritableFrameChannel(100);
+
+    Supplier<ReadableFrameChannel> readableFrameChannelSupplier1 = () -> null;
+    Supplier<ReadableFrameChannel> readableFrameChannelSupplier2 = () -> null;
+
+    OutputChannel outputChannel1 = OutputChannel.pair(
+        writableFrameChannel1,
+        ArenaMemoryAllocator.createOnHeap(1),
+        readableFrameChannelSupplier1,
+        1
+    );
+    OutputChannel outputChannel2 = OutputChannel.pair(
+        writableFrameChannel2,
+        ArenaMemoryAllocator.createOnHeap(1),
+        readableFrameChannelSupplier2,
+        2
+    );
+
+    Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
+
+    ComposingWritableFrameChannel composingWritableFrameChannel = new 
ComposingWritableFrameChannel(
+        ImmutableList.of(
+            () -> outputChannel1,
+            () -> outputChannel2
+        ),
+        null,
+        ImmutableList.of(
+            () -> writableFrameChannel1,
+            () -> writableFrameChannel2
+        ),
+        partitionToChannelMap
+    );
+
+    composingWritableFrameChannel.write(new 
FrameWithPartition(Mockito.mock(Frame.class), 1));
+    composingWritableFrameChannel.write(new 
FrameWithPartition(Mockito.mock(Frame.class), 2));
+    composingWritableFrameChannel.write(new 
FrameWithPartition(Mockito.mock(Frame.class), 3));
+
+    // Assert the location of the channels where the frames have been written 
to
+    Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1));
+    Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2));
+    Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3));
+
+    // Test if the older channel has been converted to read only
+    Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
+  }
+
+  static class LimitedWritableFrameChannel implements WritableFrameChannel
+  {
+    private final int maxFrames;
+    private int curFrame = 0;
+
+    public LimitedWritableFrameChannel(int maxFrames)
+    {
+      this.maxFrames = maxFrames;
+    }
+
+    @Override
+    public void write(FrameWithPartition frameWithPartition)
+    {
+      if (curFrame >= maxFrames) {
+        throw new ResourceLimitExceededException("Cannot write more frames to 
the channel");
+      }
+      ++curFrame;
+    }
+
+    @Override
+    public void write(Frame frame)
+    {
+    }
+
+    @Override
+    public void fail(@Nullable Throwable cause)
+    {
+
+    }
+
+    @Override
+    public void close()
+    {
+
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+      return false;
+    }
+
+    @Override
+    public ListenableFuture<?> writabilityFuture()
+    {
+      return null;
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
index ac0b03b286..6e5b904ab3 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
@@ -44,14 +44,16 @@ public class OutputChannelTest
     final IllegalStateException e1 = 
Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel);
     MatcherAssert.assertThat(
         e1,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable 
channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+        "Writable channel is not available. The output channel might be marked 
as read-only, hence no writes are allowed."))
     );
 
     // No writable channel: cannot call getFrameMemoryAllocator.
     final IllegalStateException e2 = 
Assert.assertThrows(IllegalStateException.class, 
channel::getFrameMemoryAllocator);
     MatcherAssert.assertThat(
         e2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable 
channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Frame allocator is not available. The output channel might be 
marked as read-only, hence memory allocator is not required."))
     );
 
     // Mapping the writable channel of a nil channel has no effect, because 
there is no writable channel.
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
index dbef92b047..a22c7e92bf 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
@@ -86,7 +86,8 @@ public class OutputChannelsTest
 
     MatcherAssert.assertThat(
         e,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable 
channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Writable channel is not available. The output channel might be 
marked as read-only, hence no writes are allowed."))
     );
 
     final IllegalStateException e2 = Assert.assertThrows(
@@ -96,7 +97,8 @@ public class OutputChannelsTest
 
     MatcherAssert.assertThat(
         e2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable 
channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Frame allocator is not available. The output channel might be 
marked as read-only, hence memory allocator is not required."))
     );
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to