cryptoe commented on code in PR #13205:
URL: https://github.com/apache/druid/pull/13205#discussion_r1004264481
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -59,6 +59,7 @@
private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true;
public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE =
"durableShuffleStorage";
+ public static final String CTX_FORCE_NON_SEQUENTIAL_MERGE =
"forceNonSequentialMerging";
Review Comment:
Some suggestions:
1. Let's document the context parameter in the PR description.
2. Let's add it to the SQL readme docs.
3. Should we call the parameter "segment size estimator mode" and give it two
values, serial and parallel? In the future, we could add an auto mode
that automatically switches the size estimator based on the inputs.
WDYT?
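To make suggestion 3 concrete, here is a minimal sketch of what a mode-valued context parameter could look like. The enum name, the context key `sketchMergeMode`, and the `fromContext` helper are all hypothetical and not part of this PR; the default is left to the caller because the comment above does not settle on one.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical replacement for the boolean flag: a mode with named values.
 * None of these names exist in the PR; they only illustrate the suggestion.
 */
enum SketchMergeMode
{
  SERIAL,
  PARALLEL,
  AUTO; // possible future mode that picks SERIAL or PARALLEL based on the inputs

  // Hypothetical context key, chosen for this example only.
  static final String CTX_KEY = "sketchMergeMode";

  /**
   * Reads the mode from a query context map. Returns defaultMode when the key
   * is absent. An unrecognized value makes valueOf throw IllegalArgumentException.
   */
  static SketchMergeMode fromContext(Map<String, ?> context, SketchMergeMode defaultMode)
  {
    final Object value = context.get(CTX_KEY);
    if (value == null) {
      return defaultMode;
    }
    return valueOf(value.toString().trim().toUpperCase(Locale.ROOT));
  }
}
```

With something like this, adding an auto mode later would not need a second context parameter.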
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Worker.java:
##########
@@ -105,4 +107,15 @@ boolean postResultPartitionBoundaries(
* Called when the work required for the query has been finished
*/
void postFinish();
+
+ /**
+ * Returns the statistics snapshot for the given stageId
+ */
+ ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) throws
ExecutionException, InterruptedException;
Review Comment:
Nit: Let's move the new methods below postWorkOrder.
It's generally nicer to have the finish method come last.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java:
##########
@@ -81,9 +81,9 @@ public String getId()
// Worker-to-controller messages
/**
- * Provide a {@link ClusterByStatisticsSnapshot} for shuffling stages.
+ * Accepts a {@link ClusterByStatisticsWorkerReport} for generating fetching
sketches.
Review Comment:
We might want to add some details to the javadocs of this method. A good way
to structure it would be to answer:
1. What does this API do?
2. Who uses this API?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java:
##########
@@ -81,9 +81,9 @@ public String getId()
// Worker-to-controller messages
/**
- * Provide a {@link ClusterByStatisticsSnapshot} for shuffling stages.
+ * Accepts a {@link ClusterByStatisticsWorkerReport} for generating fetching
sketches.
*/
- void updateStatus(int stageNumber, int workerNumber, Object
keyStatisticsObject);
+ void updateWorkerReportStatus(int stageNumber, int workerNumber, Object
workerReport);
Review Comment:
```suggestion
void updateWorkerReportStatus(int stageNumber, int workerNumber, Object
workerStatisticsReport);
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java:
##########
@@ -34,13 +34,13 @@
public interface ControllerClient extends AutoCloseable
{
/**
- * Client side method to update the controller with key statistics for a
particular stage and worker.
- * Controller's implementation collates all the key statistics for a stage
to generate the partition boundaries.
+ * Client side method to update the controller with worker reports for a
particular stage and worker.
Review Comment:
We already have a worker report. It's a bit confusing as to which report
you are referring to.
I suggest changing the name to workerAggregatedKeyStatistics.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartition;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.ClusterByStatisticsWorkerReport;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Queues up fetching sketches from workers and progressively generates
partitions boundaries.
+ */
+public class WorkerSketchFetcher
+{
+ private static final int DEFAULT_THREAD_COUNT = 10;
+ private static final long BYTES_THRESHOLD = 1_000_000_000L;
+ private static final long WORKER_THRESHOLD = 100;
+
+ private final boolean forceNonSequentialMerging;
+ private final WorkerClient workerClient;
+ private final ExecutorService executorService;
+
+ public WorkerSketchFetcher(WorkerClient workerClient, boolean
forceNonSequentialMerging)
+ {
+ this.workerClient = workerClient;
+ this.forceNonSequentialMerging = forceNonSequentialMerging;
+ this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
+ }
+
+ /**
+ * Submits a request to fetch and generate partitions for the given worker
report and returns a future for it. It
+ * decides based on the report if it should fetch sketches one by one or
together.
+ */
+ public CompletableFuture<Either<Long, ClusterByPartitions>>
submitFetcherTask(
+ ClusterByStatisticsWorkerReport workerReport,
+ List<String> workerTaskIds,
+ StageDefinition stageDefinition
+ )
+ {
+ ClusterBy clusterBy = stageDefinition.getClusterBy();
+
+ if (forceNonSequentialMerging || clusterBy.getBucketByCount() == 0) {
+ return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);
+ } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD ||
workerReport.getBytesRetained() > BYTES_THRESHOLD) {
+ return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);
Review Comment:
Shouldn't this be `sequentialTimeChunkMerging`, since we have a lot of
workers or bytes?
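A rough sketch of the branching I would expect, with the second branch switched to sequential merging. The thresholds mirror the constants in the diff; the `MergeModeChooser` class, the `Mode` enum, and the final fallback to in-memory merging are assumptions made for illustration, since the rest of the method is not visible in this hunk.

```java
/**
 * Hypothetical, standalone version of the decision in submitFetcherTask.
 * It returns which strategy would be picked instead of running it.
 */
final class MergeModeChooser
{
  enum Mode { IN_MEMORY_FULL_SKETCH, SEQUENTIAL_TIME_CHUNK }

  // Same values as BYTES_THRESHOLD and WORKER_THRESHOLD in the diff.
  static final long BYTES_THRESHOLD = 1_000_000_000L;
  static final long WORKER_THRESHOLD = 100;

  static Mode choose(
      boolean forceNonSequentialMerging,
      int bucketByCount,
      int maxWorkerCount,
      long bytesRetained
  )
  {
    if (forceNonSequentialMerging || bucketByCount == 0) {
      // Forced by the context flag, or the cluster-by has no bucket columns.
      return Mode.IN_MEMORY_FULL_SKETCH;
    } else if (maxWorkerCount > WORKER_THRESHOLD || bytesRetained > BYTES_THRESHOLD) {
      // Many workers or many retained bytes: merge sequentially.
      return Mode.SEQUENTIAL_TIME_CHUNK;
    }
    // Assumed default for small inputs; this branch is not shown in the hunk.
    return Mode.IN_MEMORY_FULL_SKETCH;
  }
}
```

Pulling the decision into a small pure function like this would also make the thresholds easy to unit test.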
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java:
##########
@@ -315,13 +316,18 @@ public ClusterByStatisticsSnapshot snapshot()
{
assertRetainedByteCountsAreTrackedCorrectly();
- final List<ClusterByStatisticsSnapshot.Bucket> bucketSnapshots = new
ArrayList<>();
+ final Map<Long, ClusterByStatisticsSnapshot.Bucket> bucketSnapshots = new
HashMap<>();
+ final RowKeyReader trimmedRowReader =
keyReader.trimmedKeyReader(clusterBy.getBucketByCount());
for (final Map.Entry<RowKey, BucketHolder> bucketEntry :
buckets.entrySet()) {
//noinspection rawtypes, unchecked
final KeyCollectorSnapshot keyCollectorSnapshot =
((KeyCollectorFactory)
keyCollectorFactory).toSnapshot(bucketEntry.getValue().keyCollector);
- bucketSnapshots.add(new
ClusterByStatisticsSnapshot.Bucket(bucketEntry.getKey(), keyCollectorSnapshot));
+ Long bucketKey = Long.MIN_VALUE;
+ if (clusterBy.getBucketByCount() == 1) {
Review Comment:
Nit: It would help to add a comment here explaining why this is
required.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsWorkerReport.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * Class sent by worker to controller after reading input to generate
partition boundries.
+ */
+public class ClusterByStatisticsWorkerReport
+{
+ private final SortedMap<Long, Set<Integer>> timeSegmentVsWorkerIdMap;
+
+ private Boolean hasMultipleValues;
Review Comment:
Is there a reason this is not a primitive `boolean`?
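The concern, sketched with two hypothetical classes (neither is from the PR): a boxed `Boolean` field starts out as null, so any getter that returns a primitive can throw on unboxing, while a primitive field simply defaults to false.

```java
/**
 * Hypothetical class with a boxed flag, as in the diff.
 */
final class BoxedFlagReport
{
  private Boolean hasMultipleValues; // boxed: null until something assigns it

  boolean hasMultipleValues()
  {
    // Auto-unboxing a null Boolean throws NullPointerException.
    return hasMultipleValues;
  }
}

/**
 * Hypothetical class with a primitive flag.
 */
final class PrimitiveFlagReport
{
  private boolean hasMultipleValues; // primitive: defaults to false

  boolean hasMultipleValues()
  {
    return hasMultipleValues;
  }
}
```

If null is meant to carry a distinct "unknown" meaning, the boxed type is fine, but that intent should probably be documented on the field.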
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsWorkerReport.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * Class sent by worker to controller after reading input to generate
partition boundries.
Review Comment:
If each worker sends this class in its payload, why does it need to
include a redundant worker ID? For example, worker 1 would send:
{
  "2022-01-01":[1],
  "2022-01-02":[1]
}
Maybe I am missing something.
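One possible alternative, as a sketch only: each worker reports just its set of time chunks, and the controller attaches the worker number it already receives with the call when building the combined map. `TimeChunkWorkerIndex` and its methods are hypothetical names, and time chunks are represented as plain `Long` values here for simplicity.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical controller-side index from time chunk to the workers that saw it.
 */
final class TimeChunkWorkerIndex
{
  private final SortedMap<Long, Set<Integer>> timeChunkToWorkers = new TreeMap<>();

  /**
   * Records the time chunks reported by one worker. The worker number comes
   * from the caller, so the payload itself does not need to repeat it.
   */
  void addWorkerReport(int workerNumber, Set<Long> timeChunks)
  {
    for (Long timeChunk : timeChunks) {
      timeChunkToWorkers.computeIfAbsent(timeChunk, k -> new TreeSet<>()).add(workerNumber);
    }
  }

  /** Returns the live combined map, ordered by time chunk. */
  SortedMap<Long, Set<Integer>> timeChunkToWorkers()
  {
    return timeChunkToWorkers;
  }
}
```

The controller would end up with the same time-chunk-to-workers view, while each worker payload shrinks to a set of time chunks.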
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]