cryptoe commented on code in PR #13368:
URL: https://github.com/apache/druid/pull/13368#discussion_r1071054906


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -88,14 +94,30 @@ public static String getMSQMode(final QueryContext queryContext)
     );
   }
 
-  public static boolean isDurableStorageEnabled(final QueryContext queryContext)
+  public static boolean isDurableShuffleStorageEnabled(final QueryContext queryContext)
   {
     return queryContext.getBoolean(
         CTX_ENABLE_DURABLE_SHUFFLE_STORAGE,
         DEFAULT_ENABLE_DURABLE_SHUFFLE_STORAGE
     );
   }
 
+  public static boolean isComposedIntermediateSuperSorterStorageEnabled(final QueryContext queryContext)
+  {
+    return queryContext.getBoolean(
+        CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
+        DEFAULT_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE
+    );
+  }
+
+  public static long getIntermediateSuperSorterStorageMaxLocalBytes(final QueryContext queryContext)

Review Comment:
   If isComposedIntermediateSuperSorterStorageEnabled is true and getIntermediateSuperSorterStorageMaxLocalBytes is not set (i.e., it falls back to the default value), what will happen?



##########
core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java:
##########
@@ -55,16 +60,32 @@ public boolean pathExists(String path)
     return fileWithBasePath(path).exists();
   }
 
-  /**
-   * Reads the file present as basePath + path. Will throw an IO exception in case the file is not present.
-   * Closing of the stream is the responsibility of the caller.
-   */
   @Override
   public InputStream read(String path) throws IOException
   {
     return Files.newInputStream(fileWithBasePath(path).toPath());
   }
 
+  @Override
+  public InputStream readRange(String path, long from, long size) throws IOException

Review Comment:
   nit: `start` seems a better name than `from`. Feel free to ignore this as well.
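A minimal sketch of the range-read semantics under the suggested `start` naming. It assumes `readRange` should return at most `size` bytes beginning at byte offset `start`, clamped at end of file. `RangeReader` is a hypothetical standalone helper, not the `LocalFileStorageConnector` code in this PR, and it buffers the range in memory for simplicity.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.file.Path;

// Hypothetical helper illustrating the `start` parameter name; not Druid's implementation.
final class RangeReader
{
  private RangeReader()
  {
  }

  /**
   * Returns up to {@code size} bytes of {@code file}, beginning at byte offset {@code start}.
   * Fewer bytes are returned if the file ends before {@code start + size}.
   */
  static InputStream readRange(Path file, long start, long size) throws IOException
  {
    if (start < 0 || size < 0) {
      throw new IllegalArgumentException("start and size must be non-negative");
    }
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      // Clamp the range to the bytes that actually exist after `start`.
      long toRead = Math.min(size, Math.max(0L, raf.length() - start));
      if (toRead > Integer.MAX_VALUE) {
        throw new IllegalArgumentException("range too large to buffer in memory: " + toRead);
      }
      byte[] buffer = new byte[(int) toRead];
      raf.seek(start);
      raf.readFully(buffer);
      return new ByteArrayInputStream(buffer);
    }
  }
}
```

For large ranges, a streaming variant (seek, then cap the stream at `size` bytes) would avoid the in-memory copy.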



##########
processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+/**
+ * Tracks the byte usage with an upper bound bytes limit. Reservaction of bytes beyond limit throws
+ * {@link ResourceLimitExceededException}.
+ */
+public class ByteTracker
+{
+  private final long maxBytes;
+
+  @GuardedBy("this")
+  private long currentBytes;
+
+  public ByteTracker(long maxBytes)
+  {
+    this.maxBytes = maxBytes;
+  }
+
+  public synchronized void reserve(long byteCount) throws ResourceLimitExceededException
+  {
+    Preconditions.checkState(byteCount >= 0, "Can't reserve negative bytes");

Review Comment:
   This check does not need to happen inside the lock.



##########
processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java:
##########
@@ -0,0 +1,57 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+/**
+ * Tracks the byte usage with an upper bound bytes limit. Reservaction of bytes beyond limit throws
+ * {@link ResourceLimitExceededException}.
+ */
+public class ByteTracker
+{
+  private final long maxBytes;
+
+  @GuardedBy("this")
+  private long currentBytes;
+
+  public ByteTracker(long maxBytes)
+  {
+    this.maxBytes = maxBytes;
+  }
+
+  public synchronized void reserve(long byteCount) throws ResourceLimitExceededException
+  {
+    Preconditions.checkState(byteCount >= 0, "Can't reserve negative bytes");
+    if (currentBytes + byteCount > maxBytes) {
+      throw new ResourceLimitExceededException("");
+    }
+    currentBytes += byteCount;
+  }
+
+  public synchronized void release(long byteCount)
+  {
+    Preconditions.checkState(byteCount >= 0, "Can't release negative bytes");

Review Comment:
   Similar comment: this can be checked outside the lock.
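A sketch of `ByteTracker` with the argument checks moved outside the lock, as the comments on `reserve` and `release` suggest. It also uses a descriptive message where the PR currently passes an empty string. `ResourceLimitExceededException` below is a local stand-in so the example compiles on its own. The underflow check in `release` is an assumption, not something shown in the PR.

```java
// Local stand-in for org.apache.druid.query.ResourceLimitExceededException.
class ResourceLimitExceededException extends RuntimeException
{
  ResourceLimitExceededException(String message)
  {
    super(message);
  }
}

class ByteTracker
{
  private final long maxBytes;

  // Guarded by "this".
  private long currentBytes;

  ByteTracker(long maxBytes)
  {
    this.maxBytes = maxBytes;
  }

  void reserve(long byteCount)
  {
    // Validating the argument reads no shared state, so it needs no lock.
    if (byteCount < 0) {
      throw new IllegalStateException("Can't reserve negative bytes");
    }
    synchronized (this) {
      // Written as a subtraction so that a very large byteCount cannot overflow the sum.
      if (byteCount > maxBytes - currentBytes) {
        throw new ResourceLimitExceededException(
            String.format("Can't reserve [%d] bytes: [%d] of [%d] bytes already reserved", byteCount, currentBytes, maxBytes)
        );
      }
      currentBytes += byteCount;
    }
  }

  void release(long byteCount)
  {
    if (byteCount < 0) {
      throw new IllegalStateException("Can't release negative bytes");
    }
    synchronized (this) {
      // Assumed guard: refuse to release more than is currently reserved.
      if (byteCount > currentBytes) {
        throw new IllegalStateException(
            String.format("Can't release [%d] bytes: only [%d] bytes are reserved", byteCount, currentBytes)
        );
      }
      currentBytes -= byteCount;
    }
  }

  synchronized long getCurrentBytes()
  {
    return currentBytes;
  }
}
```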



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingReadableFrameChannel.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed readable channel to read frames. The channel can encapsulate multiple readable channels in it and
+ * automatically switches to next channels once the currently read channel is finished.
+ */
+public class ComposingReadableFrameChannel implements ReadableFrameChannel
+{
+  private final List<Supplier<ReadableFrameChannel>> channels;
+  private ReadableFrameChannel currentChannel;
+  private int currentIndex;
+
+  public ComposingReadableFrameChannel(
+      int partition,
+      List<Supplier<ReadableFrameChannel>> channels,
+      Map<Integer, HashSet<Integer>> partitionToChannelMap

Review Comment:
   Should this just take a HashSet?



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingReadableFrameChannel.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed readable channel to read frames. The channel can encapsulate multiple readable channels in it and
+ * automatically switches to next channels once the currently read channel is finished.
+ */
+public class ComposingReadableFrameChannel implements ReadableFrameChannel

Review Comment:
   Please also mention that this class is not thread-safe.



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java:
##########
@@ -0,0 +1,99 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed writable channel to write frames. The channel can encapsulate multiple writable channels in it and
+ * automatically switches to next channels once the current write channel cannot allow more writes.
+ */
+public class ComposingWritableFrameChannel implements WritableFrameChannel
+{
+  private final List<Supplier<WritableFrameChannel>> channels;
+  private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
+  private int currentIndex;
+
+  public ComposingWritableFrameChannel(
+      List<Supplier<WritableFrameChannel>> channels,
+      Map<Integer, HashSet<Integer>> partitionToChannelMap
+  )
+  {
+    this.channels = Preconditions.checkNotNull(channels, "channels is null");
+    this.partitionToChannelMap =
+        Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
+    this.currentIndex = 0;
+  }
+
+  @Override
+  public void write(FrameWithPartition frameWithPartition) throws IOException
+  {
+    if (currentIndex >= channels.size()) {
+      throw new ISE("No more channels available to write. Total available channels : " + channels.size());
+    }
+
+    try {
+      channels.get(currentIndex).get().write(frameWithPartition);

Review Comment:
   We are relying on an exception as a signal to try the next writer.
   So should we enforce a "sorterBySize" ordering in the channel suppliers?
   
   IMHO, could we return a boolean when the write fails instead of relying on an exception?
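Here is one way the boolean-return idea could look, as a sketch. `BoundedWriter`, `CappedWriter`, and `ComposingWriter` are hypothetical simplifications that reduce frames to byte arrays; they are not Druid's `WritableFrameChannel` API. The composing writer loops over writers instead of recursing, and it moves to the next writer only when `tryWrite` returns `false`. Unlike the PR, it does not close a full writer when moving on.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical writer contract: report "full" through the return value instead of an exception.
interface BoundedWriter
{
  /** Returns false, writing nothing, if this writer cannot accept {@code data}. */
  boolean tryWrite(int partition, byte[] data);
}

// Toy writer with a fixed byte capacity, used only to exercise the sketch.
class CappedWriter implements BoundedWriter
{
  private final long capacityBytes;
  private long usedBytes;

  CappedWriter(long capacityBytes)
  {
    this.capacityBytes = capacityBytes;
  }

  @Override
  public boolean tryWrite(int partition, byte[] data)
  {
    if (data.length > capacityBytes - usedBytes) {
      return false;
    }
    usedBytes += data.length;
    return true;
  }

  long usedBytes()
  {
    return usedBytes;
  }
}

class ComposingWriter
{
  private final List<BoundedWriter> writers;
  private final Map<Integer, Set<Integer>> partitionToWriters = new HashMap<>();
  private int currentIndex = 0;

  ComposingWriter(List<BoundedWriter> writers)
  {
    this.writers = writers;
  }

  void write(int partition, byte[] data)
  {
    while (currentIndex < writers.size()) {
      if (writers.get(currentIndex).tryWrite(partition, data)) {
        // Record which writer holds data for this partition so readers can find it later.
        partitionToWriters.computeIfAbsent(partition, k -> new HashSet<>()).add(currentIndex);
        return;
      }
      // The current writer is full; move on to the next one.
      currentIndex++;
    }
    throw new IllegalStateException("No more writers available. Total writers: " + writers.size());
  }

  Map<Integer, Set<Integer>> partitionToWriters()
  {
    return partitionToWriters;
  }
}
```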



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingReadableFrameChannel.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed readable channel to read frames. The channel can encapsulate multiple readable channels in it and
+ * automatically switches to next channels once the currently read channel is finished.
+ */
+public class ComposingReadableFrameChannel implements ReadableFrameChannel
+{
+  private final List<Supplier<ReadableFrameChannel>> channels;
+  private ReadableFrameChannel currentChannel;
+  private int currentIndex;
+
+  public ComposingReadableFrameChannel(
+      int partition,
+      List<Supplier<ReadableFrameChannel>> channels,
+      Map<Integer, HashSet<Integer>> partitionToChannelMap
+  )
+  {
+    Preconditions.checkNotNull(channels, "channels is null");
+    Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
+    if (partitionToChannelMap.get(partition) == null) {
+      // no writes for the partition, send an empty readable channel
+      this.channels = ImmutableList.of(() -> ReadableNilFrameChannel.INSTANCE);
+    } else {
+      HashSet<Integer> validChannels = partitionToChannelMap.get(partition);
+      Preconditions.checkState(validChannels.size() > 0, "No channels found for partition " + partition);
+      ImmutableList.Builder<Supplier<ReadableFrameChannel>> validChannelsBuilder = ImmutableList.builder();
+      ArrayList<Integer> sortedChannelIds = new ArrayList<>(validChannels);
+      Collections.sort(sortedChannelIds); // the data was written from lowest to highest channel
+      for (Integer channelId : sortedChannelIds) {
+        validChannelsBuilder.add(channels.get(channelId));
+      }
+      this.channels = validChannelsBuilder.build();
+    }
+    this.currentIndex = 0;
+    this.currentChannel = null;
+  }
+
+  @Override
+  public boolean isFinished()
+  {
+    initCurrentChannel();
+    if (!currentChannel.isFinished()) {
+      return false;
+    }
+    currentChannel.close();
+    currentChannel = null;
+    if (isLastIndex()) {
+      return true;
+    }
+    ++currentIndex;
+    return isFinished();
+  }
+
+  @Override
+  public boolean canRead()
+  {
+    initCurrentChannel();
+    if (currentChannel.canRead()) {
+      return true;
+    }
+    if (currentChannel.isFinished()) {
+      currentChannel.close();
+      currentChannel = null;
+      if (isLastIndex()) {
+        return false;
+      }
+      ++currentIndex;
+      return canRead();
+    }
+    return false;
+  }
+
+  @Override
+  public Frame read()
+  {
+    return currentChannel.read();
+  }
+
+  @Override
+  public ListenableFuture<?> readabilityFuture()
+  {
+    initCurrentChannel();
+    if (!currentChannel.isFinished()) {
+      return currentChannel.readabilityFuture();
+    }
+    currentChannel.close();
+    currentChannel = null;
+    if (isLastIndex()) {
+      return Futures.immediateFuture(true);
+    }
+    ++currentIndex;
+    return readabilityFuture();
+  }
+
+  @Override
+  public void close()
+  {
+    if (currentChannel != null) {

Review Comment:
   Shouldn't we close all the channels here? The `close()` contract says:
   `Releases any resources associated with this readable channel. After calling this, you should not call any other methods on the channel.`
      



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java:
##########
@@ -0,0 +1,99 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed writable channel to write frames. The channel can encapsulate multiple writable channels in it and
+ * automatically switches to next channels once the current write channel cannot allow more writes.
+ */
+public class ComposingWritableFrameChannel implements WritableFrameChannel
+{
+  private final List<Supplier<WritableFrameChannel>> channels;
+  private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
+  private int currentIndex;
+
+  public ComposingWritableFrameChannel(
+      List<Supplier<WritableFrameChannel>> channels,
+      Map<Integer, HashSet<Integer>> partitionToChannelMap
+  )
+  {
+    this.channels = Preconditions.checkNotNull(channels, "channels is null");
+    this.partitionToChannelMap =
+        Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
+    this.currentIndex = 0;
+  }
+
+  @Override
+  public void write(FrameWithPartition frameWithPartition) throws IOException
+  {
+    if (currentIndex >= channels.size()) {
+      throw new ISE("No more channels available to write. Total available channels : " + channels.size());
+    }
+
+    try {
+      channels.get(currentIndex).get().write(frameWithPartition);
+      partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1))
+                           .add(currentIndex);
+    }
+    catch (ResourceLimitExceededException rlee) {
+      channels.get(currentIndex).get().close();
+      currentIndex++;
+      if (currentIndex >= channels.size()) {
+        throw rlee;
+      }
+      write(frameWithPartition);
+    }
+  }
+
+  @Override
+  public void fail(@Nullable Throwable cause) throws IOException
+  {
+    for (Supplier<WritableFrameChannel> channel : channels) {
+      channel.get().fail(cause);
+    }
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    if (currentIndex < channels.size()) {
+      channels.get(currentIndex).get().close();

Review Comment:
   Should we close all channels?
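On closing all channels (here and in `ComposingReadableFrameChannel.close()`), there is a caveat. If a supplier creates a new channel on every `get()` call, then closing "all channels" through `get()` would open channels only to close them. It would also mean the repeated `channels.get(currentIndex).get()` calls in `write` each see a different channel. The sketch below shows one alternative, assuming the suppliers can be wrapped. Each supplier is memoized, every channel that was actually opened is remembered, and `closeAll` closes all of them. It keeps the first `IOException` and attaches later ones as suppressed. `OpenTracker` is a hypothetical name, and like the composing channels it is not thread-safe.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: memoizes channel suppliers and closes every channel that was actually opened.
class OpenTracker<T extends Closeable>
{
  private final List<T> opened = new ArrayList<>();

  /** Wraps {@code supplier} so it creates its channel at most once and registers it for closing. */
  Supplier<T> track(Supplier<T> supplier)
  {
    return new Supplier<T>()
    {
      private T instance;

      @Override
      public T get()
      {
        if (instance == null) {
          instance = supplier.get();
          opened.add(instance);
        }
        return instance;
      }
    };
  }

  /** Closes every opened channel, even if some of them fail to close. */
  void closeAll() throws IOException
  {
    IOException first = null;
    for (T channel : opened) {
      try {
        channel.close();
      }
      catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    opened.clear();
    if (first != null) {
      throw first;
    }
  }
}
```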



##########
processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java:
##########
@@ -0,0 +1,57 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+/**
+ * Tracks the byte usage with an upper bound bytes limit. Reservaction of bytes beyond limit throws

Review Comment:
   Nit (spelling): Reservaction -> Reservation



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingReadableFrameChannel.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed readable channel to read frames. The channel can encapsulate multiple readable channels in it and
+ * automatically switches to next channels once the currently read channel is finished.
+ */
+public class ComposingReadableFrameChannel implements ReadableFrameChannel
+{
+  private final List<Supplier<ReadableFrameChannel>> channels;
+  private ReadableFrameChannel currentChannel;
+  private int currentIndex;
+
+  public ComposingReadableFrameChannel(
+      int partition,
+      List<Supplier<ReadableFrameChannel>> channels,
+      Map<Integer, HashSet<Integer>> partitionToChannelMap
+  )
+  {
+    Preconditions.checkNotNull(channels, "channels is null");
+    Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
+    if (partitionToChannelMap.get(partition) == null) {
+      // no writes for the partition, send an empty readable channel
+      this.channels = ImmutableList.of(() -> ReadableNilFrameChannel.INSTANCE);
+    } else {
+      HashSet<Integer> validChannels = partitionToChannelMap.get(partition);
+      Preconditions.checkState(validChannels.size() > 0, "No channels found for partition " + partition);
+      ImmutableList.Builder<Supplier<ReadableFrameChannel>> validChannelsBuilder = ImmutableList.builder();
+      ArrayList<Integer> sortedChannelIds = new ArrayList<>(validChannels);
+      Collections.sort(sortedChannelIds); // the data was written from lowest to highest channel
+      for (Integer channelId : sortedChannelIds) {
+        validChannelsBuilder.add(channels.get(channelId));
+      }
+      this.channels = validChannelsBuilder.build();
+    }
+    this.currentIndex = 0;

Review Comment:
   The logic breaks if the set is empty.
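The sketch below shows a channel-selection step that treats a missing set and an empty set the same way, by falling back to an empty channel. It also takes the partition's channel-id set directly, which would cover the earlier question about passing a `HashSet` instead of the whole map. `ChannelSelection.select` and its parameters are hypothetical. In the PR, the empty channel would presumably be `ReadableNilFrameChannel.INSTANCE`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper; in the PR this logic lives in the ComposingReadableFrameChannel constructor.
final class ChannelSelection
{
  private ChannelSelection()
  {
  }

  /**
   * Returns the suppliers for {@code channelIds} in ascending id order, which is the order they were written in.
   * A null or empty id set yields a single {@code emptyChannel} supplier instead of failing a precondition.
   */
  static <T> List<Supplier<T>> select(List<Supplier<T>> channels, Set<Integer> channelIds, Supplier<T> emptyChannel)
  {
    if (channelIds == null || channelIds.isEmpty()) {
      return Collections.singletonList(emptyChannel);
    }
    List<Integer> sortedIds = new ArrayList<>(channelIds);
    Collections.sort(sortedIds);
    List<Supplier<T>> selected = new ArrayList<>(sortedIds.size());
    for (int id : sortedIds) {
      selected.add(channels.get(id));
    }
    return Collections.unmodifiableList(selected);
  }
}
```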



##########
processing/src/main/java/org/apache/druid/frame/channel/ComposingReadableFrameChannel.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * A composed readable channel to read frames. The channel can encapsulate multiple readable channels in it and
+ * automatically switches to next channels once the currently read channel is finished.
+ */
+public class ComposingReadableFrameChannel implements ReadableFrameChannel
+{
+  private final List<Supplier<ReadableFrameChannel>> channels;
+  private ReadableFrameChannel currentChannel;
+  private int currentIndex;
+
+  public ComposingReadableFrameChannel(
+      int partition,
+      List<Supplier<ReadableFrameChannel>> channels,
+      Map<Integer, HashSet<Integer>> partitionToChannelMap
+  )
+  {
+    Preconditions.checkNotNull(channels, "channels is null");
+    Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
+    if (partitionToChannelMap.get(partition) == null) {
+      // no writes for the partition, send an empty readable channel
+      this.channels = ImmutableList.of(() -> ReadableNilFrameChannel.INSTANCE);
+    } else {
+      HashSet<Integer> validChannels = partitionToChannelMap.get(partition);
+      Preconditions.checkState(validChannels.size() > 0, "No channels found for partition " + partition);
+      ImmutableList.Builder<Supplier<ReadableFrameChannel>> validChannelsBuilder = ImmutableList.builder();
+      ArrayList<Integer> sortedChannelIds = new ArrayList<>(validChannels);
+      Collections.sort(sortedChannelIds); // the data was written from lowest to highest channel
+      for (Integer channelId : sortedChannelIds) {
+        validChannelsBuilder.add(channels.get(channelId));
+      }
+      this.channels = validChannelsBuilder.build();
+    }
+    this.currentIndex = 0;
+    this.currentChannel = null;
+  }
+
+  @Override
+  public boolean isFinished()
+  {
+    initCurrentChannel();

Review Comment:
   Should we make this class thread-safe?



##########
processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java:
##########
@@ -0,0 +1,57 @@
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+/**
+ * Tracks the byte usage with an upper bound bytes limit. Reservaction of bytes beyond limit throws
+ * {@link ResourceLimitExceededException}.
+ */
+public class ByteTracker
+{
+  private final long maxBytes;
+
+  @GuardedBy("this")
+  private long currentBytes;
+
+  public ByteTracker(long maxBytes)
+  {
+    this.maxBytes = maxBytes;
+  }
+
+  public synchronized void reserve(long byteCount) throws ResourceLimitExceededException
+  {
+    Preconditions.checkState(byteCount >= 0, "Can't reserve negative bytes");
+    if (currentBytes + byteCount > maxBytes) {
+      throw new ResourceLimitExceededException("");

Review Comment:
   I think there is a TODO here: the exception message is empty.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -562,13 +563,30 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
                                    .orElse(MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED);
     }
 
-
+    ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
+    taskContextOverridesBuilder
+        .put(
+            MultiStageQueryContext.CTX_ENABLE_DURABLE_SHUFFLE_STORAGE,
+            isDurableStageStorageEnabled
+        ).put(
+            MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
+            MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(
+                task.getQuerySpec().getQuery().context()
+            )
+        ).put(

Review Comment:
   Shouldn't `CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES` be set only when `CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE` is enabled?
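The sketch below sets the local-bytes override only when composed storage is enabled. It uses a plain `Map` so it runs without Guava; an `ImmutableMap.Builder` can be filled conditionally in the same way. The key strings are placeholders, not the actual values of the `MultiStageQueryContext` constants, and `TaskContextOverrides.build` is a hypothetical helper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the PR builds these overrides inline in ControllerImpl.
final class TaskContextOverrides
{
  // Placeholder keys; the real MultiStageQueryContext constant values are not shown in this diff.
  static final String DURABLE_SHUFFLE_KEY = "placeholder.durableShuffleStorage";
  static final String COMPOSED_STORAGE_KEY = "placeholder.composedIntermediateSuperSorterStorage";
  static final String MAX_LOCAL_BYTES_KEY = "placeholder.intermediateSuperSorterStorageMaxLocalBytes";

  private TaskContextOverrides()
  {
  }

  static Map<String, Object> build(boolean durableShuffle, boolean composedStorage, long maxLocalBytes)
  {
    Map<String, Object> overrides = new HashMap<>();
    overrides.put(DURABLE_SHUFFLE_KEY, durableShuffle);
    overrides.put(COMPOSED_STORAGE_KEY, composedStorage);
    if (composedStorage) {
      // The local-bytes cap only matters when composed storage is on, so pass it only then.
      overrides.put(MAX_LOCAL_BYTES_KEY, maxLocalBytes);
    }
    return Collections.unmodifiableMap(overrides);
  }
}
```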
    


