This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 15d27f340d6 Fix fetch of task location in SpecificTaskServiceLocator (#16462)
15d27f340d6 is described below

commit 15d27f340d68c9eb0a9d8e917f363b2c663035ce
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon May 20 12:35:04 2024 +0530

    Fix fetch of task location in SpecificTaskServiceLocator (#16462)
    
    * Fix fetch of task location in SpecificTaskServiceLocator
    
    * Resolve future if exception occurs while invoking API
    
    * Remove unused import
---
 .../rpc/indexing/SpecificTaskServiceLocator.java   | 160 ++++++++++++++-------
 1 file changed, 105 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
index 3f5441318a5..689cbdce307 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
@@ -23,14 +23,13 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.rpc.ServiceLocation;
@@ -57,7 +56,6 @@ public class SpecificTaskServiceLocator implements ServiceLocator
 
   private final String taskId;
   private final OverlordClient overlordClient;
-  private final TaskLocationFetcher locationFetcher = new 
TaskLocationFetcher();
   private final Object lock = new Object();
 
   @GuardedBy("lock")
@@ -125,42 +123,15 @@ public class SpecificTaskServiceLocator implements ServiceLocator
                     lastUpdateTime = System.currentTimeMillis();
 
                     final TaskStatus status = taskStatusMap.get(taskId);
-
                     if (status == null) {
                       // If the task status is unknown, we'll treat it as 
closed.
-                      lastKnownState = null;
-                      lastKnownLocation = null;
+                      resolvePendingFuture(null, null);
+                    } else if 
(TaskLocation.unknown().equals(status.getLocation())) {
+                      // Do not resolve the future just yet, try the fallback 
API instead
+                      fetchFallbackTaskLocation();
                     } else {
-                      lastKnownState = status.getStatusCode();
-                      final TaskLocation location;
-                      if (TaskLocation.unknown().equals(status.getLocation())) 
{
-                        location = locationFetcher.getLocation();
-                      } else {
-                        location = status.getLocation();
-                      }
-
-                      if (TaskLocation.unknown().equals(location)) {
-                        lastKnownLocation = null;
-                      } else {
-                        lastKnownLocation = new ServiceLocation(
-                            location.getHost(),
-                            location.getPort(),
-                            location.getTlsPort(),
-                            StringUtils.format("%s/%s", BASE_PATH, 
StringUtils.urlEncode(taskId))
-                        );
-                      }
+                      resolvePendingFuture(status.getStatusCode(), 
status.getLocation());
                     }
-
-                    if (lastKnownState != TaskState.RUNNING) {
-                      pendingFuture.set(ServiceLocations.closed());
-                    } else if (lastKnownLocation == null) {
-                      
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
-                    } else {
-                      
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
-                    }
-
-                    // Clear pendingFuture once it has been set.
-                    pendingFuture = null;
                   }
                 }
               }
@@ -168,17 +139,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator
               @Override
               public void onFailure(Throwable t)
               {
-                synchronized (lock) {
-                  if (pendingFuture != null) {
-                    pendingFuture.setException(t);
-
-                    // Clear pendingFuture once it has been set.
-                    pendingFuture = null;
-                  }
-                }
+                resolvePendingFutureOnException(t);
               }
             },
-            MoreExecutors.directExecutor()
+            Execs.directExecutor()
         );
 
         return Futures.nonCancellationPropagating(retVal);
@@ -209,18 +173,104 @@ public class SpecificTaskServiceLocator implements ServiceLocator
     }
   }
 
-  private class TaskLocationFetcher
+  private void resolvePendingFuture(TaskState state, TaskLocation location)
   {
-    TaskLocation getLocation()
-    {
-      final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
-          overlordClient.taskStatus(taskId),
-          true
-      );
-      if (statusResponse == null || statusResponse.getStatus() == null) {
-        return TaskLocation.unknown();
-      } else {
-        return statusResponse.getStatus().getLocation();
+    synchronized (lock) {
+      if (pendingFuture != null) {
+        lastKnownState = state;
+        lastKnownLocation = location == null ? null : new ServiceLocation(
+            location.getHost(),
+            location.getPort(),
+            location.getTlsPort(),
+            StringUtils.format("%s/%s", BASE_PATH, 
StringUtils.urlEncode(taskId))
+        );
+
+        if (lastKnownState != TaskState.RUNNING) {
+          pendingFuture.set(ServiceLocations.closed());
+        } else if (lastKnownLocation == null) {
+          
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
+        } else {
+          pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
+        }
+
+        // Clear pendingFuture once it has been set.
+        pendingFuture = null;
+      }
+    }
+  }
+
+  private void resolvePendingFutureOnException(Throwable t)
+  {
+    synchronized (lock) {
+      if (pendingFuture != null) {
+        pendingFuture.setException(t);
+
+        // Clear pendingFuture once it has been set.
+        pendingFuture = null;
+      }
+    }
+  }
+
+  /**
+   * Invokes the single task status API {@link OverlordClient#taskStatus} if 
the
+   * multi-task status API returns an unknown location (this can happen if the
+   * Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
+   */
+  private void fetchFallbackTaskLocation()
+  {
+    synchronized (lock) {
+      if (pendingFuture != null) {
+        final ListenableFuture<TaskStatusResponse> taskStatusFuture;
+        try {
+          taskStatusFuture = overlordClient.taskStatus(taskId);
+        }
+        catch (Exception e) {
+          resolvePendingFutureOnException(e);
+          return;
+        }
+
+        pendingFuture.addListener(
+            () -> {
+              if (!taskStatusFuture.isDone()) {
+                // pendingFuture may resolve without taskStatusFuture due to 
close().
+                taskStatusFuture.cancel(true);
+              }
+            },
+            Execs.directExecutor()
+        );
+
+        Futures.addCallback(
+            taskStatusFuture,
+            new FutureCallback<TaskStatusResponse>()
+            {
+              @Override
+              public void onSuccess(final TaskStatusResponse 
taskStatusResponse)
+              {
+                synchronized (lock) {
+                  if (pendingFuture != null) {
+                    lastUpdateTime = System.currentTimeMillis();
+
+                    final TaskStatusPlus status = 
taskStatusResponse.getStatus();
+                    if (status == null) {
+                      // If the task status is unknown, we'll treat it as 
closed.
+                      resolvePendingFuture(null, null);
+                    } else if 
(TaskLocation.unknown().equals(status.getLocation())) {
+                      resolvePendingFuture(status.getStatusCode(), null);
+                    } else {
+                      resolvePendingFuture(status.getStatusCode(), 
status.getLocation());
+                    }
+                  }
+                }
+              }
+
+              @Override
+              public void onFailure(Throwable t)
+              {
+                resolvePendingFutureOnException(t);
+              }
+            },
+            Execs.directExecutor()
+        );
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to