cryptoe commented on code in PR #13368:
URL: https://github.com/apache/druid/pull/13368#discussion_r1025178890
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -183,7 +184,10 @@ public WorkerImpl(MSQWorkerTask task, WorkerContext context)
this.context = context;
this.selfDruidNode = context.selfNode();
this.processorBouncer = context.processorBouncer();
- this.durableStageStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(
+ this.durableStageStorageEnabled = MultiStageQueryContext.isDurableShuffleStorageEnabled(
+ QueryContext.of(task.getContext())
+ );
+ this.durableTaskIntermediateStorageEnabled = MultiStageQueryContext.isDurableTaskIntermediateStorageEnabled(
Review Comment:
I think we do not need this parameter at all. Eventually we will use
tiered spilling, so I think we should be okay spilling everything to
remote storage.
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -89,6 +90,43 @@ public InputStream open(GetObjectRequest object, long offset)
);
}
+ @Override
+ public InputStream readRange(String path, long from, long size) throws IOException
+ {
+ if (from < 0 || size < 0) {
Review Comment:
Can `public InputStream read(String path) throws IOException` and this
method share a common base method?
Maybe pass the `GetObjectRequest` to that base method; that way there is
less duplication.
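To illustrate the suggestion, here is a minimal sketch, assuming hypothetical names (`SharedReadSketch`, `ObjectRequest`, and an in-memory map standing in for S3; none of this is the AWS SDK or the actual `S3StorageConnector` code), of `read` and `readRange` both delegating to one private method that takes a request object, in the way the comment proposes passing a `GetObjectRequest`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory stand-in for a remote object store. Not the S3 API.
class SharedReadSketch
{
  // Hypothetical request object playing the role of GetObjectRequest.
  static final class ObjectRequest
  {
    final String path;
    final long from; // inclusive start offset
    final long size; // number of bytes, or -1 for "to the end of the object"

    ObjectRequest(String path, long from, long size)
    {
      this.path = path;
      this.from = from;
      this.size = size;
    }
  }

  private final Map<String, byte[]> objects = new HashMap<>();

  void put(String path, String contents)
  {
    objects.put(path, contents.getBytes(StandardCharsets.UTF_8));
  }

  // Whole-object read: builds a request and delegates to the shared method.
  public InputStream read(String path) throws IOException
  {
    return open(new ObjectRequest(path, 0, -1));
  }

  // Ranged read: validates its arguments, then delegates to the same method.
  public InputStream readRange(String path, long from, long size) throws IOException
  {
    if (from < 0 || size < 0) {
      throw new IllegalArgumentException("Invalid arguments: from=" + from + ", size=" + size);
    }
    return open(new ObjectRequest(path, from, size));
  }

  // The single place that resolves the object and applies the range.
  private InputStream open(ObjectRequest request) throws IOException
  {
    final byte[] data = objects.get(request.path);
    if (data == null) {
      throw new IOException("No such object: " + request.path);
    }
    final int start = (int) Math.min(request.from, data.length);
    final int length = request.size < 0
                       ? data.length - start
                       : (int) Math.min(request.size, data.length - start);
    return new ByteArrayInputStream(data, start, length);
  }
}
```

With this shape, how the request is built and how errors are reported live in one method, so the two public entry points cannot drift apart.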
##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -102,4 +102,20 @@ public static String getPartitionOutputsFileNameForPartition(
partitionNumber
);
}
+
+
Review Comment:
Nit: Let's add a Javadoc to this method.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java:
##########
@@ -50,7 +51,22 @@ public OutputChannel openChannel(int partitionNumber) throws IOException
new CountingWritableFrameChannel(
baseChannel.getWritableChannel(),
channelCounters,
- baseChannel.getPartitionNumber()
+ partitionNumber
+ )
+ );
+ }
+
+ @Override
+ public PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException
+ {
+ final PartitionedOutputChannel baseChannel = baseFactory.openPartitionedChannel(name, deleteAfterRead);
+
+ return baseChannel.mapWritableChannel(
+ baseWritableChannel ->
+ new CountingWritableFrameChannel(
+ baseChannel.getWritableChannel(),
+ channelCounters,
+ null
Review Comment:
Why is a null partition number passed here?
##########
processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class PartitionedOutputChannel
+{
+ @Nullable
+ private final WritableFrameChannel writableChannel;
+ @Nullable
+ private final MemoryAllocator frameMemoryAllocator;
+ private final Supplier<PartitionedReadableFrameChannel> readableChannelSupplier;
+
+ private PartitionedOutputChannel(
+ @Nullable final WritableFrameChannel writableChannel,
Review Comment:
Can the `@Nullable` be removed, since you are doing a precheck in the `pair` method?
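A minimal sketch of what the suggestion could look like, assuming hypothetical stand-in types (`HypotheticalWritableChannel`, `HypotheticalReadableChannel`, `PairedChannelSketch`) rather than the real `WritableFrameChannel` and `PartitionedReadableFrameChannel`: the constructor is private, and the only factory method performs the null check, so the field and parameter can be non-nullable.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-ins for WritableFrameChannel and PartitionedReadableFrameChannel.
interface HypotheticalWritableChannel {}

interface HypotheticalReadableChannel {}

final class PairedChannelSketch
{
  // Never null: the only factory method below enforces this.
  private final HypotheticalWritableChannel writableChannel;
  private final Supplier<HypotheticalReadableChannel> readableChannelSupplier;

  // Private, so callers must go through pair(); no @Nullable needed here.
  private PairedChannelSketch(
      HypotheticalWritableChannel writableChannel,
      Supplier<HypotheticalReadableChannel> readableChannelSupplier
  )
  {
    this.writableChannel = writableChannel;
    this.readableChannelSupplier = readableChannelSupplier;
  }

  // The precheck lives here and fails fast with a descriptive message.
  static PairedChannelSketch pair(
      HypotheticalWritableChannel writableChannel,
      Supplier<HypotheticalReadableChannel> readableChannelSupplier
  )
  {
    return new PairedChannelSketch(
        Objects.requireNonNull(writableChannel, "writableChannel"),
        Objects.requireNonNull(readableChannelSupplier, "readableChannelSupplier")
    );
  }

  HypotheticalWritableChannel getWritableChannel()
  {
    return writableChannel;
  }

  HypotheticalReadableChannel getReadableChannel()
  {
    return readableChannelSupplier.get();
  }
}
```

This only works if `pair` really is the only way to construct the class. If another factory intentionally leaves the writable side unset (for example, a read-only channel), the field and parameter would still need `@Nullable`.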
##########
processing/src/main/java/org/apache/druid/frame/processor/OutputChannelFactory.java:
##########
@@ -31,6 +31,11 @@
*/
OutputChannel openChannel(int partitionNumber) throws IOException;
+ /**
+ * Create a channel pair tagged with a particular name and a flag to delete the channel data after its read.
+ */
+ PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException;
Review Comment:
I have a similar comment to @gianm's. I think the two interface methods look
very similar.
A couple of questions:
1. Does the partition name have to be a string rather than an int? Put
another way, why not work with partition numbers only?
2. From the Javadoc, the pair seems to consist of an OutputChannel, the
partition name, and whether the channel data needs to be deleted on exit.
The caller would be interested in the output channel for that "partition"
and in cleaning it up, so why do we need another API for this? We could have
a cleaning output channel factory implementation that does the cleanup for us, no?
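A minimal sketch of the decorator suggested in point 2, assuming simplified hypothetical interfaces (`SimpleChannel`, `SimpleChannelFactory`, `CleaningChannelFactory`) rather than Druid's actual `OutputChannel` and `OutputChannelFactory`. The wrapper opens channels by partition number through a base factory, remembers them, and deletes their data when it is closed, so cleanup needs no separate API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified channel; the real OutputChannel is richer.
interface SimpleChannel
{
  int partitionNumber();

  // Releases whatever storage backs this channel (local files, remote objects, ...).
  void delete() throws IOException;
}

// Hypothetical, simplified factory keyed by partition number only.
interface SimpleChannelFactory
{
  SimpleChannel openChannel(int partitionNumber) throws IOException;
}

// Decorator: delegates opening to the base factory and deletes every channel
// it opened once close() is called. Not thread-safe; a sketch only.
final class CleaningChannelFactory implements SimpleChannelFactory, AutoCloseable
{
  private final SimpleChannelFactory baseFactory;
  private final List<SimpleChannel> opened = new ArrayList<>();

  CleaningChannelFactory(SimpleChannelFactory baseFactory)
  {
    this.baseFactory = baseFactory;
  }

  @Override
  public SimpleChannel openChannel(int partitionNumber) throws IOException
  {
    final SimpleChannel channel = baseFactory.openChannel(partitionNumber);
    opened.add(channel); // remember it so close() can clean it up
    return channel;
  }

  // Called after all readers are done with the channels.
  @Override
  public void close() throws IOException
  {
    for (SimpleChannel channel : opened) {
      channel.delete();
    }
    opened.clear();
  }
}
```

Deleting on `close()` rather than immediately after each read is a choice made for this sketch; a per-channel variant could instead trigger deletion from a wrapped channel's own close hook.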
##########
.travis.yml:
##########
@@ -200,7 +200,7 @@ jobs:
${MAVEN_SKIP} -Dremoteresources.skip=true -Ddruid.generic.useDefaultValueForNull=${DRUID_USE_DEFAULT_VALUE_FOR_NULL}
- sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
- free -m
- - ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
+ - travis_wait 15 ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
Review Comment:
I think this change is orthogonal to this PR, no?
##########
processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java:
##########
@@ -0,0 +1,117 @@
+public class PartitionedOutputChannel
Review Comment:
class level docs would be very helpful here.
Outline could be:
1. What does this class do.
2. Where is it used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]