GWphua opened a new pull request, #18206: URL: https://github.com/apache/druid/pull/18206
Fixes #18197.

This problem was introduced by [PR #13896](https://github.com/apache/druid/pull/13896) in Druid 26.

## Problem Statement

When using the `druid-multi-stage-query` extension on K8s (MM-less ingestion), an `INSERT` or `REPLACE` query results in a `failed with status code 404` message. The same query succeeds uneventfully on the MM architecture. (See the related issue for screenshots.)

### Logs

#### Broker Logs

```
2025-06-26T09:08:09,757 INFO [qtp1786513714-155] org.apache.druid.msq.sql.resources.SqlStatementResource - Query details not found for queryId [query-1ed10341-bb20-4086-a1d4-060d2496266a]
org.apache.druid.rpc.HttpResponseException: Server error [404 Not Found]; body: No task reports were found for this task. The task may not exist, or it may not have completed yet.
	at org.apache.druid.rpc.ServiceClientImpl$1.onSuccess(ServiceClientImpl.java:200) ~[druid-server-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.rpc.ServiceClientImpl$1.onSuccess(ServiceClientImpl.java:182) ~[druid-server-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133) ~[guava-32.0.1-jre.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
```

#### Overlord Logs

```
2025-07-01T03:18:16,150 WARN [qtp1510570627-108] org.apache.druid.indexing.overlord.http.OverlordResource - Failed to stream task reports for task query-be62b874-c86c-4079-8a71-a0867f6953c2
org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
	at org.apache.druid.java.util.http.client.NettyHttpClient.go(NettyHttpClient.java:134) ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.java.util.http.client.CredentialedHttpClient.go(CredentialedHttpClient.java:48) ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.java.util.http.client.AbstractHttpClient.go(AbstractHttpClient.java:33) ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.k8s.overlord.KubernetesTaskRunner.streamTaskReports(KubernetesTaskRunner.java:298) ~[?:?]
	at org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer.streamTaskReports(TaskRunnerTaskLogStreamer.java:59) ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer.streamTaskReports(SwitchingTaskLogStreamer.java:94) ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.indexing.overlord.http.OverlordResource.doGetReports(OverlordResource.java:810) ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	...
Caused by: java.net.ConnectException: Connection refused: /XX.XX.XX.XX:8100
	...
```

## Investigation

### Root Cause Analysis

1. The query itself succeeds, and the segments are pushed to and loaded by the Historicals.
2. On the web console, the execution details are queried using the `/druid/v2/sql/statements/{queryId}` API.
3. The following error appears in the browser console: `{"error":"druidException","errorCode":"notFound","persona":"USER","category":"NOT_FOUND","errorMessage":"Query [query-a72b5075-0dae-48bf-b1b8-7f66c2e070e6] was not found. The query details are no longer present or might not be of the type [query_controller]. Verify that the id is correct.","context":{}}`

### Code Dive

#### Looking into the Dashboard API Call

1. The web console issues a GET request to `/druid/v2/sql/statements/{queryId}?detail=true`.
2. The API is served by `SqlStatementResource#doGetStatus`, which proceeds through the following steps:
   1. Authenticate and authorize the request.
   2. Fetch the statement status for the query through `SqlStatementResource#getStatementStatus`.
   3. Return the statement status as an OK `Response` if it exists; otherwise build and return a non-OK `Response`.

#### Get Statement Status

1. Contact the Overlord for the task status with `taskId` = `queryId` at `/druid/indexer/v1/task/{taskId}/status`.
   1. This hits `OverlordResource#getTaskStatus`.
2. Contact the Overlord for the task report with `taskId` = `queryId` at `/druid/indexer/v1/task/{taskId}/reports`.
   1. This calls `OverlordResource#doGetReports`, which delegates to `SwitchingTaskLogStreamer#streamTaskReports`:

```java
@GET
@Path("/task/{taskid}/reports")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(TaskResourceFilter.class)
public Response doGetReports(@PathParam("taskid") final String taskid)
{
  try {
    final Optional<InputStream> stream = taskLogStreamer.streamTaskReports(taskid);
    if (stream.isPresent()) {
      return Response.ok(stream.get()).build();
    } else {
      return Response.status(Response.Status.NOT_FOUND)
                     .entity(
                         "No task reports were found for this task. "
                         + "The task may not exist, or it may not have completed yet."
                     )
                     .build();
    }
  }
  catch (Exception e) {
    log.warn(e, "Failed to stream task reports for task %s", taskid);
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
  }
}
```

   2. `SwitchingTaskLogStreamer` first queries the task report from the task runner (the Kubernetes Peon pod, in the K8s context), before trying to find the report in deep storage:

```java
@Override
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
  IOException deferIOException = null;
  try {
    final Optional<InputStream> stream = taskRunnerTaskLogStreamer.streamTaskReports(taskid);
    if (stream.isPresent()) {
      return stream;
    }
  }
  catch (IOException e) {
    // Defer the first IOException due to a race in the way tasks update their exit status in the Overlord.
    // It may happen that the task has already sent the report to deep storage while it is still running
    // with its HTTP chat handlers unregistered. In such a case, catch and ignore the first IOException
    // and try deep storage for the report. If the report is still not found, rethrow the caught exception.
    deferIOException = e;
  }

  for (TaskLogStreamer provider : deepStorageStreamers) {
    try {
      final Optional<InputStream> stream = provider.streamTaskReports(taskid);
      if (stream.isPresent()) {
        return stream;
      }
    }
    catch (IOException e) {
      if (deferIOException != null) {
        e.addSuppressed(deferIOException);
      }
      throw e;
    }
  }

  // Could not find any InputStream. Throw the deferred exception if it exists.
  if (deferIOException != null) {
    throw deferIOException;
  }
  return Optional.absent();
}
```

   3. The Overlord log above shows a stack trace that surfaces after the task completes: the Overlord fails to contact the exited Peon pod for the task report, and then fails to find the report in deep storage as well.

## Hypothesis

Given the trace above, there is reason to believe that the task report cannot be found in deep storage. The working hypothesis is that the Peon fails to push the task report to HDFS (the deep storage of this cluster).

### Kubernetes Task Reports Not Uploaded to Deep Storage During Task Cleanup

1. In the Kubernetes context, the Peon pushes the task report, followed by the task status, via `AbstractTask#cleanUp` when cleaning up the task:

```java
@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
  // Clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
  Thread.interrupted();

  if (!toolbox.getConfig().isEncapsulatedTask()) {
    log.debug("Not pushing task logs and reports from task.");
    return;
  }
  // isEncapsulatedTask() means "is K8s ingestion": non-K8s ingestion returns above,
  // so only K8s ingestion proceeds past this line.

  // 7 lines of unrelated code omitted here.

  if (reportsFile != null && reportsFile.exists()) {
    toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
    log.debug("Pushed task reports");
  } else {
    log.debug("No task reports file exists to push");
  }

  if (statusFile != null) {
    toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
    toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
    Files.deleteIfExists(statusFile.toPath());
    log.debug("Pushed task status");
  } else {
    log.debug("No task status file exists to push");
  }
}
```

2. Checking the logs, I found that only the task status is pushed, not the task report:

```
2025-07-01T03:20:35,548 INFO [query-be62b874-c86c-4079-8a71-a0867f6953c2-segment-load-waiter-0] org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Fetching segment load status for datasource[HELLO_WORLD] from broker
2025-07-01T03:20:40,962 INFO [query-be62b874-c86c-4079-8a71-a0867f6953c2-segment-load-waiter-0] org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Segment loading completed for datasource[HELLO_WORLD]
2025-07-01T03:20:40,978 INFO [task-runner-0-priority-0] org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Writing task status to: hdfs://R2/user/druid/k8s/test-benchmarking-jdk21/indexing-logs/query-be62b874-c86c-4079-8a71-a0867f6953c2.status.json
2025-07-01T03:20:43,850 INFO [task-runner-0-priority-0] org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Wrote task status to: hdfs://R2/user/druid/k8s/test-benchmarking-jdk21/indexing-logs/query-be62b874-c86c-4079-8a71-a0867f6953c2.status.json
```

3. In the bash terminal of the Peon pod, the report file exists at `./var/tmp/attempt/1/report.json`.
4. On examination, `AbstractTask` tries to read from `./var/tmp/{queryId}/attempt/{attemptId}/report.json` instead.
5. The `{queryId}` segment missing from the written path is the likely cause of the error.
6. This problem does not occur in the MM architecture.

#### Why the Task Report Is Written to the Wrong Path
1. Under `CliPeon`, `TaskReportFileWriter` is bound to a `SingleFileTaskReportFileWriter`, which is configured to write to the given `File`:

```java
binder.bind(TaskReportFileWriter.class)
      .toInstance(
          new SingleFileTaskReportFileWriter(
              Paths.get(taskDirPath, "attempt", attemptId, "report.json").toFile()
          ));
// The given path translates to: {taskDirPath}/attempt/{attemptId}/report.json
```

2. Under the K8s model, `taskDirPath` is the base task directory, without the task id:

```log
2025-07-03T09:57:39,395 INFO [main] org.apache.druid.cli.CliPeon - Running peon in k8s mode
2025-07-03T09:57:39,395 INFO [main] org.apache.druid.cli.CliPeon - Running peon task in taskDirPath[/opt/druid/var/tmp/] attemptId[1]
```

3. Under the MM model, `taskDirPath` already includes the task id:

```log
2025-07-03T09:50:57,522 INFO [main] org.apache.druid.cli.CliPeon - Running peon task in taskDirPath[var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1] attemptId[1]
```

4. However, the `reportsFile` and `statusFile` in `AbstractTask` are constructed as follows, with `getTaskDir(getId())` appending the task id to the base task directory:

```java
@Nullable
public String setup(TaskToolbox toolbox) throws Exception
{
  if (toolbox.getConfig().isEncapsulatedTask()) {
    // taskDir = {baseTaskDir}/{taskId}
    File taskDir = toolbox.getConfig().getTaskDir(getId());
    FileUtils.mkdirp(taskDir);

    // attemptDir = {baseTaskDir}/{taskId}/attempt/{attemptId}
    File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
    FileUtils.mkdirp(attemptDir);

    // reportsFile = {baseTaskDir}/{taskId}/attempt/{attemptId}/report.json
    reportsFile = new File(attemptDir, "report.json");
    // statusFile = {baseTaskDir}/{taskId}/attempt/{attemptId}/status.json
    statusFile = new File(attemptDir, "status.json");
    // Excess code skipped...
  }
```

5. Hence, there is a mismatch between the path the report is written to and the path it is looked up at.
6. The reason Druid is still able to find the `statusFile` is a last-minute write into the status file in `AbstractTask#cleanUp`:

```java
// Note that there is no check for whether statusFile already exists.
if (statusFile != null) {
  // Writes the status value into the status file, creating it in the process.
  toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
  toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
  Files.deleteIfExists(statusFile.toPath());
  log.debug("Pushed task status");
} else {
  log.debug("No task status file exists to push");
}
```

### Comparison Between MM and MM-less

#### Task Creation

The MM architecture constructs the Peon startup command in `ForkingTaskRunner#run`:

```java
// taskDir: var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1
final File taskDir = new File(storageSlot.getDirectory(), task.getId());
final String attemptId = String.valueOf(getNextAttemptID(taskDir));
final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();

// Omitted code: the paths of taskFile, statusFile, logFile, and reportsFile are configured properly.
final File taskFile = new File(taskDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(taskDir, "log");
final File reportsFile = new File(attemptDir, "report.json");

// Omitted code
command.add("org.apache.druid.cli.Main");
command.add("internal");
command.add("peon");
command.add(taskDir.toString());
command.add(attemptId);
// Final command: org.apache.druid.cli.Main internal peon var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1 1
```

The MM-less architecture constructs the Peon startup command in `K8sTaskAdapter#generateCommand`:

```java
// peon.sh builds a command of the form: org.apache.druid.cli.Main internal peon {1} {2}
command.add("/peon.sh");
// taskDir: /opt/druid/var/tmp/
command.add(taskConfig.getBaseTaskDir().getAbsolutePath());
command.add("1");
// Final command: org.apache.druid.cli.Main internal peon /opt/druid/var/tmp/ 1
```

The MM-less architecture with Pod Templates constructs the Peon startup command via the Helm chart, with environment variables built in `PodTemplateTaskAdapter#getEnv`.

## Solutions
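Before going into the fixes, the mismatch above can be made concrete with a small, self-contained sketch using `java.nio.file.Paths` (the directory and task id are taken from the logs above; this is an illustration, not Druid code):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class ReportPathMismatch
{
  public static void main(String[] args)
  {
    // Values taken from the K8s logs above.
    String taskDirPath = "/opt/druid/var/tmp";  // base dir passed to CliPeon in K8s mode
    String taskId = "query-be62b874-c86c-4079-8a71-a0867f6953c2";
    String attemptId = "1";

    // Where SingleFileTaskReportFileWriter writes the report, per the CliPeon binding.
    Path written = Paths.get(taskDirPath, "attempt", attemptId, "report.json");

    // Where AbstractTask#setup expects the report: the task id is appended to the base dir.
    Path expected = Paths.get(taskDirPath, taskId, "attempt", attemptId, "report.json");

    System.out.println("written : " + written);
    System.out.println("expected: " + expected);
    // The two paths differ: the written path is missing the {taskId} segment,
    // so the report push in cleanUp() never finds the file.
    System.out.println("match   : " + written.equals(expected));  // prints "match   : false"
  }
}
```

In MM mode the two paths coincide, because the `taskDirPath` handed to the peon already ends with the task id.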
1. A quick solution is to change `K8sTaskAdapter` to use `getTaskDir` instead of `getBaseTaskDir`. However, this only helps clusters that are not using Pod Templates, because Pod Templates hard-code the task directory in the Helm charts:

```yaml
containers:
  - name: main
    image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
    command:
      - sh
      - -c
      - |
        /peon.sh /opt/druid/var/tmp/ 1
```

2. Change `peon.sh` to include the taskId in its startup script. This approach may be messy, since `peon.sh` passes all of its input straight through when starting the Peon Java instance:

```sh
exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon --taskId "${TASK_ID}" "$@"
```

3. During `CliPeon` initialization, append the `taskId` to `taskDirPath` for task pods started in K8s mode.
   1. In the case of `SingleContainerTaskAdapter` or `MultiContainerTaskAdapter`, the `taskId` is passed in via these lines of `K8sTaskAdapter#generateCommand`:

   ```java
   command.add("--taskId");
   command.add(task.getId());
   ```

   2. In the case of `PodTemplateTaskAdapter`, the `taskId` is passed in through environment variables via `PodTemplateTaskAdapter#getEnv`:

   ```java
   new EnvVarBuilder()
       .withName(DruidK8sConstants.TASK_ID_ENV)
       .withValue(task.getId())
       .build(),
   ```

This is the simplest fix with the smallest code change, and it is the one used in this PR.

#### Release note

MSQ `INSERT`/`REPLACE` queries no longer fail with a 404 error on the web console: K8s task reports can now be viewed on the web console after the tasks exit.

<hr>

##### Key changed/added classes in this PR
 * `CliPeon`

<hr>

This PR has:
- [x] been self-reviewed.
- [ ] added documentation for new or modified features or behaviors.
- [x] a release note entry in the PR description.
- [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
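For reference, the idea behind the chosen fix (solution 3) can be sketched as follows. This is a hedged illustration, not the actual `CliPeon` change: the class and method names below are hypothetical, and the real code reads the task id from `--taskId` or the `TASK_ID` environment variable.

```java
import java.nio.file.Paths;

public class PeonTaskDirResolver
{
  /**
   * Sketch of the fix: in K8s mode, the base task directory handed to the peon
   * does not include the task id, so append the taskId before using it. In MM
   * mode, the directory already ends with the task id and is left untouched.
   */
  public static String resolveTaskDirPath(String taskDirPath, String taskId, boolean k8sMode)
  {
    if (k8sMode && taskId != null && !taskDirPath.endsWith(taskId)) {
      return Paths.get(taskDirPath, taskId).toString();
    }
    return taskDirPath;
  }

  public static void main(String[] args)
  {
    // K8s mode: the base dir gains the task id, matching what AbstractTask#setup expects.
    System.out.println(resolveTaskDirPath("/opt/druid/var/tmp", "query-abc", true));
    // MM mode: the path already contains the task id and is returned unchanged.
    System.out.println(resolveTaskDirPath("var/druid/task/slot0/query-abc", "query-abc", false));
  }
}
```

With this resolution in place, both the `SingleFileTaskReportFileWriter` binding and `AbstractTask#setup` derive their paths from the same `{baseTaskDir}/{taskId}` root, so the report lands where `cleanUp` looks for it.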
