cryptoe commented on code in PR #13205:
URL: https://github.com/apache/druid/pull/13205#discussion_r1004264481


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -59,6 +59,7 @@
   private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true;
 
   public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE = 
"durableShuffleStorage";
+  public static final String CTX_FORCE_NON_SEQUENTIAL_MERGE = 
"forceNonSequentialMerging";

Review Comment:
   Some suggestions:
   1. Let's document the context parameter in the PR description.
   2. Let's add it to the SQL readme docs.
   3. Should we call the parameter something like "segment size estimator
   mode" and give it two values, serial and parallel? In the future, we could
   add an auto mode that automatically switches the size estimator based on
   the inputs.
   WDYT?
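
To make suggestion 3 concrete, here is a rough sketch of what a mode-valued parameter could look like. `SizeEstimatorMode` and `fromContextValue` are hypothetical names I made up for illustration; they are not existing Druid classes or methods, and the real parsing would presumably go through the existing query context helpers.

```java
import java.util.Locale;

// Hypothetical sketch only: none of these names exist in Druid.
enum SizeEstimatorMode
{
  SERIAL,
  PARALLEL,
  AUTO; // possible future mode that picks a strategy based on the inputs

  /**
   * Parses a raw context value case-insensitively. Returns the supplied
   * default when the key is absent (null). Unknown values make valueOf
   * throw IllegalArgumentException.
   */
  static SizeEstimatorMode fromContextValue(Object value, SizeEstimatorMode defaultMode)
  {
    if (value == null) {
      return defaultMode;
    }
    return valueOf(value.toString().trim().toUpperCase(Locale.ROOT));
  }
}
```

An enum like this would leave room for the auto mode later without adding a second boolean flag.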



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Worker.java:
##########
@@ -105,4 +107,15 @@ boolean postResultPartitionBoundaries(
    * Called when the work required for the query has been finished
    */
   void postFinish();
+
+  /**
+   * Returns the statistics snapshot for the given stageId
+   */
+  ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) throws 
ExecutionException, InterruptedException;

Review Comment:
   Nit: Let's move the new methods below postWorkOrder.
   It's generally nicer to have the finish method last.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java:
##########
@@ -81,9 +81,9 @@ public String getId()
   // Worker-to-controller messages
 
   /**
-   * Provide a {@link ClusterByStatisticsSnapshot} for shuffling stages.
+   * Accepts a {@link ClusterByStatisticsWorkerReport} for generating fetching 
sketches.

Review Comment:
   We might want to add more detail to the Javadoc of this method. A good way
   to structure it would be to answer:
   1. What does this API do?
   2. Who uses this API?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java:
##########
@@ -81,9 +81,9 @@ public String getId()
   // Worker-to-controller messages
 
   /**
-   * Provide a {@link ClusterByStatisticsSnapshot} for shuffling stages.
+   * Accepts a {@link ClusterByStatisticsWorkerReport} for generating fetching 
sketches.
    */
-  void updateStatus(int stageNumber, int workerNumber, Object 
keyStatisticsObject);
+  void updateWorkerReportStatus(int stageNumber, int workerNumber, Object 
workerReport);

Review Comment:
   ```suggestion
     void updateWorkerReportStatus(int stageNumber, int workerNumber, Object workerStatisticsReport);
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java:
##########
@@ -34,13 +34,13 @@
 public interface ControllerClient extends AutoCloseable
 {
   /**
-   * Client side method to update the controller with key statistics for a 
particular stage and worker.
-   * Controller's implementation collates all the key statistics for a stage 
to generate the partition boundaries.
+   * Client side method to update the controller with worker reports for a 
particular stage and worker.

Review Comment:
   We already have a worker report, so it's a bit confusing which report this
   refers to.
   I suggest changing the name to workerAggregatedKeyStatistics.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartition;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.ClusterByStatisticsWorkerReport;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Queues up fetching sketches from workers and progressively generates 
partitions boundaries.
+ */
+public class WorkerSketchFetcher
+{
+  private static final int DEFAULT_THREAD_COUNT = 10;
+  private static final long BYTES_THRESHOLD = 1_000_000_000L;
+  private static final long WORKER_THRESHOLD = 100;
+
+  private final boolean forceNonSequentialMerging;
+  private final WorkerClient workerClient;
+  private final ExecutorService executorService;
+
+  public WorkerSketchFetcher(WorkerClient workerClient, boolean 
forceNonSequentialMerging)
+  {
+    this.workerClient = workerClient;
+    this.forceNonSequentialMerging = forceNonSequentialMerging;
+    this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
+  }
+
+  /**
+   * Submits a request to fetch and generate partitions for the given worker 
report and returns a future for it. It
+   * decides based on the report if it should fetch sketches one by one or 
together.
+   */
+  public CompletableFuture<Either<Long, ClusterByPartitions>> 
submitFetcherTask(
+      ClusterByStatisticsWorkerReport workerReport,
+      List<String> workerTaskIds,
+      StageDefinition stageDefinition
+  )
+  {
+    ClusterBy clusterBy = stageDefinition.getClusterBy();
+
+    if (forceNonSequentialMerging || clusterBy.getBucketByCount() == 0) {
+      return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);
+    } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD || 
workerReport.getBytesRetained() > BYTES_THRESHOLD) {
+      return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);

Review Comment:
   Shouldn't this be `sequentialTimeChunkMerging`, since we have a lot of
   workers or bytes?
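
Roughly what I would expect the decision to look like, as a standalone sketch. `MergeStrategyChooser` and the `Strategy` enum are hypothetical stand-ins so the snippet compiles on its own; the thresholds are copied from the diff, and the final fallback branch is my assumption since it is not visible in this hunk.

```java
// Hypothetical stand-in for the branching in submitFetcherTask.
final class MergeStrategyChooser
{
  enum Strategy { IN_MEMORY_FULL_SKETCH, SEQUENTIAL_TIME_CHUNK }

  static final long BYTES_THRESHOLD = 1_000_000_000L;
  static final long WORKER_THRESHOLD = 100;

  static Strategy choose(
      boolean forceNonSequentialMerging,
      int bucketByCount,
      int maxWorkerCount,
      long bytesRetained
  )
  {
    // Forced by the context flag, or there are no time buckets to go through one by one.
    if (forceNonSequentialMerging || bucketByCount == 0) {
      return Strategy.IN_MEMORY_FULL_SKETCH;
    }
    // Many workers or large sketches: merge one time chunk at a time.
    if (maxWorkerCount > WORKER_THRESHOLD || bytesRetained > BYTES_THRESHOLD) {
      return Strategy.SEQUENTIAL_TIME_CHUNK;
    }
    // Assumed fallback: inputs are small enough to merge everything at once.
    return Strategy.IN_MEMORY_FULL_SKETCH;
  }
}
```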



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java:
##########
@@ -315,13 +316,18 @@ public ClusterByStatisticsSnapshot snapshot()
   {
     assertRetainedByteCountsAreTrackedCorrectly();
 
-    final List<ClusterByStatisticsSnapshot.Bucket> bucketSnapshots = new 
ArrayList<>();
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> bucketSnapshots = new 
HashMap<>();
+    final RowKeyReader trimmedRowReader = 
keyReader.trimmedKeyReader(clusterBy.getBucketByCount());
 
     for (final Map.Entry<RowKey, BucketHolder> bucketEntry : 
buckets.entrySet()) {
       //noinspection rawtypes, unchecked
       final KeyCollectorSnapshot keyCollectorSnapshot =
           ((KeyCollectorFactory) 
keyCollectorFactory).toSnapshot(bucketEntry.getValue().keyCollector);
-      bucketSnapshots.add(new 
ClusterByStatisticsSnapshot.Bucket(bucketEntry.getKey(), keyCollectorSnapshot));
+      Long bucketKey = Long.MIN_VALUE;
+      if (clusterBy.getBucketByCount() == 1) {

Review Comment:
   Nit: It would help if you could add a comment here explaining why this is
   required.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsWorkerReport.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * Class sent by worker to controller after reading input to generate 
partition boundries.
+ */
+public class ClusterByStatisticsWorkerReport
+{
+  private final SortedMap<Long, Set<Integer>> timeSegmentVsWorkerIdMap;
+
+  private Boolean hasMultipleValues;

Review Comment:
   Is there a reason this is not a primitive `boolean`?
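
For context, the general concern with a boxed field is that it can be null, and auto-unboxing a null `Boolean` throws. A minimal illustration follows; `BoxedFlagDemo` is a made-up class, and whether this can actually happen here depends on how the field gets initialized in the PR.

```java
// Made-up demo class, unrelated to the PR's code.
final class BoxedFlagDemo
{
  private Boolean boxedFlag;      // object field, defaults to null
  private boolean primitiveFlag;  // primitive field, defaults to false

  boolean readBoxed()
  {
    // Auto-unboxing a null Boolean throws NullPointerException.
    return boxedFlag;
  }

  boolean readPrimitive()
  {
    return primitiveFlag;
  }
}
```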



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsWorkerReport.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * Class sent by worker to controller after reading input to generate 
partition boundries.

Review Comment:
   If each worker sends this class in its payload, why does it need to include
   a redundant worker ID? For example:
   {
   "2022-01-01":[1],
   "2022-01-02":[1]
   }
   
   Maybe I am missing something.
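
What I had in mind, as a sketch under my own assumptions: the worker sends only the set of time chunks it saw, and the controller, which already receives a worker number with the call, builds the chunk-to-workers map itself. `TimeChunkWorkerIndex` and `addWorkerReport` are hypothetical names, not part of the PR.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical controller-side index; not part of the PR.
final class TimeChunkWorkerIndex
{
  private final SortedMap<Long, Set<Integer>> timeChunkToWorkers = new TreeMap<>();

  /** Records that the given worker reported rows in each of the given time chunks. */
  void addWorkerReport(int workerNumber, Set<Long> timeChunks)
  {
    for (Long timeChunk : timeChunks) {
      timeChunkToWorkers.computeIfAbsent(timeChunk, ignored -> new TreeSet<>()).add(workerNumber);
    }
  }

  SortedMap<Long, Set<Integer>> asMap()
  {
    return timeChunkToWorkers;
  }
}
```

With this shape the payload from each worker is just a set of time chunks, and the worker ID comes from the request itself.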



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

