This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 25.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/25.0.0 by this push:
     new 63780ed08f fix issue with jetty graceful shutdown of data servers when 
druid.serverview.type=http (#13499) (#13515)
63780ed08f is described below

commit 63780ed08f0442824fb8200516ce1151e07e5aee
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Dec 7 20:26:26 2022 -0800

    fix issue with jetty graceful shutdown of data servers when 
druid.serverview.type=http (#13499) (#13515)
    
    * fix issue with http server inventory view blocking data node http server 
shutdown with long polling
    
    * adjust
    
    * fix test inspections
---
 .../org/apache/druid/guice/AnnouncerModule.java    |  2 +-
 .../coordination/BatchDataSegmentAnnouncer.java    |  8 +++++
 .../server/coordination/ChangeRequestHistory.java  | 19 ++++++++++
 .../coordination/ChangeRequestHistoryTest.java     | 41 ++++++++++++++++++++++
 4 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java 
b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
index 97a4685422..6b0c96641a 100644
--- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
+++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
@@ -54,7 +54,7 @@ public class AnnouncerModule implements Module
     JsonConfigProvider.bind(binder, "druid.announcer", 
BatchDataSegmentAnnouncerConfig.class);
     JsonConfigProvider.bind(binder, "druid.announcer", 
DataSegmentAnnouncerProvider.class);
     
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
-    binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class);
+    
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleAnnouncements.class);
 
     if (isZkEnabled) {
       
binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
 
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index aa03ec6f45..3d766be098 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -36,6 +36,7 @@ import org.apache.druid.curator.announcement.Announcer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -129,6 +130,13 @@ public class BatchDataSegmentAnnouncer implements 
DataSegmentAnnouncer
     this(server, config, zkPaths, () -> announcer, jsonMapper, 
ZkEnablementConfig.ENABLED);
   }
 
+  @LifecycleStop
+  public void stop()
+  {
+    changes.stop();
+  }
+
+
   @Override
   public void announceSegment(DataSegment segment) throws IOException
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
index 65c8dbd624..a0c45b4a33 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
@@ -32,6 +32,7 @@ import org.apache.druid.utils.CircularBuffer;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService;
  *
  * Clients call {@link #getRequestsSince} to get updates since given counter.
  */
+
 public class ChangeRequestHistory<T>
 {
   private static int MAX_SIZE = 1000;
@@ -74,11 +76,24 @@ public class ChangeRequestHistory<T>
     this.singleThreadedExecutor = 
Execs.singleThreaded("SegmentChangeRequestHistory");
   }
 
+  public void stop()
+  {
+    singleThreadedExecutor.shutdownNow();
+    final LinkedHashSet<CustomSettableFuture<?>> futures = new 
LinkedHashSet<>(waitingFutures.keySet());
+    waitingFutures.clear();
+    for (CustomSettableFuture<?> theFuture : futures) {
+      theFuture.setException(new IllegalStateException("Server is shutting 
down."));
+    }
+  }
+
   /**
    * Add batch of segment changes update.
    */
   public synchronized void addChangeRequests(List<T> requests)
   {
+    if (singleThreadedExecutor.isShutdown()) {
+      return;
+    }
     for (T request : requests) {
       changes.add(new Holder<>(request, getLastCounter().inc()));
     }
@@ -108,6 +123,10 @@ public class ChangeRequestHistory<T>
   public synchronized ListenableFuture<ChangeRequestsSnapshot<T>> 
getRequestsSince(final Counter counter)
   {
     final CustomSettableFuture<T> future = new 
CustomSettableFuture<>(waitingFutures);
+    if (singleThreadedExecutor.isShutdown()) {
+      future.setException(new IllegalStateException("Server is shutting 
down."));
+      return future;
+    }
 
     if (counter.counter < 0) {
       future.setException(new IAE("counter[%s] must be >= 0", counter));
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
index 82a67b038e..ecba53a8a0 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -171,4 +172,44 @@ public class ChangeRequestHistoryTest
     Assert.assertEquals(1, snapshot.getCounter().getCounter());
     Assert.assertEquals(1, snapshot.getRequests().size());
   }
+
+  @Test
+  public void testStop()
+  {
+    final ChangeRequestHistory<DataSegmentChangeRequest> history = new 
ChangeRequestHistory();
+
+    ListenableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> future 
= history.getRequestsSince(
+        ChangeRequestHistory.Counter.ZERO
+    );
+    Assert.assertEquals(1, history.waitingFutures.size());
+
+    final AtomicBoolean callbackExcecuted = new AtomicBoolean(false);
+    Futures.addCallback(
+        future,
+        new FutureCallback<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
+        {
+          @Override
+          public void onSuccess(ChangeRequestsSnapshot result)
+          {
+            callbackExcecuted.set(true);
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            callbackExcecuted.set(true);
+          }
+        }
+    );
+
+    history.stop();
+    // any new change requests should be ignored, there should be no waiting 
futures, and open futures should be resolved
+    history.addChangeRequest(new SegmentChangeRequestNoop());
+    Assert.assertEquals(0, history.waitingFutures.size());
+    Assert.assertTrue(callbackExcecuted.get());
+    Assert.assertTrue(future.isDone());
+
+    Throwable thrown = Assert.assertThrows(ExecutionException.class, 
future::get);
+    Assert.assertEquals("java.lang.IllegalStateException: Server is shutting 
down.", thrown.getMessage());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to