gianm commented on a change in pull request #12006:
URL: https://github.com/apache/druid/pull/12006#discussion_r784284997
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
##########
@@ -652,37 +652,50 @@ public InputStream openStream() throws IOException
if (zkWorker == null) {
// Worker is not running this task, it might be available in deep storage
return Optional.absent();
- } else {
- TaskLocation taskLocation = runningTasks.get(taskId).getLocation();
- final URL url = TaskRunnerUtils.makeTaskLocationURL(
- taskLocation,
- "/druid/worker/v1/chat/%s/liveReports",
- taskId
- );
- return Optional.of(
- new ByteSource()
+ }
+
+ final RemoteTaskRunnerWorkItem runningWorkItem = runningTasks.get(taskId);
+
+ if (runningWorkItem == null) {
+ // Worker very recently exited.
+ return Optional.absent();
+ }
+
+ final TaskLocation taskLocation = runningWorkItem.getLocation();
+
+ if (TaskLocation.unknown().equals(taskLocation)) {
+ // No location known for this task. It may have not been assigned one yet.
+ return Optional.absent();
+ }
+
+ final URL url = TaskRunnerUtils.makeTaskLocationURL(
+ taskLocation,
+ "/druid/worker/v1/chat/%s/liveReports",
+ taskId
+ );
+ return Optional.of(
+ new ByteSource()
+ {
+ @Override
+ public InputStream openStream() throws IOException
{
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- return httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get();
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
+ try {
+ return httpClient.go(
+ new Request(HttpMethod.GET, url),
+ new InputStreamResponseHandler()
+ ).get();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e) {
+ // Unwrap if possible
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e);
Review comment:
Hmm, it's awkward to do that with the current ByteSource-based API,
since we have to create the Optional before we try to open the stream. But it
could be addressed by changing the API to InputStream instead of ByteSource.
Then, if we get a 404 we can return an empty Optional. This refactor touches a
bunch more files, though, so I will have to come back to this at a later date.
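
A rough sketch of what the InputStream-based shape could look like, so the 404 check happens before the Optional is built. This is not the actual Druid code: `Fetcher` and `Response` are hypothetical stand-ins for the HTTP client call, `streamLiveReports` is a made-up name, and `java.util.Optional` is used instead of Guava's `Optional` only to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

final class LiveReportsSketch
{
  /** Hypothetical stand-in for an HTTP response: status code plus body stream. */
  static final class Response
  {
    final int status;
    final InputStream body; // may be null when there is no body

    Response(int status, InputStream body)
    {
      this.status = status;
      this.body = body;
    }
  }

  /** Hypothetical stand-in for the blocking HTTP GET done by the task runner. */
  interface Fetcher
  {
    Response get(String url);
  }

  /**
   * Opens the stream eagerly, so a 404 can be reported as an empty Optional
   * instead of surfacing later when the caller tries to read a ByteSource.
   */
  static Optional<InputStream> streamLiveReports(Fetcher fetcher, String url)
  {
    final Response response = fetcher.get(url);

    if (response.status == 404) {
      // The worker no longer knows about the task: release the body and report "absent".
      if (response.body != null) {
        try {
          response.body.close();
        }
        catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
      return Optional.empty();
    }

    // Optional.of rejects null, so a non-404 response is assumed to carry a body.
    return Optional.of(response.body);
  }

  public static void main(String[] args)
  {
    final Fetcher found = url -> new Response(
        200,
        new ByteArrayInputStream("{}".getBytes(StandardCharsets.UTF_8))
    );
    final Fetcher missing = url -> new Response(404, null);

    System.out.println(streamLiveReports(found, "http://worker/liveReports").isPresent());   // true
    System.out.println(streamLiveReports(missing, "http://worker/liveReports").isPresent()); // false
  }
}
```

The trade-off is that the caller now owns an open stream and has to close it, whereas a ByteSource defers opening until the caller reads from it.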
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]