cryptoe commented on code in PR #13368:
URL: https://github.com/apache/druid/pull/13368#discussion_r1025178890


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -183,7 +184,10 @@ public WorkerImpl(MSQWorkerTask task, WorkerContext context)
     this.context = context;
     this.selfDruidNode = context.selfNode();
     this.processorBouncer = context.processorBouncer();
-    this.durableStageStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(
+    this.durableStageStorageEnabled = MultiStageQueryContext.isDurableShuffleStorageEnabled(
+        QueryContext.of(task.getContext())
+    );
+    this.durableTaskIntermediateStorageEnabled = MultiStageQueryContext.isDurableTaskIntermediateStorageEnabled(

Review Comment:
   I don't think we need this parameter at all. Eventually we will use
   tiered spilling, so I think it should be fine to spill everything to
   remote storage.
   



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -89,6 +90,43 @@ public InputStream open(GetObjectRequest object, long offset)
     );
   }
 
+  @Override
+  public InputStream readRange(String path, long from, long size) throws IOException
+  {
+    if (from < 0 || size < 0) {

Review Comment:
   Can `public InputStream read(String path) throws IOException` and this
   method share a common base method?
   Maybe pass that method the `GetObjectRequest`; that way there is less
   duplication.
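
   A minimal sketch of the shape I have in mind, assuming an in-memory map in place of S3. `InMemoryStorageConnector` and its private `open` helper are hypothetical names for illustration, not the connector's real code; in the actual class the shared helper would presumably take the `GetObjectRequest` instead of a path and offsets. The `IllegalArgumentException` on negative arguments is also an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the storage connector: a map replaces S3.
class InMemoryStorageConnector
{
  private final Map<String, byte[]> objects = new HashMap<>();

  void put(String path, byte[] data)
  {
    objects.put(path, data);
  }

  // Whole-object read: the full range, served by the shared helper.
  InputStream read(String path) throws IOException
  {
    return open(path, 0, Long.MAX_VALUE);
  }

  // Ranged read: validates its arguments, then uses the same helper.
  InputStream readRange(String path, long from, long size) throws IOException
  {
    if (from < 0 || size < 0) {
      // Assumption: the real method's error handling may differ.
      throw new IllegalArgumentException("from and size must be non-negative");
    }
    return open(path, from, size);
  }

  // Single place that knows how to fetch bytes; both public methods delegate here.
  private InputStream open(String path, long from, long size) throws IOException
  {
    final byte[] data = objects.get(path);
    if (data == null) {
      throw new IOException("No such object: " + path);
    }
    // Clamp the requested range to the object's bounds.
    final int start = (int) Math.min(from, data.length);
    final int length = (int) Math.min(size, data.length - start);
    return new ByteArrayInputStream(data, start, length);
  }
}
```

   With this shape, `read` becomes a one-line special case of the ranged read, and the fetching logic lives in one method.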
   



##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -102,4 +102,20 @@ public static String getPartitionOutputsFileNameForPartition(
         partitionNumber
     );
   }
+
+

Review Comment:
   Nit: Let's add Javadoc to this method.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java:
##########
@@ -50,7 +51,22 @@ public OutputChannel openChannel(int partitionNumber) throws IOException
             new CountingWritableFrameChannel(
                 baseChannel.getWritableChannel(),
                 channelCounters,
-                baseChannel.getPartitionNumber()
+                partitionNumber
+            )
+    );
+  }
+
+  @Override
+  public PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException
+  {
+    final PartitionedOutputChannel baseChannel = baseFactory.openPartitionedChannel(name, deleteAfterRead);
+
+    return baseChannel.mapWritableChannel(
+        baseWritableChannel ->
+            new CountingWritableFrameChannel(
+                baseChannel.getWritableChannel(),
+                channelCounters,
+                null

Review Comment:
   Why is a null partition number passed here?



##########
processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class PartitionedOutputChannel
+{
+  @Nullable
+  private final WritableFrameChannel writableChannel;
+  @Nullable
+  private final MemoryAllocator frameMemoryAllocator;
+  private final Supplier<PartitionedReadableFrameChannel> 
readableChannelSupplier;
+
+  private PartitionedOutputChannel(
+      @Nullable final WritableFrameChannel writableChannel,

Review Comment:
   Can the `@Nullable` be removed, since you are doing a precheck in the pair method?
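
   Roughly what I mean, as a sketch only. `ChannelPair` is a hypothetical, simplified class, not `PartitionedOutputChannel` itself, and it assumes no other factory method needs to pass null to the constructor:

```java
import java.util.Objects;

// Hypothetical simplified class: the static factory rejects null up front,
// so the private constructor and the field need no @Nullable annotation.
class ChannelPair
{
  private final Object writableChannel; // non-null by construction

  private ChannelPair(final Object writableChannel)
  {
    this.writableChannel = writableChannel;
  }

  static ChannelPair pair(final Object writableChannel)
  {
    // The precheck: fails fast with a NullPointerException naming the argument.
    return new ChannelPair(Objects.requireNonNull(writableChannel, "writableChannel"));
  }

  Object getWritableChannel()
  {
    return writableChannel;
  }
}
```

   If some other factory method does construct the object with a null channel, the annotation would of course have to stay.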



##########
processing/src/main/java/org/apache/druid/frame/processor/OutputChannelFactory.java:
##########
@@ -31,6 +31,11 @@
    */
   OutputChannel openChannel(int partitionNumber) throws IOException;
 
+  /**
+   * Create a channel pair tagged with a particular name and a flag to delete the channel data after its read.
+   */
+  PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException;

Review Comment:
   I have a comment similar to @gianm's. I think the two interface calls look
   very similar.
   A couple of questions:
   1. Does the partition name have to be a string rather than an int? Or, put
   another way, why not work with partition numbers only?
   2. From the Javadoc, the pair seems to consist of an OutputChannel, the
   partition name, and a flag saying whether the channel data should be deleted
   on exit. The caller would be interested in the output channel for that
   "partition" and in the cleanup, so why do we need another API for this? We
   could have a cleaning output channel factory implementation that does the
   cleanup for us, no?
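
   To make the second point concrete, here is a rough sketch of a cleaning factory written as a decorator. Everything in it is hypothetical: `SimpleChannel`, `SimpleChannelFactory`, and `CleaningChannelFactory` are simplified stand-ins rather than the real Druid interfaces, and the cleanup callback is an assumed hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical, simplified stand-ins; not the real Druid interfaces.
interface SimpleChannel
{
  int partitionNumber();
}

interface SimpleChannelFactory
{
  SimpleChannel openChannel(int partitionNumber);
}

// Decorator: remembers every partition it opened and cleans each one up on close().
class CleaningChannelFactory implements SimpleChannelFactory, AutoCloseable
{
  private final SimpleChannelFactory delegate;
  private final IntConsumer cleaner; // assumed hook that deletes one partition's data
  private final List<Integer> opened = new ArrayList<>();

  CleaningChannelFactory(SimpleChannelFactory delegate, IntConsumer cleaner)
  {
    this.delegate = delegate;
    this.cleaner = cleaner;
  }

  @Override
  public SimpleChannel openChannel(int partitionNumber)
  {
    final SimpleChannel channel = delegate.openChannel(partitionNumber);
    opened.add(partitionNumber);
    return channel;
  }

  @Override
  public void close()
  {
    // Clean up in the order the channels were opened, then forget them.
    for (int partition : opened) {
      cleaner.accept(partition);
    }
    opened.clear();
  }
}
```

   Callers keep using the existing `openChannel(int)` call, and the cleanup policy lives in the wrapper instead of in a second interface method.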
   



##########
.travis.yml:
##########
@@ -200,7 +200,7 @@ jobs:
           ${MAVEN_SKIP} -Dremoteresources.skip=true -Ddruid.generic.useDefaultValueForNull=${DRUID_USE_DEFAULT_VALUE_FOR_NULL}
         - sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
         - free -m
-        - ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
+        - travis_wait 15 ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report

Review Comment:
   I think this change is orthogonal to the PR, no?



##########
processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class PartitionedOutputChannel

Review Comment:
   Class-level docs would be very helpful here.
   An outline could be:
   1. What this class does.
   2. Where it is used.
    


