This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e987e3d696 Add query context parameter for segment load wait (#15076)
7e987e3d696 is described below

commit 7e987e3d696b6f157e8197eea30c6444ac1a6d7e
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Oct 5 18:26:34 2023 +0530

    Add query context parameter for segment load wait (#15076)
    
    Add waitTillSegmentsLoad as a query context parameter. If this is true, the 
controller queries the broker and waits until the segments created (if any) have 
been loaded by the load rules. The controller also provides this information in 
the live reports and task reports. If this is false, the controller exits 
immediately after finishing the query.
---
 docs/multi-stage-query/reference.md                |  1 +
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 44 ++++++-----
 .../druid/msq/exec/SegmentLoadStatusFetcher.java   | 88 ++++++++++------------
 .../druid/msq/util/MultiStageQueryContext.java     | 10 +++
 4 files changed, 75 insertions(+), 68 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index 6236b654525..010bbff2a27 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -246,6 +246,7 @@ The following table lists the context parameters for the 
MSQ task engine:
 | `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use 
durable storage for shuffle mesh. To use this feature, configure the durable 
storage at the server level using 
`druid.msq.intermediate.storage.enable=true`. If this property is not 
configured, any query with the context variable `durableShuffleStorage=true` 
fails with a configuration error. <br /><br />                                  
                                                                       [...]
 | `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on 
fault tolerance mode or not. Failed workers are retried based on 
[Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly 
set to false.                                                                   
                                                                                
                                                                                
                                   [...]
 | `selectDestination` | SELECT<br /><br /> Controls where the final result of 
the select query is written. <br />Use `taskReport`(the default) to write 
select results to the task report. <b> This is not scalable since task report 
size explodes for large results </b> <br/>Use `durableStorage` to write results 
to durable storage location. <b>For large result sets, it's recommended to use 
`durableStorage` </b>. To configure durable storage see 
[`this`](#durable-storage) section.            [...]
+| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest 
query waits for the generated segments to be loaded before exiting, else the 
ingest query exits without waiting. The task and live reports contain the 
information about the status of loading segments if this flag is set. This will 
ensure that any future queries made after the ingestion exits will include 
results from the ingestion. The drawback is that the controller task will stall 
until the segments are loaded.     [...]
 
 ## Joins
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c423b959ecc..c7b10f245c1 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -463,14 +463,18 @@ public class ControllerImpl implements Controller
       }
     }
 
+    boolean shouldWaitForSegmentLoad = 
MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context());
 
     try {
       releaseTaskLocks();
       cleanUpDurableStorageIfNeeded();
 
       if (queryKernel != null && queryKernel.isSuccess()) {
-        if (segmentLoadWaiter != null) {
-          // If successful and there are segments created, segmentLoadWaiter 
should wait for them to become available.
+        if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) {
+          // If successful, there are segments created and segment load is 
enabled, segmentLoadWaiter should wait
+          // for them to become available.
+          log.info("Controller will now wait for segments to be loaded. The 
query has already finished executing,"
+                   + " and results will be included once the segments are 
loaded, even if this query is cancelled now.");
           segmentLoadWaiter.waitForSegmentsToLoad();
         }
       }
@@ -1363,31 +1367,35 @@ public class ControllerImpl implements Controller
         }
       } else {
         Set<String> versionsToAwait = 
segmentsWithTombstones.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
+        if 
(MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context()))
 {
+          segmentLoadWaiter = new SegmentLoadStatusFetcher(
+              context.injector().getInstance(BrokerClient.class),
+              context.jsonMapper(),
+              task.getId(),
+              task.getDataSource(),
+              versionsToAwait,
+              segmentsWithTombstones.size(),
+              true
+          );
+        }
+        performSegmentPublish(
+            context.taskActionClient(),
+            SegmentTransactionalInsertAction.overwriteAction(null, 
segmentsWithTombstones)
+        );
+      }
+    } else if (!segments.isEmpty()) {
+      Set<String> versionsToAwait = 
segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
+      if 
(MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context()))
 {
         segmentLoadWaiter = new SegmentLoadStatusFetcher(
             context.injector().getInstance(BrokerClient.class),
             context.jsonMapper(),
             task.getId(),
             task.getDataSource(),
             versionsToAwait,
-            segmentsWithTombstones.size(),
+            segments.size(),
             true
         );
-        performSegmentPublish(
-            context.taskActionClient(),
-            SegmentTransactionalInsertAction.overwriteAction(null, 
segmentsWithTombstones)
-        );
       }
-    } else if (!segments.isEmpty()) {
-      Set<String> versionsToAwait = 
segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
-      segmentLoadWaiter = new SegmentLoadStatusFetcher(
-          context.injector().getInstance(BrokerClient.class),
-          context.jsonMapper(),
-          task.getId(),
-          task.getDataSource(),
-          versionsToAwait,
-          segments.size(),
-          true
-      );
       // Append mode.
       performSegmentPublish(
           context.taskActionClient(),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
index 478c632a749..17f46bad23a 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
@@ -41,13 +41,10 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import javax.ws.rs.core.MediaType;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * Class that periodically checks with the broker if all the segments 
generated are loaded by querying the sys table
@@ -84,14 +81,14 @@ public class SegmentLoadStatusFetcher implements 
AutoCloseable
                                            + "COUNT(*) FILTER (WHERE 
is_available = 0 AND is_published = 1 AND replication_factor != 0) AS 
pendingSegments,\n"
                                            + "COUNT(*) FILTER (WHERE 
replication_factor = -1) AS unknownSegments\n"
                                            + "FROM sys.segments\n"
-                                           + "WHERE datasource = '%s' AND 
is_overshadowed = 0 AND version = '%s'";
+                                           + "WHERE datasource = '%s' AND 
is_overshadowed = 0 AND version in (%s)";
 
   private final BrokerClient brokerClient;
   private final ObjectMapper objectMapper;
   // Map of version vs latest load status.
-  private final Map<String, VersionLoadStatus> versionToLoadStatusMap;
+  private final AtomicReference<VersionLoadStatus> versionLoadStatusReference;
   private final String datasource;
-  private final Set<String> versionsToAwait;
+  private final String versionsInClauseString;
   private final int totalSegmentsGenerated;
   private final boolean doWait;
   // since live reports fetch the value in another thread, we need to use 
AtomicReference
@@ -112,8 +109,11 @@ public class SegmentLoadStatusFetcher implements 
AutoCloseable
     this.brokerClient = brokerClient;
     this.objectMapper = objectMapper;
     this.datasource = datasource;
-    this.versionsToAwait = new TreeSet<>(versionsToAwait);
-    this.versionToLoadStatusMap = new HashMap<>();
+    this.versionsInClauseString = String.join(
+        ",",
+        versionsToAwait.stream().map(s -> StringUtils.format("'%s'", 
s)).collect(Collectors.toSet())
+    );
+    this.versionLoadStatusReference = new AtomicReference<>(new 
VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated));
     this.totalSegmentsGenerated = totalSegmentsGenerated;
     this.status = new AtomicReference<>(new SegmentLoadWaiterStatus(
         State.INIT,
@@ -145,8 +145,9 @@ public class SegmentLoadStatusFetcher implements 
AutoCloseable
     final AtomicReference<Boolean> hasAnySegmentBeenLoaded = new 
AtomicReference<>(false);
     try {
       FutureUtils.getUnchecked(executorService.submit(() -> {
+        long lastLogMillis = -TimeUnit.MINUTES.toMillis(1);
         try {
-          while (!versionsToAwait.isEmpty()) {
+          while (!(hasAnySegmentBeenLoaded.get() && 
versionLoadStatusReference.get().isLoadingComplete())) {
             // Check the timeout and exit if exceeded.
             long runningMillis = new Interval(startTime, 
DateTimes.nowUtc()).toDurationMillis();
             if (runningMillis > TIMEOUT_DURATION_MILLIS) {
@@ -159,29 +160,21 @@ public class SegmentLoadStatusFetcher implements 
AutoCloseable
               return;
             }
 
-            Iterator<String> iterator = versionsToAwait.iterator();
-            log.info(
-                "Fetching segment load status for datasource[%s] from broker 
for segment versions[%s]",
-                datasource,
-                versionsToAwait
-            );
-
-            // Query the broker for all pending versions
-            while (iterator.hasNext()) {
-              String version = iterator.next();
-
-              // Fetch the load status for this version from the broker
-              VersionLoadStatus loadStatus = 
fetchLoadStatusForVersion(version);
-              versionToLoadStatusMap.put(version, loadStatus);
-              hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || 
loadStatus.getUsedSegments() > 0);
-
-              // If loading is done for this stage, remove it from future 
loops.
-              if (hasAnySegmentBeenLoaded.get() && 
loadStatus.isLoadingComplete()) {
-                iterator.remove();
-              }
+            if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) 
{
+              lastLogMillis = runningMillis;
+              log.info(
+                  "Fetching segment load status for datasource[%s] from broker 
for segment versions[%s]",
+                  datasource,
+                  versionsInClauseString
+              );
             }
 
-            if (!versionsToAwait.isEmpty()) {
+            // Fetch the load status from the broker
+            VersionLoadStatus loadStatus = fetchLoadStatusFromBroker();
+            versionLoadStatusReference.set(loadStatus);
+            hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || 
loadStatus.getUsedSegments() > 0);
+
+            if (!(hasAnySegmentBeenLoaded.get() && 
versionLoadStatusReference.get().isLoadingComplete())) {
               // Update the status.
               updateStatus(State.WAITING, startTime);
               // Sleep for a bit before checking again.
@@ -216,50 +209,45 @@ public class SegmentLoadStatusFetcher implements 
AutoCloseable
   }
 
   /**
-   * Updates the {@link #status} with the latest details based on {@link 
#versionToLoadStatusMap}
+   * Updates the {@link #status} with the latest details based on {@link 
#versionLoadStatusReference}
    */
   private void updateStatus(State state, DateTime startTime)
   {
-    int pendingSegmentCount = 0, usedSegmentsCount = 0, precachedSegmentCount 
= 0, onDemandSegmentCount = 0, unknownSegmentCount = 0;
-    for (Map.Entry<String, VersionLoadStatus> entry : 
versionToLoadStatusMap.entrySet()) {
-      usedSegmentsCount += entry.getValue().getUsedSegments();
-      precachedSegmentCount += entry.getValue().getPrecachedSegments();
-      onDemandSegmentCount += entry.getValue().getOnDemandSegments();
-      unknownSegmentCount += entry.getValue().getUnknownSegments();
-      pendingSegmentCount += entry.getValue().getPendingSegments();
-    }
-
     long runningMillis = new Interval(startTime, 
DateTimes.nowUtc()).toDurationMillis();
+    VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get();
     status.set(
         new SegmentLoadWaiterStatus(
             state,
             startTime,
             runningMillis,
             totalSegmentsGenerated,
-            usedSegmentsCount,
-            precachedSegmentCount,
-            onDemandSegmentCount,
-            pendingSegmentCount,
-            unknownSegmentCount
+            versionLoadStatus.getUsedSegments(),
+            versionLoadStatus.getPrecachedSegments(),
+            versionLoadStatus.getOnDemandSegments(),
+            versionLoadStatus.getPendingSegments(),
+            versionLoadStatus.getUnknownSegments()
         )
     );
   }
 
   /**
-   * Uses {@link #brokerClient} to fetch latest load status for a given 
version. Converts the response into a
+   * Uses {@link #brokerClient} to fetch latest load status for a given set of 
versions. Converts the response into a
    * {@link VersionLoadStatus} and returns it.
    */
-  private VersionLoadStatus fetchLoadStatusForVersion(String version) throws 
Exception
+  private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
   {
     Request request = brokerClient.makeRequest(HttpMethod.POST, 
"/druid/v2/sql/");
-    SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, 
datasource, version),
+    SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, 
datasource, versionsInClauseString),
                                      ResultFormat.OBJECTLINES,
                                      false, false, false, null, null
     );
     request.setContent(MediaType.APPLICATION_JSON, 
objectMapper.writeValueAsBytes(sqlQuery));
     String response = brokerClient.sendQuery(request);
 
-    if (response.trim().isEmpty()) {
+    if (response == null) {
+      // Unable to query broker
+      return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated);
+    } else if (response.trim().isEmpty()) {
       // If no segments are returned for a version, all segments have been 
dropped by a drop rule.
       return new VersionLoadStatus(0, 0, 0, 0, 0);
     } else {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 265f5eae0fe..98dcd471d0f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -97,6 +97,8 @@ public class MultiStageQueryContext
 
   public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
   public static final boolean DEFAULT_FAULT_TOLERANCE = false;
+  public static final String CTX_SEGMENT_LOAD_WAIT = "waitTillSegmentsLoad";
+  public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false;
   public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = 
"maxInputBytesPerWorker";
 
   public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = 
"clusterStatisticsMergeMode";
@@ -148,6 +150,14 @@ public class MultiStageQueryContext
     );
   }
 
+  public static boolean shouldWaitForSegmentLoad(final QueryContext 
queryContext)
+  {
+    return queryContext.getBoolean(
+        CTX_SEGMENT_LOAD_WAIT,
+        DEFAULT_SEGMENT_LOAD_WAIT
+    );
+  }
+
   public static boolean isReindex(final QueryContext queryContext)
   {
     return queryContext.getBoolean(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to