This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b23d0b5b52 MSQ: Controller checker should check for "closed" only. 
(#16161)
2b23d0b5b52 is described below

commit 2b23d0b5b52f1184e1642c5b5b15af4f4aa216ad
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 19 19:25:48 2024 -0700

    MSQ: Controller checker should check for "closed" only. (#16161)
    
    * MSQ: Controller checker should check for "closed" only.
    
    Currently, the worker's controller checker will exit the worker if
    the controller location is "closed" (no longer running) or if its location
    is empty (i.e. location unknown).
    
    This patch changes to only exit on "closed". We shouldn't exit on empty
    location, because that may happen if the Overlord is slow to acknowledge the
    location of a task.
    
    * Fix test.
---
 .../apache/druid/msq/indexing/IndexerWorkerContext.java |  4 +++-
 .../druid/msq/indexing/IndexerWorkerContextTest.java    | 17 ++---------------
 2 files changed, 5 insertions(+), 16 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 53cd6e942ea..1bd789df769 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -190,7 +190,9 @@ public class IndexerWorkerContext implements WorkerContext
         break;
       }
 
-      if (controllerLocations.isClosed() || 
controllerLocations.getLocations().isEmpty()) {
+      // Note: don't exit on empty location, because that may happen if the 
Overlord is slow to acknowledge the
+      // location of a task. Only exit on "closed", because that happens only 
if the task is really no longer running.
+      if (controllerLocations.isClosed()) {
         log.warn(
             "Periodic fetch of controller location returned [%s]. Worker task 
[%s] will exit.",
             controllerLocations,
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
index 2ae8d155d4d..583c21d3407 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
@@ -54,20 +54,6 @@ public class IndexerWorkerContextTest
     );
   }
 
-  @Test
-  public void testControllerCheckerRunnableExitsWhenEmptyStatus()
-  {
-    final ServiceLocator controllerLocatorMock = 
Mockito.mock(ServiceLocator.class);
-    Mockito.when(controllerLocatorMock.locate())
-           
.thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet())));
-
-    final Worker workerMock = Mockito.mock(Worker.class);
-
-    indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, 
workerMock);
-    Mockito.verify(controllerLocatorMock, Mockito.times(1)).locate();
-    Mockito.verify(workerMock, Mockito.times(1)).controllerFailed();
-  }
-
   @Test
   public void testControllerCheckerRunnableExitsOnlyWhenClosedStatus()
   {
@@ -76,12 +62,13 @@ public class IndexerWorkerContextTest
            
.thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new 
ServiceLocation("h", 1, -1, "/"))))
            // Done to check the behavior of the runnable, the situation of 
exiting after success might not occur actually
            
.thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new 
ServiceLocation("h", 1, -1, "/"))))
+           
.thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet())))
            .thenReturn(Futures.immediateFuture(ServiceLocations.closed()));
 
     final Worker workerMock = Mockito.mock(Worker.class);
 
     indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, 
workerMock);
-    Mockito.verify(controllerLocatorMock, Mockito.times(3)).locate();
+    Mockito.verify(controllerLocatorMock, Mockito.times(4)).locate();
     Mockito.verify(workerMock, Mockito.times(1)).controllerFailed();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to