github-code-scanning[bot] commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1064653537
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java:
##########
@@ -65,9 +71,26 @@
/**
* Returns a worker task ID given a SQL query id.
*/
- public static String workerTaskId(final String controllerTaskId, final int
workerNumber)
+ public static String workerTaskId(final String controllerTaskId, final int
workerNumber, final int retryCount)
{
- return StringUtils.format("%s-worker%d", controllerTaskId, workerNumber);
+ return StringUtils.format("%s-worker%d_%d", controllerTaskId,
workerNumber, retryCount);
+ }
+
+ /**
+ * Extract worker from taskId or throw exception if unable to parse out the
worker.
+ */
+ public static int workerFromTaskId(final String taskId)
+ {
+ final Matcher matcher = WORKER_PATTERN.matcher(taskId);
+ if (matcher.matches()) {
+ return Integer.parseInt(matcher.group(1));
Review Comment:
## Missing catch of NumberFormatException
Potential uncaught 'java.lang.NumberFormatException'.
[Show more
details](https://github.com/apache/druid/security/code-scanning/3606)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -165,10 +219,88 @@
}
}
+
+  /**
+   * Get workers which need to be sent partition boundaries
+   *
+   * @return set of worker numbers that still need to be sent partition boundaries
+   */
+ IntSet getWorkersToSendPartitionBoundaries()
+ {
+ if (!getStageDefinition().doesShuffle()) {
+ throw new ISE("Result partition information is not relevant to this
stage because it does not shuffle");
+ }
+ IntAVLTreeSet workers = new IntAVLTreeSet();
+ for (Integer worker : workerToPhase.keySet()) {
+ if
(ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(workerToPhase.get(worker)))
{
+ workers.add(worker);
Review Comment:
## Deprecated method or constructor invocation
Invoking [AbstractIntCollection.add](1) should be avoided because it has
been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/3602)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -283,6 +466,262 @@
return getPhase();
}
+ private void initializeTimeChunkWorkerTrackers()
+ {
+ workerToRemainingTimeChunks = new HashMap<>();
+ timeChunkToRemainingWorkers = new HashMap<>();
+
completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk,
workers) -> {
+ for (int worker : workers) {
+ this.workerToRemainingTimeChunks.compute(worker, (wk, timeChunks) -> {
+ if (timeChunks == null) {
+ timeChunks = new HashSet<>();
+ }
+ timeChunks.add(timeChunk);
+ return timeChunks;
+ });
+ }
+ timeChunkToRemainingWorkers.put(timeChunk, workers);
+ });
+ }
+
+
+ /**
+ * Merges the {@link ClusterByStatisticsSnapshot} for the worker, time
chunk with the stage {@link ClusterByStatisticsCollector} being
+ * tracked at {@link #timeChunkToCollector} for the same time chunk. This
method is called when
+ * {@link ClusterStatisticsMergeMode#SEQUENTIAL} is chosen eventually.
+ * <br></br>
+ * <br></br>
+ * If all the stats from the worker are merged, we transition the worker to
{@link
ControllerWorkerStagePhase#PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES};
+ * <br></br>
+ * If all the stats from all the workers are merged, we transition the stage
to {@link ControllerStagePhase#POST_READING}
+ */
+
+ void mergeClusterByStatisticsCollectorForTimeChunk(
+ int workerNumber,
+ Long timeChunk,
+ ClusterByStatisticsSnapshot clusterByStatisticsSnapshot
+ )
+ {
+ if (!stageDef.mustGatherResultKeyStatistics()
+ || !stageDef.doesShuffle()) {
+ throw new ISE("Stage does not gather result key statistics");
+ }
+
+ if (workerNumber < 0 || workerNumber >= workerCount) {
+ throw new IAE("Invalid workerNumber [%s]", workerNumber);
+ }
+
+ if (completeKeyStatisticsInformation == null ||
!completeKeyStatisticsInformation.isComplete()) {
+ throw new ISE(
+ "Cannot merge worker[%d] time chunk until all the key information is
received for stage[%d]",
+ workerNumber,
+ stageDef.getStageNumber()
+ );
+ }
+
+ ControllerWorkerStagePhase workerStagePhase =
workerToPhase.get(workerNumber);
+
+ if (workerStagePhase == null) {
+ throw new ISE("Worker[%d] not found for stage[%s]", workerNumber,
stageDef.getStageNumber());
+ }
+
+ // only merge in case this worker has remaining time chunks
+ workerToRemainingTimeChunks.computeIfPresent(workerNumber, (wk,
timeChunks) -> {
+ if (timeChunks.remove(timeChunk)) {
+
+ // merge the key collector
+ timeChunkToCollector.compute(
+ timeChunk,
+ (ignored, collector) -> {
+ if (collector == null) {
+ collector =
stageDef.createResultKeyStatisticsCollector(maxRetainedPartitionSketchBytes);
+ }
+ collector.addAll(clusterByStatisticsSnapshot);
+ return collector;
+ }
+ );
+
+ // if work for one time chunk is finished, generate the
ClusterByPartitions for that timeChunk and clear the collector so that we free
up controller memory.
+ timeChunkToRemainingWorkers.compute(timeChunk, (tc, workers) -> {
+ if (workers == null || workers.isEmpty()) {
+ throw new ISE(
+ "Remaining workers should not be empty until all the work is
finished for time chunk[%d] for stage[%d]",
+ timeChunk,
+ stageDef.getStageNumber()
+ );
+ }
+ workers.remove(workerNumber);
+ if (workers.isEmpty()) {
+ // generate partition boundaries since all work is finished for
the time chunk
+ ClusterByStatisticsCollector collector =
timeChunkToCollector.get(tc);
+ Either<Long, ClusterByPartitions> countOrPartitions =
stageDef.generatePartitionsForShuffle(collector);
+ totalPartitionCount +=
getPartitionCountFromEither(countOrPartitions);
+ if (totalPartitionCount > stageDef.getMaxPartitionCount()) {
+ failForReason(new
TooManyPartitionsFault(stageDef.getMaxPartitionCount()));
+ return null;
+ }
+ timeChunkToBoundaries.put(tc, countOrPartitions.valueOrThrow());
+
+ // clear the collector to give back memory
+ collector.clear();
+ timeChunkToCollector.remove(tc);
+ return null;
+ }
+ return workers;
+ });
+ }
+ return timeChunks.isEmpty() ? null : timeChunks;
+ });
+
+
+    // if all time chunks for the worker are taken care of, transition the worker.
+ if (workerToRemainingTimeChunks.get(workerNumber) == null) {
+ // adding worker to a set so that we do not fetch the worker collectors
again.
+ workersFromWhichKeyCollectorFetched.add(workerNumber);
+ if
(ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.canTransitionFrom(
+ workerStagePhase)) {
+ workerToPhase.put(workerNumber,
ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES);
+ } else {
+ throw new ISE(
+ "Worker[%d] for stage[%d] expected to be in state[%s]. Found
state[%s]",
+ workerNumber,
+ (stageDef.getStageNumber()),
+
ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES,
+ workerStagePhase
+
+ );
+ }
+ }
+
+
+ // if all time chunks have the partition boundaries, merge them to set
resultPartitionBoundaries
+ if (workerToRemainingTimeChunks.isEmpty()) {
+ if (resultPartitionBoundaries == null) {
+ timeChunkToBoundaries.forEach((ignored, partitions) -> {
+ if (resultPartitionBoundaries == null) {
+ resultPartitionBoundaries = partitions;
+ } else {
+
abutAndAppendPartitionBoundaries(resultPartitionBoundaries.ranges(),
partitions.ranges());
+ }
+ });
+ timeChunkToBoundaries.clear();
+ setClusterByPartitionBoundaries(resultPartitionBoundaries);
+ } else {
+ // we already have result partitions. We can safely transition to POST
READING and submit the result boundaries to the workers.
+ transitionTo(ControllerStagePhase.POST_READING);
+ }
+ }
+
+ }
+
+ /**
+ * Merges the entire {@link ClusterByStatisticsSnapshot} for the worker with
the stage {@link ClusterByStatisticsCollector} being
+ * tracked at {@link #timeChunkToCollector} with key {@link
ControllerStageTracker#STATIC_TIME_CHUNK_FOR_PARALLEL_MERGE}. This method is
called when
+ * {@link ClusterStatisticsMergeMode#PARALLEL} is chosen eventually.
+ * <br></br>
+ * <br></br>
+ * If all the stats from the worker are merged, we transition the worker to
{@link
ControllerWorkerStagePhase#PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES}.
+ * <br></br>
+ * If all the stats from all the workers are merged, we transition the stage
to {@link ControllerStagePhase#POST_READING}.
+ */
+
+ void mergeClusterByStatisticsCollectorForAllTimeChunks(
+ int workerNumber,
+ ClusterByStatisticsSnapshot clusterByStatsSnapshot
+ )
+ {
+ if (!stageDef.mustGatherResultKeyStatistics()
+ || !stageDef.doesShuffle()) {
+ throw new ISE("Stage does not gather result key statistics");
+ }
+
+ if (workerNumber < 0 || workerNumber >= workerCount) {
+ throw new IAE("Invalid workerNumber [%s]", workerNumber);
+ }
+
+ ControllerWorkerStagePhase workerStagePhase =
workerToPhase.get(workerNumber);
+
+ if (workerStagePhase == null) {
+ throw new ISE("Worker[%d] not found for stage[%s]", workerNumber,
stageDef.getStageNumber());
+ }
+
+
+    // To avoid fetching the collector twice, e.g. when a worker is retried, we accept the
+    // older collector from the previous run of the worker.
+
+ if (workersFromWhichKeyCollectorFetched.add(workerNumber)) {
+ // in case of parallel merge we use the "ALL" granularity start time to
put the sketches
+ timeChunkToCollector.compute(
+ STATIC_TIME_CHUNK_FOR_PARALLEL_MERGE,
+ (timeChunk, stats) -> {
+ if (stats == null) {
+ stats =
stageDef.createResultKeyStatisticsCollector(maxRetainedPartitionSketchBytes);
+ }
+ stats.addAll(clusterByStatsSnapshot);
+ return stats;
+ }
+ );
+ } else {
+ log.debug("Already have key collector for worker[%d] stage[%d]",
workerNumber, stageDef.getStageNumber());
+ }
+
+ if
(ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.canTransitionFrom(workerStagePhase))
{
+ workerToPhase.put(workerNumber,
ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES);
+ } else {
+ throw new ISE(
+ "Worker[%d] for stage[%d] expected to be in state[%s]. Found
state[%s]",
+ workerNumber,
+ (stageDef.getStageNumber()),
+
ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES,
+ workerStagePhase
+
+ );
+ }
+
+ if (allResultsStatsFetched()) {
+ if (completeKeyStatisticsInformation == null ||
!completeKeyStatisticsInformation.isComplete()) {
+ throw new ISE(
+ "Cannot generate partition boundaries until all the key
information is received for stage[%d]",
+ workerNumber,
+ stageDef.getStageNumber()
+ );
Review Comment:
## Unused format argument
This format call refers to 1 argument(s) but supplies 2 argument(s).
[Show more
details](https://github.com/apache/druid/security/code-scanning/3604)
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1254,13 +1299,15 @@
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L}))
.verifyResults();
- File successFile = new File(
- localFileStorageDir,
- DurableStorageUtils.getSuccessFilePath("query-test-query", 0, 0)
- );
+ if (DURABLE_STORAGE.equals(contextName) ||
!FAULT_TOLERANCE.equals(contextName)) {
+ File successFile = new File(
+ localFileStorageDir,
+ DurableStorageUtils.getSuccessFilePath("query-test-query", 0, 0)
+ );
Review Comment:
## Unread local variable
Variable 'File successFile' is never read.
[Show more
details](https://github.com/apache/druid/security/code-scanning/3605)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java:
##########
@@ -50,322 +54,236 @@
{
private static final Logger log = new Logger(WorkerSketchFetcher.class);
private static final int DEFAULT_THREAD_COUNT = 4;
- // If the combined size of worker sketches is more than this threshold,
SEQUENTIAL merging mode is used.
- static final long BYTES_THRESHOLD = 1_000_000_000L;
- // If there are more workers than this threshold, SEQUENTIAL merging mode is
used.
- static final long WORKER_THRESHOLD = 100;
- private final ClusterStatisticsMergeMode clusterStatisticsMergeMode;
- private final int statisticsMaxRetainedBytes;
private final WorkerClient workerClient;
- private final ExecutorService executorService;
+ private final MSQWorkerTaskLauncher workerTaskLauncher;
+
+ private final boolean retryEnabled;
+
+ private AtomicReference<Throwable> isError = new AtomicReference<>();
+ final ExecutorService executorService;
+
public WorkerSketchFetcher(
WorkerClient workerClient,
- ClusterStatisticsMergeMode clusterStatisticsMergeMode,
- int statisticsMaxRetainedBytes
+ MSQWorkerTaskLauncher workerTaskLauncher,
+ boolean retryEnabled
)
{
this.workerClient = workerClient;
- this.clusterStatisticsMergeMode = clusterStatisticsMergeMode;
this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT,
"SketchFetcherThreadPool-%d");
- this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes;
+ this.workerTaskLauncher = workerTaskLauncher;
+ this.retryEnabled = retryEnabled;
}
/**
- * Submits a request to fetch and generate partitions for the given worker
statistics and returns a future for it. It
- * decides based on the statistics if it should fetch sketches one by one or
together.
+ * Fetches the full {@link ClusterByStatisticsCollector} from all workers
and generates partition boundaries from them.
+ * This is faster than fetching them timechunk by timechunk but the
collector will be downsampled till it can fit
+   * on the controller, resulting in less accurate partition boundaries.
*/
- public CompletableFuture<Either<Long, ClusterByPartitions>>
submitFetcherTask(
- CompleteKeyStatisticsInformation completeKeyStatisticsInformation,
- List<String> workerTaskIds,
- StageDefinition stageDefinition,
- IntSet workersForStage
+ public void inMemoryFullSketchMerging(
+ Consumer<Consumer<ControllerQueryKernel>> kernelActions,
+ StageId stageId,
+ Set<String> taskIds,
+ TriConsumer<ControllerQueryKernel, Integer, MSQFault> retryOperation
)
{
- ClusterBy clusterBy = stageDefinition.getClusterBy();
- switch (clusterStatisticsMergeMode) {
- case SEQUENTIAL:
- return sequentialTimeChunkMerging(completeKeyStatisticsInformation,
stageDefinition, workerTaskIds);
- case PARALLEL:
- return inMemoryFullSketchMerging(stageDefinition, workerTaskIds,
workersForStage);
- case AUTO:
- if (clusterBy.getBucketByCount() == 0) {
- log.info(
- "Query[%s] stage[%d] for AUTO mode: chose PARALLEL mode to merge
key statistics",
- stageDefinition.getId().getQueryId(),
- stageDefinition.getStageNumber()
- );
- // If there is no time clustering, there is no scope for sequential
merge
- return inMemoryFullSketchMerging(stageDefinition, workerTaskIds,
workersForStage);
- } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD
- || completeKeyStatisticsInformation.getBytesRetained() >
BYTES_THRESHOLD) {
- log.info(
- "Query[%s] stage[%d] for AUTO mode: chose SEQUENTIAL mode to
merge key statistics",
- stageDefinition.getId().getQueryId(),
- stageDefinition.getStageNumber()
+ for (String taskId : taskIds) {
+ try {
+ int workerNumber = MSQTasks.workerFromTaskId(taskId);
+ executorService.submit(() -> {
+ fetchStatsFromWorker(
+ kernelActions,
+ () -> workerClient.fetchClusterByStatisticsSnapshot(
+ taskId,
+ stageId.getQueryId(),
+ stageId.getStageNumber()
+ ),
+ taskId,
+ (kernel, snapshot) ->
kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
+ stageId,
+ workerNumber,
+ snapshot
+ ),
+ retryOperation
);
- return sequentialTimeChunkMerging(completeKeyStatisticsInformation,
stageDefinition, workerTaskIds);
+ });
+ }
+ catch (RejectedExecutionException rejectedExecutionException) {
+ if (isError.get() == null) {
+ throw rejectedExecutionException;
+ } else {
+ // throw worker error exception
+ throw new ISE("Unable to fetch partitions ", isError.get());
Review Comment:
## Unused format argument
This format call refers to 0 argument(s) but supplies 1 argument(s).
[Show more
details](https://github.com/apache/druid/security/code-scanning/3603)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -165,10 +219,88 @@
}
}
+
+  /**
+   * Get workers which need to be sent partition boundaries
+   *
+   * @return set of worker numbers that still need to be sent partition boundaries
+   */
+ IntSet getWorkersToSendPartitionBoundaries()
+ {
+ if (!getStageDefinition().doesShuffle()) {
+ throw new ISE("Result partition information is not relevant to this
stage because it does not shuffle");
+ }
+ IntAVLTreeSet workers = new IntAVLTreeSet();
+ for (Integer worker : workerToPhase.keySet()) {
+ if
(ControllerWorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(workerToPhase.get(worker)))
{
Review Comment:
## Deprecated method or constructor invocation
Invoking [Int2ObjectMap.get](1) should be avoided because it has been
deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/3601)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]