GWphua opened a new pull request, #18206:
URL: https://github.com/apache/druid/pull/18206

   
   Fixes #18197 
   
   This problem was introduced by this [PR](https://github.com/apache/druid/pull/13896) in Druid v26.
   
   ## Problem Statement
   When using the `druid-multi-stage-query` extension on K8s, an `INSERT` or `REPLACE` query results in a `failed with status code 404` message. However, the same query succeeds uneventfully on the MiddleManager (MM) architecture. (See the related issue for screenshots.)
   
   ### Logs
   
   #### Broker Logs
   
   ```
   2025-06-26T09:08:09,757 INFO [qtp1786513714-155] 
org.apache.druid.msq.sql.resources.SqlStatementResource - Query details not 
found for queryId [query-1ed10341-bb20-4086-a1d4-060d2496266a]
   org.apache.druid.rpc.HttpResponseException: Server error [404 Not Found]; 
body: No task reports were found for this task. The task may not exist, or it 
may not have completed yet.
           at 
org.apache.druid.rpc.ServiceClientImpl$1.onSuccess(ServiceClientImpl.java:200) 
~[druid-server-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
           at 
org.apache.druid.rpc.ServiceClientImpl$1.onSuccess(ServiceClientImpl.java:182) 
~[druid-server-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
           at 
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
 ~[guava-32.0.1-jre.jar:?]
           at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
 ~[?:?]
           at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
           at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 ~[?:?]
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 ~[?:?]
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 ~[?:?]
           at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
   ```
   
   #### Overlord Logs
   
   ```
   2025-07-01T03:18:16,150 WARN [qtp1510570627-108] 
org.apache.druid.indexing.overlord.http.OverlordResource - Failed to stream 
task reports for task query-be62b874-c86c-4079-8a71-a0867f6953c2
   org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
        at 
org.apache.druid.java.util.http.client.NettyHttpClient.go(NettyHttpClient.java:134)
 ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at 
org.apache.druid.java.util.http.client.CredentialedHttpClient.go(CredentialedHttpClient.java:48)
 ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at 
org.apache.druid.java.util.http.client.AbstractHttpClient.go(AbstractHttpClient.java:33)
 ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at 
org.apache.druid.k8s.overlord.KubernetesTaskRunner.streamTaskReports(KubernetesTaskRunner.java:298)
 ~[?:?]
        at 
org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer.streamTaskReports(TaskRunnerTaskLogStreamer.java:59)
 ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at 
org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer.streamTaskReports(SwitchingTaskLogStreamer.java:94)
 ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at 
org.apache.druid.indexing.overlord.http.OverlordResource.doGetReports(OverlordResource.java:810)
 ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:?]
        ...
   Caused by: java.net.ConnectException: Connection refused: /XX.XX.XX.XX:8100
        ...
   ```
   
   ## Investigation
   
   ### Root Cause Analysis
   
   1. The query succeeds, and the segments are pushed to the Historicals.
   2. On the dashboard, the execution details are queried using the API `druid/v2/sql/statement/{queryId}`.
   3. The following error is logged in the webpage console: `{"error":"druidException","errorCode":"notFound","persona":"USER","category":"NOT_FOUND","errorMessage":"Query [query-a72b5075-0dae-48bf-b1b8-7f66c2e070e6] was not found. The query details are no longer present or might not be of the type [query_controller]. Verify that the id is correct.","context":{}}`
   
   ### Code Dive
   
   #### Looking into Dashboard API Call
   
   1. The dashboard calls a GET API at `druid/v2/sql/statement/{queryId}?detail=true`.
   2. The API is served by `SqlStatementResource#doGetStatus`, which proceeds through the following steps:
      1. Authorise and authenticate.
      2. Get the statement status for the query through `SqlStatementResource#getStatementStatus`.
      3. Return the statement status as an OK `Response` if it exists; otherwise build and return a non-OK `Response`.
   
   #### Get Statement Status
   
   1. Contact the Overlord for the task status, with `taskId` = `queryId`, at `/druid/indexer/v1/task/{taskId}/status`.

      1. Hits `OverlordResource#getTaskStatus`.

   2. Contact the Overlord for the task report, with `taskId` = `queryId`, at `/druid/indexer/v1/task/{taskId}/reports`.

      1. Calls `OverlordResource#doGetReports`, which calls `SwitchingTaskLogStreamer#streamTaskReports`.
   
         ```java
         @GET
         @Path("/task/{taskid}/reports")
         @Produces(MediaType.APPLICATION_JSON)
         @ResourceFilters(TaskResourceFilter.class)
         public Response doGetReports(
             @PathParam("taskid") final String taskid
         )
         {
           try {
             final Optional<InputStream> stream = taskLogStreamer.streamTaskReports(taskid);
             if (stream.isPresent()) {
               return Response.ok(stream.get()).build();
             } else {
               return Response.status(Response.Status.NOT_FOUND)
                              .entity(
                                  "No task reports were found for this task. "
                                  + "The task may not exist, or it may not have completed yet."
                              )
                              .build();
             }
           }
           catch (Exception e) {
             log.warn(e, "Failed to stream task reports for task %s", taskid);
             return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
           }
         }
         ```
   
      2. `SwitchingTaskLogStreamer` first tries to stream the task report from the task runner (the Kubernetes pod, in the K8s context), before falling back to task reports in deep storage.
   
         ```java
         @Override
         public Optional<InputStream> streamTaskReports(String taskid) throws IOException
         {
           IOException deferIOException = null;

           try {
             final Optional<InputStream> stream = taskRunnerTaskLogStreamer.streamTaskReports(taskid);
             if (stream.isPresent()) {
               return stream;
             }
           }
           catch (IOException e) {
             // Defer the first IO exception due to a race in the way tasks update their exit status in the overlord.
             // It may happen that the task sent the report to deep storage but the task is still running with http chat handlers unregistered.
             // In such a case, catch and ignore the 1st IOException and try deep storage for the report. If the report is still not found, return the caught exception.
             deferIOException = e;
           }

           for (TaskLogStreamer provider : deepStorageStreamers) {
             try {
               final Optional<InputStream> stream = provider.streamTaskReports(taskid);
               if (stream.isPresent()) {
                 return stream;
               }
             }
             catch (IOException e) {
               if (deferIOException != null) {
                 e.addSuppressed(deferIOException);
               }
               throw e;
             }
           }
           // Could not find any InputStream. Throw the deferred exception if it exists.
           if (deferIOException != null) {
             throw deferIOException;
           }
           return Optional.absent();
         }
         ```
   
      3. Looking at the Overlord logs above, a stack trace surfaces after the task completes. It shows that the Overlord is unable to contact the exited Peon pod for the task report, and then also fails to find the task report in deep storage.
   
   ## Hypothesis
   
   Given the trace above, there is reason to believe that the task report cannot be found in deep storage. The working hypothesis is that the task fails to push its report to HDFS (the deep storage).
   
   ### Kubernetes Task Reports Not Uploaded Into Deep Storage During Task 
Cleanup
   
   1. In the Kubernetes context, the Peon pushes the task report, followed by the task status, via `AbstractTask` when cleaning up the task.
   
      ```java
      @Override
      public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
      {
        // Clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
        Thread.interrupted();

        if (!toolbox.getConfig().isEncapsulatedTask()) {
          log.debug("Not pushing task logs and reports from task.");
          return;
        }
        // isEncapsulatedTask() means "isK8sIngestion". Non-K8s ingestions return above;
        // only K8s ingestions proceed past this point.

        // 7 lines of unrelated code omitted here.

        if (reportsFile != null && reportsFile.exists()) {
          toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
          log.debug("Pushed task reports");
        } else {
          log.debug("No task reports file exists to push");
        }

        if (statusFile != null) {
          toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
          toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
          Files.deleteIfExists(statusFile.toPath());
          log.debug("Pushed task status");
        } else {
          log.debug("No task status file exists to push");
        }
      }
      ```
   
   2. Checking the logs, I found that only the task status is pushed, but not the task reports.
   
      ```
      2025-07-01T03:20:35,548 INFO 
[query-be62b874-c86c-4079-8a71-a0867f6953c2-segment-load-waiter-0] 
org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Fetching segment load 
status for datasource[HELLO_WORLD] from broker
      2025-07-01T03:20:40,962 INFO 
[query-be62b874-c86c-4079-8a71-a0867f6953c2-segment-load-waiter-0] 
org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Segment loading completed 
for datasource[HELLO_WORLD]
      2025-07-01T03:20:40,978 INFO [task-runner-0-priority-0] 
org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Writing task status to: 
hdfs://R2/user/druid/k8s/test-benchmarking-jdk21/indexing-logs/query-be62b874-c86c-4079-8a71-a0867f6953c2.status.json
      2025-07-01T03:20:43,850 INFO [task-runner-0-priority-0] 
org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Wrote task status to: 
hdfs://R2/user/druid/k8s/test-benchmarking-jdk21/indexing-logs/query-be62b874-c86c-4079-8a71-a0867f6953c2.status.json
      ```
   
   3. Opening a bash shell in the Peon pod, I found that the report file exists at `./var/tmp/attempt/1/report.json`.

   4. On closer examination, `AbstractTask` instead tries to read the report from `./var/tmp/{queryId}/attempt/{attemptId}/report.json`.

   5. The `queryId` segment missing from the write path is the likely cause of the error.

   6. This problem does not occur with the MM architecture.
   
   #### Why Task Report is Written to the Wrong Path
   
   1. Under `CliPeon`, `TaskReportFileWriter` is bound to `SingleFileTaskReportFileWriter`, which is configured to write to the given `File`:
   
       ```java
       binder.bind(TaskReportFileWriter.class)
             .toInstance(
                 new SingleFileTaskReportFileWriter(
                     Paths.get(taskDirPath, "attempt", attemptId, "report.json").toFile()
                 ));

       // The given path translates to: {taskDirPath}/attempt/{attemptId}/report.json
       ```
   
   2. Under the K8s model, `taskDirPath` is:
   
      ```log
      2025-07-03T09:57:39,395 INFO [main] org.apache.druid.cli.CliPeon - 
Running peon in k8s mode
      2025-07-03T09:57:39,395 INFO [main] org.apache.druid.cli.CliPeon - 
Running peon task in taskDirPath[/opt/druid/var/tmp/] attemptId[1]
      ```
   
   3. Under the MM model, `taskDirPath` is:
   
      ```log
      2025-07-03T09:50:57,522 INFO [main] org.apache.druid.cli.CliPeon - 
Running peon task in 
taskDirPath[var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1] 
attemptId[1]
      ```
   
   4. However, the `reportsFile` and `statusFile` in `AbstractTask` are constructed as follows:
   
      ```java
      @Nullable
      public String setup(TaskToolbox toolbox) throws Exception
      {
        if (toolbox.getConfig().isEncapsulatedTask()) {
          // taskDir = {taskDirPath}/{taskId}, since getTaskDir(id) appends the task id to the base dir
          File taskDir = toolbox.getConfig().getTaskDir(getId());
          FileUtils.mkdirp(taskDir);
          // attemptDir = {taskDirPath}/{taskId}/attempt/{attemptId}
          File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
          FileUtils.mkdirp(attemptDir);
          // reportsFile = {taskDirPath}/{taskId}/attempt/{attemptId}/report.json
          reportsFile = new File(attemptDir, "report.json");
          // statusFile = {taskDirPath}/{taskId}/attempt/{attemptId}/status.json
          statusFile = new File(attemptDir, "status.json");
        }
        // Excess code skipped...
      }
      ```
   
   5. Hence, the path where the report is written does not match the path where `AbstractTask` later looks for it.
   
   6. Druid is still able to find the `statusFile` because `AbstractTask#cleanUp` writes to the status file at the last minute:
   
      ```java
      // No check for whether statusFile exists.
      if (statusFile != null) {
        // Writes value into the status file.
        toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
        toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
        Files.deleteIfExists(statusFile.toPath());
        log.debug("Pushed task status");
      } else {
        log.debug("No task status file exists to push");
      }
      ```
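   The asymmetry between the two pushes can be seen in a minimal, self-contained sketch (plain `Files.writeString` stands in for the Jackson `writeValue` call; paths are illustrative, not the actual Druid ones): the report push is guarded by an existence check on a file that was never written at that path, while the status file is freshly written immediately before it is pushed, so it always exists.

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class StatusVsReportSketch
   {
     public static void main(String[] args) throws IOException
     {
       // attemptDir stands in for the directory AbstractTask computes during cleanup.
       Path attemptDir = Files.createTempDirectory("attempt");
       // The writer put report.json under a *different* directory, so this file does not exist.
       Path reportsFile = attemptDir.resolve("report.json");
       Path statusFile = attemptDir.resolve("status.json");

       // Mirrors cleanUp(): the report push is skipped because the existence check fails.
       System.out.println("report exists, will push: " + Files.exists(reportsFile));

       // Mirrors cleanUp(): the status is (re)written unconditionally, then pushed.
       Files.writeString(statusFile, "{\"status\":\"SUCCESS\"}");
       System.out.println("status exists, will push: " + Files.exists(statusFile));
     }
   }
   ```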
   
   ### Comparison Between MM and MM-less
   
   #### Task Creation
   
   The MM architecture constructs the peon startup command in `ForkingTaskRunner#run`:
   
   ```java
   // taskDir: var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1
   final File taskDir = new File(storageSlot.getDirectory(), task.getId());
   final String attemptId = String.valueOf(getNextAttemptID(taskDir));
   final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();
   // Omitted code

   // Paths of taskFile, statusFile, logFile, reportsFile are configured properly...
   final File taskFile = new File(taskDir, "task.json");
   final File statusFile = new File(attemptDir, "status.json");
   final File logFile = new File(taskDir, "log");
   final File reportsFile = new File(attemptDir, "report.json");

   // Omitted code

   command.add("org.apache.druid.cli.Main");
   command.add("internal");
   command.add("peon");
   command.add(taskDir.toString());
   command.add(attemptId);

   // Final command: org.apache.druid.cli.Main internal peon var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1 1
   ```
   
   The MM-less architecture constructs the peon startup command in `K8sTaskAdapter#generateCommand`:
   
   ```java
   // peon.sh constructs a command of the form: org.apache.druid.cli.Main internal peon {1} {2}
   command.add("/peon.sh");
   // taskDir: /opt/druid/var/tmp/
   command.add(taskConfig.getBaseTaskDir().getAbsolutePath());
   command.add("1");

   // Final command: org.apache.druid.cli.Main internal peon /opt/druid/var/tmp/ 1
   ```
   
   The MM-less architecture with PodTemplate constructs the peon startup command via the Helm chart, with environment variables as constructed in `PodTemplateTaskAdapter#getEnv`.
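   Putting the pieces of this comparison together, the K8s write/read mismatch can be reproduced in a small sketch (the task id and directories are taken from the logs above; `attemptId` is illustrative):

   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;

   public class ReportPathMismatchSketch
   {
     public static void main(String[] args)
     {
       final String taskId = "query-be62b874-c86c-4079-8a71-a0867f6953c2";
       final String attemptId = "1";

       // K8s mode: the adapters pass the *base* task dir, without the task id.
       final String k8sTaskDirPath = "/opt/druid/var/tmp";

       // SingleFileTaskReportFileWriter writes to {taskDirPath}/attempt/{attemptId}/report.json.
       final Path writePath = Paths.get(k8sTaskDirPath, "attempt", attemptId, "report.json");

       // AbstractTask#setup reads from getTaskDir(taskId),
       // i.e. {taskDirPath}/{taskId}/attempt/{attemptId}/report.json.
       final Path readPath = Paths.get(k8sTaskDirPath, taskId, "attempt", attemptId, "report.json");

       System.out.println("written to: " + writePath);
       System.out.println("read from:  " + readPath);
       System.out.println("match: " + writePath.equals(readPath)); // false: the read path has the extra taskId segment
     }
   }
   ```

   In MM mode the passed `taskDirPath` already ends with the task id, and the non-encapsulated cleanup path never reads the file, so the mismatch only surfaces on K8s.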
   
   ## Solutions
   
   1. A quick solution would be to change `K8sTaskAdapter` to use `getTaskDir` instead of `getBaseTaskDir`.

      However, this solution would only help clusters that do not use Pod Templates, because Pod Templates hard-code the task directory in the Helm charts:
   
      ```yaml
      containers:
        - name: main
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
          command:
            - sh
            - -c
            - |
              /peon.sh /opt/druid/var/tmp/ 1
      ```
   
   2. Change `peon.sh` so that its startup script includes the taskId.

      This approach may be messy, since `peon.sh` passes all of its input straight through to the Peon Java process startup:
   
      ```sh
      exec bin/run-java ${JAVA_OPTS} -cp 
$COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal 
peon --taskId "${TASK_ID}" "$@"
      ```
   
   3. During `CliPeon` initialization, append the `taskId` to `taskDirPath` for task pods started in K8s mode.

      1. In the case of `SingleContainerTaskAdapter` or `MultiContainerTaskAdapter`, the `taskId` is passed in by these lines in `K8sTaskAdapter#generateCommand`:
   
         ```java
         command.add("--taskId");
         command.add(task.getId());
         ```
   
      2. In the case of `PodTemplateTaskAdapter`, the `taskId` is passed in through environment variables via `PodTemplateTaskAdapter#getEnv`:
   
         ```java 
         new EnvVarBuilder()
                     .withName(DruidK8sConstants.TASK_ID_ENV)
                     .withValue(task.getId())
                     .build(),
         ```
   
      This solution is the simplest fix with minimal code changes, and is the one adopted in this PR.
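   A rough sketch of the chosen approach (the method and variable names below are illustrative, not the exact PR diff): when `CliPeon` starts in K8s mode, append the received `taskId` to the `taskDirPath`, so that the report writer and `AbstractTask` agree on the directory.

   ```java
   import java.nio.file.Paths;

   public class TaskDirFixSketch
   {
     /**
      * Illustrative helper: in K8s mode the adapters pass the *base* dir
      * (e.g. /opt/druid/var/tmp/), so append the task id to line it up with
      * TaskConfig#getTaskDir(taskId). In MM mode the path already ends with
      * the task id and is left untouched.
      */
     static String resolveTaskDirPath(String taskDirPath, String taskId, boolean k8sMode)
     {
       if (k8sMode) {
         return Paths.get(taskDirPath, taskId).toString();
       }
       return taskDirPath;
     }

     public static void main(String[] args)
     {
       System.out.println(resolveTaskDirPath("/opt/druid/var/tmp", "query-abc", true));
       System.out.println(resolveTaskDirPath("var/druid/task/slot0/query-abc", "query-abc", false));
     }
   }
   ```

   With this in place, `SingleFileTaskReportFileWriter` writes `report.json` under `{taskDirPath}/{taskId}/attempt/{attemptId}/`, which is where `AbstractTask#cleanUp` looks for it before pushing to deep storage.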
   
   
   
   #### Release note
   MSQ queries on K8s no longer fail with a 404 error in the web console.
   You can now view K8s task reports in the web console after the tasks exit.
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `CliPeon`
   
   <hr>
   
   
   This PR has:
   
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [x] a release note entry in the PR description.
   - [x] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
