adarshsanjeev commented on code in PR #15399:
URL: https://github.com/apache/druid/pull/15399#discussion_r1488901472


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java:
##########
@@ -100,23 +105,38 @@ public GroupByPreShuffleFrameProcessor(
   }
 
   @Override
-  protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor segment) throws IOException
+  protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws IOException
   {
-    if (resultYielder == null) {
-      Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<ResultRow>> statusSequencePair =
-          segment.fetchRowsFromDataServer(groupingEngine.prepareGroupByQuery(query), Function.identity(), closer);
-      if (LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF.equals(statusSequencePair.lhs)) {
-        log.info("Segment[%s] was handed off, falling back to fetching the segment from deep storage.",
-                 segment.getDescriptor());
-        return runWithSegment(segment);
+    if (resultYielder == null || resultYielder.isDone()) {
+      if (yielderYielder == null) {

Review Comment:
   Changed



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java:
##########
@@ -126,28 +161,67 @@ private Set<DataSegmentWithInterval> getPrunedSegmentSet(final TableInputSpec ta
 
   private static List<InputSlice> makeSlices(
       final TableInputSpec tableInputSpec,
-      final List<List<DataSegmentWithInterval>> assignments
+      final List<List<WeightedInputInstance>> assignments
   )
   {
     final List<InputSlice> retVal = new ArrayList<>(assignments.size());
 
-    for (final List<DataSegmentWithInterval> assignment : assignments) {
+    for (final List<WeightedInputInstance> assignment : assignments) {
+
       final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
-      for (final DataSegmentWithInterval dataSegmentWithInterval : assignment) {
-        descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+      final List<DataServerRequestDescriptor> dataServerRequests = new ArrayList<>();
+
+      for (final WeightedInputInstance weightedSegment : assignment) {
+        if (weightedSegment instanceof DataSegmentWithInterval) {
+          DataSegmentWithInterval dataSegmentWithInterval = (DataSegmentWithInterval) weightedSegment;
+          descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+        } else {
+          DataServerRequest serverRequest = (DataServerRequest) weightedSegment;
+          dataServerRequests.add(serverRequest.toDataServerRequestDescriptor());
+        }
       }
 
-      if (descriptors.isEmpty()) {
+      if (descriptors.isEmpty() && dataServerRequests.isEmpty()) {
         retVal.add(NilInputSlice.INSTANCE);
       } else {
-        retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(), descriptors));
+        retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(), descriptors, dataServerRequests));
       }
     }
 
     return retVal;
   }
 
-  private static class DataSegmentWithInterval
+  private static List<WeightedInputInstance> createWeightedSegmentSet(List<DataSegmentWithInterval> prunedServedSegments)

Review Comment:
   Added



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java:
##########
@@ -169,9 +243,40 @@ public RichSegmentDescriptor toRichSegmentDescriptor()
           segment.getInterval(),
           interval,
           segment.getVersion(),
-          segment.getShardSpec().getPartitionNum(),
-          segment instanceof DataSegmentWithLocation ? ((DataSegmentWithLocation) segment).getServers() : null
+          segment.getShardSpec().getPartitionNum()
       );
     }
+
+    @Override
+    public long getWeight()
+    {
+      return segment.getSize();
+    }
+  }
+
+  private static class DataServerRequest implements WeightedInputInstance
+  {
+    private static final long DATA_SERVER_WEIGHT_ESTIMATION = 5000L;
+    private final List<DataSegmentWithInterval> segments;
+    private final DruidServerMetadata serverMetadata;
+
+    public DataServerRequest(DruidServerMetadata serverMetadata, List<DataSegmentWithInterval> segments)
+    {
+      this.segments = Preconditions.checkNotNull(segments, "segments");
+      this.serverMetadata = Preconditions.checkNotNull(serverMetadata, "server");
+    }
+
+    @Override
+    public long getWeight()
+    {
+      return segments.size() * DATA_SERVER_WEIGHT_ESTIMATION;

Review Comment:
   Yes, added a comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to