This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 25.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/25.0.0 by this push:
new 63780ed08f fix issue with jetty graceful shutdown of data servers when
druid.serverview.type=http (#13499) (#13515)
63780ed08f is described below
commit 63780ed08f0442824fb8200516ce1151e07e5aee
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Dec 7 20:26:26 2022 -0800
fix issue with jetty graceful shutdown of data servers when
druid.serverview.type=http (#13499) (#13515)
* fix issue with http server inventory view blocking data node http server
shutdown with long polling
* adjust
* fix test inspections
---
.../org/apache/druid/guice/AnnouncerModule.java | 2 +-
.../coordination/BatchDataSegmentAnnouncer.java | 8 +++++
.../server/coordination/ChangeRequestHistory.java | 19 ++++++++++
.../coordination/ChangeRequestHistoryTest.java | 41 ++++++++++++++++++++++
4 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
index 97a4685422..6b0c96641a 100644
--- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
+++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
@@ -54,7 +54,7 @@ public class AnnouncerModule implements Module
JsonConfigProvider.bind(binder, "druid.announcer",
BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer",
DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
- binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class);
+
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleAnnouncements.class);
if (isZkEnabled) {
binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class);
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index aa03ec6f45..3d766be098 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -36,6 +36,7 @@ import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -129,6 +130,13 @@ public class BatchDataSegmentAnnouncer implements
DataSegmentAnnouncer
this(server, config, zkPaths, () -> announcer, jsonMapper,
ZkEnablementConfig.ENABLED);
}
+ @LifecycleStop
+ public void stop()
+ {
+ changes.stop();
+ }
+
+
@Override
public void announceSegment(DataSegment segment) throws IOException
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
index 65c8dbd624..a0c45b4a33 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
@@ -32,6 +32,7 @@ import org.apache.druid.utils.CircularBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService;
*
* Clients call {@link #getRequestsSince} to get updates since given counter.
*/
+
public class ChangeRequestHistory<T>
{
private static int MAX_SIZE = 1000;
@@ -74,11 +76,24 @@ public class ChangeRequestHistory<T>
this.singleThreadedExecutor =
Execs.singleThreaded("SegmentChangeRequestHistory");
}
+ public void stop()
+ {
+ singleThreadedExecutor.shutdownNow();
+ final LinkedHashSet<CustomSettableFuture<?>> futures = new
LinkedHashSet<>(waitingFutures.keySet());
+ waitingFutures.clear();
+ for (CustomSettableFuture<?> theFuture : futures) {
+ theFuture.setException(new IllegalStateException("Server is shutting
down."));
+ }
+ }
+
/**
* Add batch of segment changes update.
*/
public synchronized void addChangeRequests(List<T> requests)
{
+ if (singleThreadedExecutor.isShutdown()) {
+ return;
+ }
for (T request : requests) {
changes.add(new Holder<>(request, getLastCounter().inc()));
}
@@ -108,6 +123,10 @@ public class ChangeRequestHistory<T>
public synchronized ListenableFuture<ChangeRequestsSnapshot<T>>
getRequestsSince(final Counter counter)
{
final CustomSettableFuture<T> future = new
CustomSettableFuture<>(waitingFutures);
+ if (singleThreadedExecutor.isShutdown()) {
+ future.setException(new IllegalStateException("Server is shutting
down."));
+ return future;
+ }
if (counter.counter < 0) {
future.setException(new IAE("counter[%s] must be >= 0", counter));
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
index 82a67b038e..ecba53a8a0 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -171,4 +172,44 @@ public class ChangeRequestHistoryTest
Assert.assertEquals(1, snapshot.getCounter().getCounter());
Assert.assertEquals(1, snapshot.getRequests().size());
}
+
+ @Test
+ public void testStop()
+ {
+ final ChangeRequestHistory<DataSegmentChangeRequest> history = new
ChangeRequestHistory();
+
+ ListenableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> future
= history.getRequestsSince(
+ ChangeRequestHistory.Counter.ZERO
+ );
+ Assert.assertEquals(1, history.waitingFutures.size());
+
+ final AtomicBoolean callbackExcecuted = new AtomicBoolean(false);
+ Futures.addCallback(
+ future,
+ new FutureCallback<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
+ {
+ @Override
+ public void onSuccess(ChangeRequestsSnapshot result)
+ {
+ callbackExcecuted.set(true);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ callbackExcecuted.set(true);
+ }
+ }
+ );
+
+ history.stop();
+ // any new change requests should be ignored, there should be no waiting
futures, and open futures should be resolved
+ history.addChangeRequest(new SegmentChangeRequestNoop());
+ Assert.assertEquals(0, history.waitingFutures.size());
+ Assert.assertTrue(callbackExcecuted.get());
+ Assert.assertTrue(future.isDone());
+
+ Throwable thrown = Assert.assertThrows(ExecutionException.class,
future::get);
+ Assert.assertEquals("java.lang.IllegalStateException: Server is shutting
down.", thrown.getMessage());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]