cryptoe commented on code in PR #15076:
URL: https://github.com/apache/druid/pull/15076#discussion_r1345906495


##########
docs/multi-stage-query/reference.md:
##########
@@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
 | `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
 | `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
 | `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
+| `segmentLoadWait` | INSERT, REPLACE<br /><br /> Whether the controller should wait for segments to be loaded before exiting. If this is true, the controller queries the broker and waits till the segments created (if any) have been loaded by the load rules. The controller also provides this information in the live reports and task reports. If this is false, the controller exits immediately after finishing the query. | `false` |

Review Comment:
   `segmentHandedOff`? How does this name sound?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java:
##########
@@ -159,29 +159,18 @@ public void waitForSegmentsToLoad()
               return;
             }
 
-            Iterator<String> iterator = versionsToAwait.iterator();
-            log.info(
+            log.debug(

Review Comment:
   Can we `log.info` this once every minute?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java:
##########

Review Comment:
   And update the log message to say that even if the task is cancelled, the segments will continue to load.
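A minimal sketch of both logging suggestions, assuming the wait loop polls the broker periodically from a single thread: log progress at INFO at most once per minute and at DEBUG otherwise, and have the message state that cancelling the task does not stop segment loading. `InfoLogThrottle` and `progressMessage` are hypothetical names, not existing Druid APIs; in `SegmentLoadStatusFetcher` the boolean would simply choose between the existing `log.info(...)` and `log.debug(...)` calls.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not an existing Druid API: allows at most one INFO-level
// progress line per interval so the remaining polls can be logged at DEBUG.
// Not thread-safe; assumes it is only called from the single polling thread.
class InfoLogThrottle
{
  private final long intervalMillis;
  // Long.MIN_VALUE guarantees that the very first poll is logged at INFO.
  private long nextInfoMillis = Long.MIN_VALUE;

  InfoLogThrottle(long intervalMillis)
  {
    this.intervalMillis = intervalMillis;
  }

  // Returns true if a line logged at nowMillis should use INFO and, if so,
  // moves the next INFO slot one interval into the future.
  boolean shouldLogAtInfo(long nowMillis)
  {
    if (nowMillis >= nextInfoMillis) {
      nextInfoMillis = nowMillis + intervalMillis;
      return true;
    }
    return false;
  }

  // Hypothetical wording that also tells users that cancelling the task does not stop loading.
  // Inside Druid, StringUtils.format would be used instead of String.format.
  static String progressMessage(int pendingSegments, String datasource)
  {
    return String.format(
        Locale.ROOT,
        "Waiting for [%d] pending segments of datasource [%s] to load. "
        + "Even if this task is cancelled, the segments will continue to load.",
        pendingSegments,
        datasource
    );
  }

  public static void main(String[] args)
  {
    InfoLogThrottle throttle = new InfoLogThrottle(TimeUnit.MINUTES.toMillis(1));
    // Simulate a broker poll every 10 seconds for two minutes.
    for (long t = 0; t <= 120_000; t += 10_000) {
      String level = throttle.shouldLogAtInfo(t) ? "INFO " : "DEBUG";
      System.out.println(level + " t=" + t + "ms " + progressMessage(5, "foo1"));
    }
  }
}
```

With a poll every 10 seconds, the simulation in `main` picks INFO at 0, 60000 and 120000 ms and DEBUG for every other poll. Passing the clock value in, rather than reading `System.currentTimeMillis()` inside the helper, keeps the throttle easy to unit test.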



##########
docs/multi-stage-query/reference.md:
##########

Review Comment:
   Or `waitTillSegmentsLoad`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java:
##########
@@ -216,43 +205,35 @@ private void waitIfNeeded(long waitTimeMillis) throws Exception
   }
 
   /**
-   * Updates the {@link #status} with the latest details based on {@link #versionToLoadStatusMap}
+   * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference}
    */
   private void updateStatus(State state, DateTime startTime)
   {
-    int pendingSegmentCount = 0, usedSegmentsCount = 0, precachedSegmentCount = 0, onDemandSegmentCount = 0, unknownSegmentCount = 0;
-    for (Map.Entry<String, VersionLoadStatus> entry : versionToLoadStatusMap.entrySet()) {
-      usedSegmentsCount += entry.getValue().getUsedSegments();
-      precachedSegmentCount += entry.getValue().getPrecachedSegments();
-      onDemandSegmentCount += entry.getValue().getOnDemandSegments();
-      unknownSegmentCount += entry.getValue().getUnknownSegments();
-      pendingSegmentCount += entry.getValue().getPendingSegments();
-    }
-
     long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis();
+    VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get();
     status.set(
         new SegmentLoadWaiterStatus(
             state,
             startTime,
             runningMillis,
             totalSegmentsGenerated,
-            usedSegmentsCount,
-            precachedSegmentCount,
-            onDemandSegmentCount,
-            pendingSegmentCount,
-            unknownSegmentCount
+            versionLoadStatus.getUsedSegments(),
+            versionLoadStatus.getPrecachedSegments(),
+            versionLoadStatus.getOnDemandSegments(),
+            versionLoadStatus.getPendingSegments(),
+            versionLoadStatus.getUnknownSegments()
         )
     );
   }
 
   /**
-   * Uses {@link #brokerClient} to fetch latest load status for a given version. Converts the response into a
+   * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a
    * {@link VersionLoadStatus} and returns it.
    */
-  private VersionLoadStatus fetchLoadStatusForVersion(String version) throws Exception
+  private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
   {
     Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/");
-    SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, version),
+    SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsInClauseString),

Review Comment:
   While running the unit tests, I also saw:
   ```
   
   2023-10-05T11:21:28,837 WARN [query-2cd60052-3011-47ae-87be-ec47100f649d-segment-load-waiter-0] org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Exception occurred while waiting for segments to load. Exiting.
   java.lang.NullPointerException: null
        at org.apache.druid.msq.exec.SegmentLoadStatusFetcher.fetchLoadStatusForVersion(SegmentLoadStatusFetcher.java:262) ~[classes/:?]
        at org.apache.druid.msq.exec.SegmentLoadStatusFetcher.lambda$waitForSegmentsToLoad$0(SegmentLoadStatusFetcher.java:174) ~[classes/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) ~[guava-31.1-jre.jar:?]
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74) ~[guava-31.1-jre.jar:?]
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) ~[guava-31.1-jre.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
   2023-10-05T11:21:28,846 INFO [main] org.apache.druid.msq.exec.WorkerImpl - Stopping gracefully for taskId [query-2cd60052-3011-47ae-87be-ec47100f649d-worker0_0]
   2023-10-05T11:21:28,848 INFO [main] org.apache.druid.msq.test.MSQTestBase - found generated segments: DataSegment{binaryVersion=9, id=foo1_-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z_test, loadSpec={type=>local, path=>/var/folders/sx/n0v16tnj6mvf6hd14l6j6dth0000gn/T/junit8066366800031570368/localsegments/foo1/-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z/test/0/index/}, dimensions=[dim_mv], metrics=[], shardSpec=NumberedShardSpec{partitionNum=0, partitions=100}, lastCompactionState=null, size=1037}
   2023-10-05T11:21:28,859 INFO [main] org.apache.druid.msq.test.MSQTestBase - Found spec: {
     "query" : {
     
   ```
   Let's fix it in this PR as well.
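The trace does not show which value was null at `SegmentLoadStatusFetcher.java:262`, so the following is only a sketch under an assumption: that the null comes either from a broker response that was never populated (for example, a mocked broker client in the unit tests) or from reading `versionLoadStatusReference` before the first fetch completes. An `AtomicReference` holds `null` until it is set, so `versionLoadStatusReference.get().getUsedSegments()` in `updateStatus` would also throw an NPE unless the reference is seeded. Seeding it with an all-zero status and ignoring null fetch results covers both cases. `LoadStatusGuardSketch`, `recordFetchResult` and the simplified `VersionLoadStatus` below are hypothetical stand-ins, not the PR's classes.

```java
import java.util.concurrent.atomic.AtomicReference;

class LoadStatusGuardSketch
{
  // Seeded with an all-zero status so readers such as updateStatus() never see null,
  // even before the first successful fetch from the broker.
  private final AtomicReference<VersionLoadStatus> versionLoadStatusReference =
      new AtomicReference<>(VersionLoadStatus.EMPTY);

  // Hypothetical hook called with the parsed broker response. A null result
  // (e.g. from a mocked broker client) keeps the last known status instead of
  // overwriting it with null.
  void recordFetchResult(VersionLoadStatus fetched)
  {
    if (fetched != null) {
      versionLoadStatusReference.set(fetched);
    }
  }

  int pendingSegments()
  {
    return versionLoadStatusReference.get().getPendingSegments();
  }

  public static void main(String[] args)
  {
    LoadStatusGuardSketch sketch = new LoadStatusGuardSketch();
    System.out.println(sketch.pendingSegments()); // 0: no NPE before the first fetch
    sketch.recordFetchResult(null);
    System.out.println(sketch.pendingSegments()); // 0: null result ignored
    sketch.recordFetchResult(new VersionLoadStatus(3, 1, 0, 2, 0));
    System.out.println(sketch.pendingSegments()); // 2
  }
}

// Simplified, hypothetical stand-in for the PR's VersionLoadStatus; only the counts are modeled.
final class VersionLoadStatus
{
  static final VersionLoadStatus EMPTY = new VersionLoadStatus(0, 0, 0, 0, 0);

  private final int usedSegments;
  private final int precachedSegments;
  private final int onDemandSegments;
  private final int pendingSegments;
  private final int unknownSegments;

  VersionLoadStatus(
      int usedSegments,
      int precachedSegments,
      int onDemandSegments,
      int pendingSegments,
      int unknownSegments
  )
  {
    this.usedSegments = usedSegments;
    this.precachedSegments = precachedSegments;
    this.onDemandSegments = onDemandSegments;
    this.pendingSegments = pendingSegments;
    this.unknownSegments = unknownSegments;
  }

  int getUsedSegments() { return usedSegments; }
  int getPrecachedSegments() { return precachedSegments; }
  int getOnDemandSegments() { return onDemandSegments; }
  int getPendingSegments() { return pendingSegments; }
  int getUnknownSegments() { return unknownSegments; }
}
```

Keeping the last known status on a null result, rather than failing the wait, is a design choice; if a null response should instead abort the wait, the guard could throw a descriptive exception so the log shows the cause rather than a bare NPE.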



-- 
This is an automated message from the Apache Git Service.