cryptoe commented on code in PR #15076:
URL: https://github.com/apache/druid/pull/15076#discussion_r1345906495
##########
docs/multi-stage-query/reference.md:
##########
@@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
+| `segmentLoadWait` | INSERT, REPLACE<br /><br /> Whether the controller should wait for segments to be loaded before exiting. If this is true, the controller queries the broker and waits till the segments created (if any) have been loaded by the load rules. The controller also provides this information in the live reports and task reports. If this is false, the controller exits immediately after finishing the query. | `false` |
Review Comment:
`segmentHandedOff`? How does this name sound?
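For reference while settling the name: per the doc row above, the flag makes the controller keep polling until the generated segments are loaded. A minimal sketch of that poll-until-loaded loop, assuming a hypothetical `BooleanSupplier` standing in for the broker query and a hypothetical poll interval and timeout (the doc row does not mention a timeout); this is not the controller's actual code:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the wait that `segmentLoadWait` enables: poll a load check
// until it reports done. The check, poll interval and timeout are illustrative only.
class SegmentLoadWaitSketch
{
  /**
   * Polls {@code allSegmentsLoaded} every {@code pollMillis} until it returns true.
   * Returns false if the (assumed) timeout elapses first or the thread is interrupted.
   */
  static boolean waitUntilLoaded(BooleanSupplier allSegmentsLoaded, long pollMillis, long timeoutMillis)
  {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!allSegmentsLoaded.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      }
      catch (InterruptedException e) {
        // Preserve the interrupt (e.g. task cancellation) and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    int[] polls = {0};
    // Simulated broker check that reports everything loaded on the third poll.
    boolean loaded = waitUntilLoaded(() -> ++polls[0] >= 3, 10, 1_000);
    System.out.println("loaded=" + loaded + " after " + polls[0] + " polls");
  }
}
```

Using `System.nanoTime()` for the deadline keeps the wait immune to wall-clock adjustments.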
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java:
##########
@@ -159,29 +159,18 @@ public void waitForSegmentsToLoad()
return;
}
- Iterator<String> iterator = versionsToAwait.iterator();
- log.info(
+ log.debug(
Review Comment:
Can we `log.info` this every minute?
Review Comment:
And update the log message to say that even if the task is cancelled, the
segments will continue to load.
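A minimal sketch of both logging suggestions on this hunk: emit the "still waiting" message at INFO at most once per interval (every minute here) and at DEBUG otherwise, with wording that mentions loading continues after cancellation. `PeriodicInfoThrottle`, `waitingMessage` and the message text are hypothetical, not existing Druid code; the clock is injected so the throttling can be unit-tested without sleeping:

```java
import java.util.Locale;
import java.util.function.LongSupplier;

// Hypothetical helper: log "still waiting" at INFO at most once per interval
// and at DEBUG otherwise. The millisecond clock is injected for testability.
class PeriodicInfoThrottle
{
  private final long intervalMillis;
  private final LongSupplier clockMillis;
  private boolean loggedBefore = false;
  private long lastInfoMillis;

  PeriodicInfoThrottle(long intervalMillis, LongSupplier clockMillis)
  {
    this.intervalMillis = intervalMillis;
    this.clockMillis = clockMillis;
  }

  /** Returns true if the caller should log at INFO now; otherwise it should log at DEBUG. */
  synchronized boolean shouldLogInfo()
  {
    final long now = clockMillis.getAsLong();
    if (!loggedBefore || now - lastInfoMillis >= intervalMillis) {
      loggedBefore = true;
      lastInfoMillis = now;
      return true;
    }
    return false;
  }

  // Hypothetical wording that also covers the "loading continues after cancellation" point.
  static String waitingMessage(String datasource, int pendingSegments, int totalSegments)
  {
    return String.format(
        Locale.ROOT,
        "Waiting for [%d] of [%d] segments of datasource[%s] to load. "
        + "Segments will continue to load even if this task is cancelled.",
        pendingSegments,
        totalSegments,
        datasource
    );
  }

  public static void main(String[] args)
  {
    long[] now = {0};
    PeriodicInfoThrottle throttle = new PeriodicInfoThrottle(60_000, () -> now[0]);
    // Simulate four polls 30 seconds apart: only the first and third come out at INFO.
    for (long t : new long[]{0, 30_000, 60_000, 90_000}) {
      now[0] = t;
      String level = throttle.shouldLogInfo() ? "INFO" : "DEBUG";
      System.out.println(level + " @" + t + "ms: " + waitingMessage("foo1", 3, 10));
    }
  }
}
```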
##########
docs/multi-stage-query/reference.md:
##########
@@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
+| `segmentLoadWait` | INSERT, REPLACE<br /><br /> Whether the controller should wait for segments to be loaded before exiting. If this is true, the controller queries the broker and waits till the segments created (if any) have been loaded by the load rules. The controller also provides this information in the live reports and task reports. If this is false, the controller exits immediately after finishing the query. | `false` |
Review Comment:
Or `waitTillSegmentsLoad`?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java:
##########
@@ -216,43 +205,35 @@ private void waitIfNeeded(long waitTimeMillis) throws Exception
}
/**
- * Updates the {@link #status} with the latest details based on {@link #versionToLoadStatusMap}
+ * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference}
*/
private void updateStatus(State state, DateTime startTime)
{
- int pendingSegmentCount = 0, usedSegmentsCount = 0, precachedSegmentCount = 0, onDemandSegmentCount = 0, unknownSegmentCount = 0;
- for (Map.Entry<String, VersionLoadStatus> entry : versionToLoadStatusMap.entrySet()) {
- usedSegmentsCount += entry.getValue().getUsedSegments();
- precachedSegmentCount += entry.getValue().getPrecachedSegments();
- onDemandSegmentCount += entry.getValue().getOnDemandSegments();
- unknownSegmentCount += entry.getValue().getUnknownSegments();
- pendingSegmentCount += entry.getValue().getPendingSegments();
- }
-
long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis();
+ VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get();
status.set(
new SegmentLoadWaiterStatus(
state,
startTime,
runningMillis,
totalSegmentsGenerated,
- usedSegmentsCount,
- precachedSegmentCount,
- onDemandSegmentCount,
- pendingSegmentCount,
- unknownSegmentCount
+ versionLoadStatus.getUsedSegments(),
+ versionLoadStatus.getPrecachedSegments(),
+ versionLoadStatus.getOnDemandSegments(),
+ versionLoadStatus.getPendingSegments(),
+ versionLoadStatus.getUnknownSegments()
)
);
}
/**
- * Uses {@link #brokerClient} to fetch latest load status for a given version. Converts the response into a
+ * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a
* {@link VersionLoadStatus} and returns it.
*/
- private VersionLoadStatus fetchLoadStatusForVersion(String version) throws Exception
+ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
{
Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/");
- SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, version),
+ SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsInClauseString),
Review Comment:
While running UTs, I also saw:
```
2023-10-05T11:21:28,837 WARN [query-2cd60052-3011-47ae-87be-ec47100f649d-segment-load-waiter-0] org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Exception occurred while waiting for segments to load. Exiting.
java.lang.NullPointerException: null
	at org.apache.druid.msq.exec.SegmentLoadStatusFetcher.fetchLoadStatusForVersion(SegmentLoadStatusFetcher.java:262) ~[classes/:?]
	at org.apache.druid.msq.exec.SegmentLoadStatusFetcher.lambda$waitForSegmentsToLoad$0(SegmentLoadStatusFetcher.java:174) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) ~[guava-31.1-jre.jar:?]
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74) ~[guava-31.1-jre.jar:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) ~[guava-31.1-jre.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
2023-10-05T11:21:28,846 INFO [main] org.apache.druid.msq.exec.WorkerImpl - Stopping gracefully for taskId [query-2cd60052-3011-47ae-87be-ec47100f649d-worker0_0]
2023-10-05T11:21:28,848 INFO [main] org.apache.druid.msq.test.MSQTestBase - found generated segments: DataSegment{binaryVersion=9, id=foo1_-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z_test, loadSpec={type=>local, path=>/var/folders/sx/n0v16tnj6mvf6hd14l6j6dth0000gn/T/junit8066366800031570368/localsegments/foo1/-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z/test/0/index/}, dimensions=[dim_mv], metrics=[], shardSpec=NumberedShardSpec{partitionNum=0, partitions=100}, lastCompactionState=null, size=1037}
2023-10-05T11:21:28,859 INFO [main] org.apache.druid.msq.test.MSQTestBase - Found spec: {
  "query" : {
```
Let's fix it in this PR as well.
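The trace alone does not show which value is null at line 262, since that source line is not quoted here. Separately, the new `updateStatus` dereferences `versionLoadStatusReference.get()` directly, so if that reference is not initialised before the first fetch completes, it would throw the same kind of NPE. One defensive pattern, sketched under those assumptions: seed the reference with an all-zero status and skip null fetch results instead of exiting. `LoadStatusHolderSketch` and its simplified `LoadStatus` are hypothetical stand-ins for the fetcher and `VersionLoadStatus`:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: keep the latest load status in an AtomicReference seeded with an
// all-zero value, and ignore null fetch results, so status updates never dereference null.
class LoadStatusHolderSketch
{
  // Simplified stand-in for VersionLoadStatus; fields mirror the getters used in the diff.
  static final class LoadStatus
  {
    static final LoadStatus EMPTY = new LoadStatus(0, 0, 0, 0, 0);

    final int usedSegments;
    final int precachedSegments;
    final int onDemandSegments;
    final int pendingSegments;
    final int unknownSegments;

    LoadStatus(int used, int precached, int onDemand, int pending, int unknown)
    {
      this.usedSegments = used;
      this.precachedSegments = precached;
      this.onDemandSegments = onDemand;
      this.pendingSegments = pending;
      this.unknownSegments = unknown;
    }
  }

  // Never null: reads before the first successful fetch see all-zero counts.
  private final AtomicReference<LoadStatus> latest = new AtomicReference<>(LoadStatus.EMPTY);

  /** Records a freshly fetched status; a null result (assumed failure mode) is skipped. */
  void onFetched(LoadStatus fetched)
  {
    if (fetched != null) {
      latest.set(fetched);
    }
  }

  int pendingSegments()
  {
    return latest.get().pendingSegments;
  }

  public static void main(String[] args)
  {
    LoadStatusHolderSketch holder = new LoadStatusHolderSketch();
    System.out.println(holder.pendingSegments()); // 0: reading before any fetch is safe
    holder.onFetched(null);
    System.out.println(holder.pendingSegments()); // still 0: null result ignored
    holder.onFetched(new LoadStatus(1, 0, 0, 4, 0));
    System.out.println(holder.pendingSegments()); // 4
  }
}
```

Whether a null fetch result should be retried or surfaced as an error is a separate call; the sketch only shows keeping the status reads null-safe.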
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.